在MapReduce中要求被传输的数据能够被序列化 MapReduce中的序列化机制使用的是AVRO,MapReduce对AVRO进行了封装 被传输的类实现Writable接口实现方法即可
在MapReduce中会自动对被传输的key值进行排序,如果使用一个对象
作为输出键,那么要求对象相对应的类应该实现Comparable接口,考虑到
MapReduce中被传输的对象要求被序列化,所以MapReduce中提供了WritableComparable
接口.
如果ComparaTo方法中返回值为0,则MapReduce在进行计算时会把两个键的值放到
一个迭代器中,输出是第二个key是没有记录的。
我们在使用MapReduce对HDFS中的数据进行计算时,有时可能会有分类
输出的场景,MapReduce中提供了Partitioner类,我们在使用时只需继承
该类,然后重写getPartition方法即可,分区编号默认从0开始。
有多少个分区JobTracter就会分配多少个reduceTask。分区数量要在
驱动类中指定,如果不指定分区类与ReduceTask的数量,则使用默认
的HashPartitioner类进行分区,也就是自定义的分区无效。
1.合并是减少数据总量并没有改变计算结果 - Combiner(合并)实际上只是
让MapTask进行提前聚合,最后ReduceTask在进行总的聚合.
2.并不是所有的场景都适合于用Combiner,像求和、求最值、去重等可以使用
combiner,但是例如求平均的场景不适合与使用Combiner
inputFormat用来获取切片并创建流来读取数据,如果不自定义InputFormat 则默认使用TextInputFormt按行读取Mapper获取的key值为当前行数的偏移量, 自定义inputFormat类只需要继承FileInputFormat类自定义读取文件的即可.
案例:对HDFS中的case.txt文件按月份对每个人的收益进行降序排序
package com.jmy.profitcase;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.util.LineReader;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URL;
// 泛型表示读完之后给mapper的数据类型
public class CaseInputFormat extends FileInputFormat<Text, Text> {
@Override
public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
return new CaseReader();
}
}
class CaseReader extends RecordReader<Text,Text>{
private LineReader reader;
private Text key;
private Text value;
// 初始化Reader
// 最终目的就是拿到读取切片的流
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
FileSplit fs = (FileSplit) inputSplit;
Path path = fs.getPath();
// 连接HDFS获取切片
FileSystem fileSystem = FileSystem.get(URI.create(path.toString()), taskAttemptContext.getConfiguration());
// 打开对应的文件获取输入流
InputStream in = fileSystem.open(path);
// 字节流转换为字符流按行读取
reader = new LineReader(in);
}
// 判断有无下一个键值对
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
key = new Text();
value = new Text();
Text tmp = new Text();
// 将读取到的一行数据传入到这个参数中 返回值为读取到字节数
if (reader.readLine(tmp) == ) {
return false;
}
key.set(tmp);
if (reader.readLine(tmp) == ) {
return false;
}
value.set(tmp);
return true;
}
// 获取当前key值
@Override
public Text getCurrentKey() throws IOException, InterruptedException {
return key;
}
// 获取当前value值
@Override
public Text getCurrentValue() throws IOException, InterruptedException {
return value;
}
// 获取进度
@Override
public float getProgress() throws IOException, InterruptedException {
return ;
}
// 关流
@Override
public void close() throws IOException {
if (reader != null)
reader.close();
}
}
package com.jmy.profitcase;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Case implements WritableComparable<Case> {
private int month;
public int getMonth() {
return month;
}
public void setMonth(int month) {
this.month = month;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
private String name;
private int inCome;
private int outCome;
private int countCome;
public int getInCome() {
return inCome;
}
public void setInCome(int inCome) {
this.inCome = inCome;
}
public int getOutCome() {
return outCome;
}
public void setOutCome(int outCome) {
this.outCome = outCome;
}
public int getCountCome() {
return inCome - outCome;
}
public void setCountCome(int countCome) {
this.countCome = countCome;
}
@Override
public int compareTo(Case o) {
return o.getCountCome() - this.getCountCome();
}
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeInt(month);
dataOutput.writeUTF(name);
dataOutput.writeInt(inCome);
dataOutput.writeInt(outCome);
dataOutput.writeInt(countCome);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
month = dataInput.readInt();
name = dataInput.readUTF();
inCome = dataInput.readInt();
outCome = dataInput.readInt();
countCome = dataInput.readInt();
}
}
package com.jmy.profitcase;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class CaseMapper extends Mapper<Text,Text,Case,NullWritable> {
@Override
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
String[] s = key.toString().split(" ");
String[] s1 = value.toString().split(" ");
Case aCase = new Case();
aCase.setMonth(Integer.parseInt(s[]));
aCase.setName(s[]);
aCase.setInCome(Integer.parseInt(s1[]));
aCase.setOutCome(Integer.parseInt(s1[]));
context.write(aCase,NullWritable.get());
}
}
package com.jmy.profitcase;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class CaseReducer extends Reducer<Case, NullWritable,Text, IntWritable> {
@Override
protected void reduce(Case key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
context.write(new Text(key.getName()),new IntWritable(key.getCountCome()));
}
}
package com.jmy.profitcase;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class CaseDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
// 获取job 提交给JobTracter
Job job = Job.getInstance();
// 入口类
job.setJarByClass(CaseDriver.class);
// mapper类
job.setMapperClass(CaseMapper.class);
// reducer类
job.setReducerClass(CaseReducer.class);
// inputformat类
job.setInputFormatClass(CaseInputFormat.class);
// partitioner类 reduceTask数量
job.setPartitionerClass(CasePartition.class);
job.setNumReduceTasks();
// mapper
job.setMapOutputKeyClass(Case.class);
job.setMapOutputValueClass(NullWritable.class);
// reducer
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 处理文件路径
FileInputFormat.addInputPath(job,new Path("hdfs://10.42.99.103:9000/case.txt"));
// 文件输出路径
FileOutputFormat.setOutputPath(job,new Path("hdfs://10.42.99.103:9000/caseResult"));
// 启动
job.waitForCompletion(true);
}
}
1 ls
2850 100
2 ls
3566 200
3 ls
4555 323
1 zs
19000 2000
2 zs
28599 3900
3 zs
34567 5000
1 ww
355 10
2 ww
555 222
3 ww
667 192
1.job会被提交到JobTracker,JobTracker会访问HDFS中的namenode,
获取Block的存储位置以及大小.
2.JobTracker收到文件信息之后会对文件进行切片,默认Block的大小
就是切片的大小,切片的数量决定了mapTask的数量。
3.JobTacker计算完mapTask与ReduceTask的数量之后会把任务提交给TaskTracker
,为了减少集群间节点间的访问,TaskTracker会与datanode部署在同一个节点上
4. JobTracker在分配任务的时候,会尽量将任务分配给有数据的节点
1.客户端将任务提交给JobTracker:hadoop jar ***.jar
2.准备阶段:
a.检查输入路径是否存在,输出路径是否不存在
b.计算切片数量以及分区
c.如果有需要,可以设置分布式缓存存根账户
d.将jar包提交到HDFS上
e.将任务提交到JobTracker上
3. 提交阶段
a. JobTracker会计算MapTask的数量和ReduceTask的数量。
MapTask的数量由切片数量决定,ReduceTask的数量由分区数量决定
b. JobTracker在划分好之后,会等待TaskTracker的心跳。
当收到TaskTracker的心跳的时候,JobTracker就会将MapTask或者
ReduceTask分配给TaskTracker。在分配的时候,MapTask要尽量满足
数据本地化策略;ReduceTask尽量分配到相对空闲的节点上
c. TaskTracker在领取到任务之后,去连接HDFS下载对应的jar包
体现的逻辑移动数据固定的思想
d. TaskTracker会在本节点上开启一个JVM子进程执行MapTask或者
ReduceTask。每一个MapTask或者ReduceTask的执行都会开启一个JVM
子进程
1. Mapper中的map方法在处理完一行数据之后,会将数据写出到缓冲区中
2. 数据在缓冲区中进行分区、排序,如果指定了Combiner,那么数据在缓冲区中还会进行combine合并 - 采取了快速排序的方式
3. 每一个MapTask自带一个缓冲区,缓冲区本质上是一个环形的字节数组。设置为环形的优势在于能够重复利用缓冲区而不用寻址
4. 缓冲区是维系在内存中,缓冲区的默认容量是100M
5. 缓冲区的容量使用达到一定限度(溢写阈值:0.8,目的是为了避免MapTask写出结果的时候产生大量的阻塞)的时候,MapTask会将缓冲区中的数据溢写(spill)到磁盘上,后续的数据可以继续写到缓冲区中
6. 每一次溢写都会产生一个新的溢写文件。单个溢写文件中的数据是分区且排序的,但是所有的溢写文件中的数据是局部有序整体无序
7. 当MapTask将所有数据都处理完成之后,会将所有的溢写文件合并(merge)成一个结果文件(final out)。如果一部分结果在缓冲区中,一部分结果在溢写文件中,这个时候所有的结果会直接合并到最后的final out中。如果没有产生溢写过程,则缓冲区中的数据直接冲刷到final out中
8. 在merge过程中,数据会再次进行分区和排序,所以final out是整体分区且有序。这个过程中的排序使用的是归并排序。如果指定了Combiner,并且溢写文件的个数大于等于3个,那么在merge过程中自动进行combine
1. 每一个ReduceTask启动fetch线程通过get请求去抓取数据
2. 在抓取数据的时候,每一个ReduceTask只抓取当前分区的数据。在抓取到数据的之后,会将数据存储在本地的磁盘上
3. 在抓取完成之后,ReduceTask会将这些小文件进行merge,合并成一个大文件。在合并过程中,数据会再次进行排序,采取的是归并排序
4. 合并完成之后,会将相同的键对应的值放到一个迭代器中,这个过程称之为分组(group),形成的结构就是一个键对应一个迭代器每一个键触发一次reduce方法