MapReduce 是一种编程模型(没有集群的概念,会把任务提交到 yarn 集群上跑),用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。
当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。(MapReduce在企业里几乎不再使用了,稍微了解即可)。
说明:MapReduce不擅长的方面(慢!)
现在MapReduce逐渐被Spark,Flink等框架取代。但是思想很重要,值得学习。更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
注意:原则上MapReduce分为两个阶段:Map Task和Reduce Task,但是由于Shuffling阶段很重要,人为划分了Shuffling阶段,这个阶段发生在Map Task和Reduce Task之间,可以理解为由Map Task后半段和Reduce Task前半段组成。
那么就需要有一个东西来决定怎么切分,它就是InputFormat,而切分大小一般由HDFS块大小决定。
针对第四点说明:比如有3个文件,一个300M,第二个50M,第三个50M,那么一共就是切了5个MapTask出来。
针对每一个文件,第一个300M切了3个,第二个50M切了一个,第三个50M切了一个,共5个。
而如果只有一个文件为128M+1KB,那么就只会切分一个,因为切片判断规则为->如果文件小于切片大小1.1倍,就和上一个切片将就放在一起了,这样可以防止过小的切片在执行任务的时候,调度资源的时间超过执行时间的情况。
作业的运行过程主要包括如下几个步骤:
具体作业执行过程的流程图如下图所示:
在MR的代码中调用waitForCompletion()
方法,里面封装了Job.submit()
方法,而Job.submit()
方法里面会创建一个JobSubmmiter对象。当我们在waitForCompletion(true)
时,则waitForCompletion
方法会每秒轮询作业的执行进度,如果发现与上次查询到的状态有差别,则将详情打印到控制台。如果作业执行成功,就显示作业计数器,否则将导致作业失败的记录输出到控制台。
1.当资源管理器通过方法submitApplication方法被调用后,便将请求传给了yarn的调度器,然后调度器在一个节点管理器上分配一个容器(container0)用来启动application master(主类是MRAppMaster)进程。该进程一旦启动就会向resourcemanager注册并报告自己的信息,application master并且可以监控map和reduce的运行状态。因此application master对作业的初始化是通过创建多个薄记对象以保持对作业进度的跟踪。
2.application master接收作业提交时的hdfs临时共享目录中的资源文件,jar,分片信息,配置信息等。并对每一个分片创建一个map对象,以及通过mapreduce.job.reduces参数(作业通过setNumReduceTasks()方法设定)确定reduce的数量。
3.application master会判断是否使用uber(作业与application master在同一个jvm运行,也就是maptask和reducetask运行在同一个节点上)模式运行作业,uber模式运行条件:map数量小于10个,1个reduce,且输入数据小于一个hdfs块。
可以通过参数:
mapreduce.job.ubertask.enable #是否启用uber模式
mapreduce.job.ubertask.maxmaps #ubertask的最大map数
mapreduce.job.ubertask.maxreduces #ubertask的最大reduce数
mapreduce.job.ubertask.maxbytes #ubertask最大作业大小
4.application master调用setupJob方法设置OutputCommiter,FileOutputCommiter为默认值,表示建立做的最终输出目录和任务输出的临时工作空间。
更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
1.在application master判断作业不符合uber模式的情况下,那么application master则会向资源管理器为map和reduce任务申请资源容器。
2.首先就是为map任务发出资源申请请求,直到有5%的map任务完成时,才会为reduce任务所需资源申请发出请求。
3.在任务的分配过程中,reduce任务可以在任何的datanode节点运行,但是map任务执行的时候需要考虑到数据本地化的机制,在给任务指定资源的时候每个map和reduce默认为1G内存,可以通过如下参数配置:
mapreduce.map.memory.mb
mapreduce.map.cpu.vcores
mapreduce.reduce.memory.mb
mapreduce.reduce.cpu.vcores
application master提交申请后,资源管理器为其按需分配资源,这时,application master就与节点管理器通信来启动容器。该任务由主类YarnChild的一个java应用程序执行。在运行任务之前,首先将所需的资源进行本地化,包括作业的配置,jar文件等。接下来就是运行map和reduce任务。YarnChild在单独的JVM中运行。
每个作业和它的每个任务都有一个状态:作业或者任务的状态(运行中,成功,失败等),map和reduce的进度,作业计数器的值,状态消息或描述当作业处于正在运行中的时候,客户端可以直接与application master通信,每秒(可以通过参数mapreduce.client.progressmonitor.pollinterval设置)轮询作业的执行状态,进度等信息。
mapreduce确保每个reduce的输入都是按照键值排序的,系统执行排序,将map的输入作为reduce的输入过程称之为shuffle过程。shuffle也是我们优化的重点部分。shuffle流程图如下图所示:
小于分片大小*1.1
)生成一个map作业,然后通过自定的map方法进行自定义的逻辑计算,计算完毕后会写到本地磁盘。mapreduce.map.sort.spill.percent
配置项修改),将启动一个溢写线程将内存缓冲区的内容溢写到磁盘(spill to disk),这个溢写线程是独立的,不影响map向缓冲区写结果的线程,在溢写到磁盘的过程中,map继续输入到缓冲中,如果期间缓冲区被填满,则map写会被阻塞到溢写磁盘过程完成。溢写是通过轮询的方式将缓冲区中的内存写入到本地mapreduce.cluster.local.dir
目录下。在溢写到磁盘之前,我们会知道reduce的数量,然后会根据reduce的数量划分分区,默认根据hashpartition对溢写的数据写入到相对应的分区。在每个分区中,后台线程会根据key进行排序,所以溢写到磁盘的文件是分区且排序的。如果有combiner函数,它在排序后的输出运行,使得map输出更紧凑。减少写到磁盘的数据和传输给reduce的数据。mapreduce.task.io.sort.factor
控制每次可以合并多少个文件。mapreduce.map.output.compress.codec
参数控制。mapreduce.reduce.shuffle.parallelcopies
控制。mapreduce.reduce.shuffle.input.buffer.percent
指定。一旦Reducer所在节点的内存缓冲区达到阀值,或者缓冲区中的文件数达到阀值,则合并溢写到磁盘。更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
Wordcount,即统计一大批文件中每个单词出现的次数,经常被拿来当做MapReduce入门案例。主要处理流程如下:
MapReduce将作业的整个运行过程分为两个阶段: Map(映射)阶段和Reduce(归约)阶段。
Mapper负责“分”,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:
Map 阶段由一定数量的 Map Task 组成,包含如下几个步骤:
Reducer负责对map阶段的结果进行汇总。
Reduce阶段由一定数量的Reduce Task组成,包含如下几个步骤:
以Wordcount为例,MapReduce的内部执行过程如下图所示:
外部物理结构如下图所示:
Combiner可以看做是 local reducer,在Mapper计算完成后将相同的key对应的value进行合并( Wordcount例子),如下图所示:
Combiner通常与Reducer逻辑是一样的,使用Combiner有如下好处:
需要注意的是,并不是所有的MapReduce场景都能够使用Combiner,计算结果可以累加的场景一般可以使用,例如Sum,其他的例如求平均值 Average 则不能使用 Combiner。
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>D:\software\hadoop\data\namenode</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>D:\software\hadoop\data\datanode</value>
</property>
<!-- mapred-site.xml-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
<value>org.apache.hadoop.mapred.ShuffleHandler</value>
</property>
set JAVA_HOME=C:\PROGRA~1\Java\jdk1.8.0_271
set HADOOP_LOG_DIR=%HADOOP_LOG_DIR%\log
单个MapReduce分为:Mapper、Reducer 和 Driver。 也可以多个MapReduce串行执行
1.Mapper阶段
(1)用户自定义的Mapper要继承自己的父类
(2)Mapper的输入数据是KV对的形式(KV的类型可自定义)
(3)Mapper中的业务逻辑写在map()方法中
(4)Mapper的输出数据是KV对的形式(KV的类型可自定义)
(5)map()方法(MapTask进程)对每一个<K,V>调用一次
2.Reducer阶段
(2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV
(3)Reducer的业务逻辑写在reduce()方法中
(4)ReduceTask进程对每一组相同k的<k,v>组调用一次reduce()方法
3.Driver阶段
相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是
封装了MapReduce程序相关运行参数的job对象
案例说明:读取一个文件,按空格对文件内容分词,最终按单词排序输出每个单词出现的次数。
<!-- MAVEN包-->
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>${hadoop.version}</version>
</dependency>
//2.编写好mapreduce代码本地测试(两种常见方式)
//3.上传jar包到服务器运行
hadoop jar apache-hadoop-1.0-SNAPSHOT.jar com.wxl.hadoop.mapReduce.wordCount.WordCountDriver /mapreduce/input/word.txt /mapreduce/output
package com.wxl.hadoop.mapReduce.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.util.Date;
public class WordCountDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
///本地测试,正式环境需要注释掉//
Date date = new Date();//保证输出的目录不重复
args = new String[]{"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\input\\word.txt",
"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\output\\" + date.getTime()};
// 1 获取配置信息以及获取 job 对象
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
// 2 关联本 Driver 程序的 jar
job.setJarByClass(WordCountDriver.class);
// 3 关联 Mapper 和 Reducer 的 jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// 7 提交 job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
}
}
//Map阶段
class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();//分割后的单词
IntWritable v = new IntWritable(1);//计数
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 按空格切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
//排除空值
if (word.trim() == "" || word.length() == 0) {
continue;
}
System.out.println("map输出>>>" + word);
// 设置输出的key为切割的单词
k.set(word);
// 按单词和计数输出
context.write(k, v);
}
}
}
//Reducer
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
// 得到最终的结果
context.write(key, v);
}
}
package com.wxl.hadoop.mapReduce.wordCount;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import java.io.IOException;
import java.util.Date;
public class WordCountMapReduce extends Configured implements Tool {
public static void main(String[] args) throws Exception {
///本地测试,正式环境需要注释掉//
Date date = new Date();//保证输出的目录不重复
args = new String[]{"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\input\\word.txt",
"D:\\ideawork\\bigdata\\apache-hadoop\\src\\main\\resources\\mapreduce\\output\\" + date.getTime()};
// run job
int status = ToolRunner.run(new WordCountMapReduce(), args);
// exit program
System.exit(status);
}
// Driver
public int run(String[] args) throws Exception {
// 1 获取配置信息以及获取 job 对象
Configuration conf = super.getConf();
//设置job名
Job job = Job.getInstance(conf, this.getClass().getSimpleName());
// 2 关联本 Driver 程序的 jar
job.setJarByClass(this.getClass());
// 3 关联 Mapper 和 Reducer 的 jar
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 4 设置 Mapper 输出的 kv 类型
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 5 设置最终输出 kv 类型
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 6 设置输入和输出路径
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
// submit job
boolean status = job.waitForCompletion(true);
return status ? 0 : 1;
}
//Map阶段
public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
Text k = new Text();//分割后的单词
IntWritable v = new IntWritable(1);//计数
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// 1 获取一行
String line = value.toString();
// 2 按空格切割
String[] words = line.split(" ");
// 3 输出
for (String word : words) {
//排除空值
if (word.trim() == "" || word.length() == 0) {
continue;
}
System.out.println("map输出>>>" + word);
// 设置输出的key为切割的单词
k.set(word);
// 按单词和计数输出
context.write(k, v);
}
}
}
//Reducer
public static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
int sum;
IntWritable v = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
// 1 累加求和
sum = 0;
for (IntWritable count : values) {
sum += count.get();
}
// 2 输出
v.set(sum);
// 得到最终的结果
context.write(key, v);
}
}
}
更多关于大数据 Hadoop系列的学习文章,请参阅:进击大数据系列,本系列持续更新中。
MapReduce向用户提供了简单的编程接口,由框架层自动完成数据分布存储、数据通信、容错处理等复杂的底层处理细节,用户只需要使用接口实现自己的数据处理逻辑即可。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一 样的,就是因为这个特点使得 MapReduce 编程变得非常流行。
当你的计算资源不能得到满足的时候,可以通过简单的增加机器来扩展它的计算能力。
MapReduce 设计的初衷就是使程序能够部署在廉价的 PC 机器上,这就要求它具有很高 的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行, 不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由 Hadoop 内部完成的。
适合 PB 级以上海量数据的离线处理,可以实现上千台服务器集群并发工作,提供数据处理能力。
MapReduce 无法像MySQL一样,在毫秒或者秒级内返回结果。
流式计算的输入数据是动态的,而 MapReduce 的输入数据集是静态的,不能动态变化。 这是因为 MapReduce 自身的设计特点决定了数据源必须是静态的。
多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下, MapReduce 并不是不能做,而是使用后,每个 MapReduce 作业的输出结果都会写入到磁盘,会造成大量的磁盘 IO,导致性能非常的低下。
参考文章:https://www.jianshu.com/p/a61fd904e2c5 cnblogs.com/liugp/p/16101242.html cnblogs.com/ttzzyy/p/12323259.html blog.csdn.net/qq_38414334/article/details/12047614