在做Map Join案例实操的时候,发现处理后的结果数据居然中文乱码。
大致需求是这样的:有两张表希望输出最终数据格式。
我们采用MapJoin的方式实现,将较小的表pd表加载到缓存中,保存到map集合中,然后Mapper中的map方法处理order表,将pid替换为pname
Mapper类
package com.qcln.mr.mapjoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
public class MJMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
private Map<String,String> pMap = new HashMap<>();
private Text k = new Text();
@Override
protected void setup(Context context) throws IOException {
//读取pd到pMap
//开流
URI[] cacheFiles = context.getCacheFiles();
FileSystem fs = FileSystem.get(context.getConfiguration());
FSDataInputStream open = fs.open(new Path(cacheFiles[0]));
//将文件按行处理,读取到pMap中
// 因为上面的字节流没法读一行,所以要转换流,转换成字符流
BufferedReader br = new BufferedReader(new InputStreamReader(open));
String line;
while(StringUtils.isNotEmpty(line = br.readLine())){
String[] split = line.split("\t");
pMap.put(split[0],split[1]);
}
IOUtils.closeStream(br);
}
/**
* 处理order.txt文件
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
// 将pid替换
k.set(fields[0] + "\t" + pMap.get(fields[1])+ "\t" + fields[2]);
context.write(k,NullWritable.get());
}
}
Driver类
package com.qcln.mr.mapjoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
public class MJDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Job job = Job.getInstance(new Configuration());
job.setJarByClass(MJDriver.class);
job.setMapperClass(MJMapper.class);
job.setNumReduceTasks(0);
//添加分布式缓存
job.addCacheFile(URI.create(args[0]));
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job,new Path(args[1]));
FileOutputFormat.setOutputPath(job,new Path(args[2]));
boolean b = job.waitForCompletion(true);
System.exit(b ? 0 : 1);
}
}
然而运行结果是:
这显然不对,怎么会有乱码呢?按道理输出应该是这样子的啊。
我明明输入文件都是UTF-8的,为啥处理后就乱码了呢。再去检查代码,发现在流转换操作的时候加上字符编码就不会产生乱码,将代码改成如下
BufferedReader br = new BufferedReader(new InputStreamReader(open,"UTF-8"));
再次测试,乱码问题解决。
所以以后在进行流转换操作的时候要格外留意一下字符编码问题,很容易漏掉