MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。 MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。
MapReduce程序执行流程:
MapReduce1.x原理图.png
解析:
2.1 JobTracker:JT
作业的管理者 将作业分解成一堆的任务:Task(MapTask和ReduceTask) 将任务分派给TaskTrance运行 将任务分派给TaskTracker运行 作业的监控,容错处理(task作业挂了,重启task机制) 在一定时间间隔内,JT没有收到TT的心跳信息,TT可能是挂了,TT上运行的任务会被指派到其他的TT上去执行。
2.2 TaskTracker:TT
任务的执行者(干活的) 在TT上执行我们的Task(MapTask和ReduceTask) 会与JT进行交互:执行/启动/停止作业,发送心跳信息给JT
2.3 MapTask
自己开发的Map任务交由该Task出来,解析每条记录的数据,交给自己的map方法处理将map的输出结果写到本地磁盘(有些作业只有map没有reduce
2.4 ReduceTask
将Map Task输出的数据进行读取,按照数据进行分组传给我们自己编写的reduce方法处理,输出结果写出到hdfs
MapReduce2.x原理图.png
map过程:
1、map读取输入文件内容,按行解析成key1、value1键值对,key为每行首字母在文件中的偏移量,value为行的内容,每个键值对调用一次map函数; 2、map根据自己逻辑,对输入的key1、value1处理,转换成新的key2、value2输出; 3、对输出的key2、value2进行分区; 4、对不同分区的数据,按照key2进行排序、分组,相同的key2的value放到一个集合中(中间进行复杂的shuffle过程); 5、分组后的数据进行规约;
reduce过程:
1、对多个map任务的输出,按照不同的分区,通过网络copy到不同的reduce节点; 2、对多个map任务的输出进行Merge(合并、排序),根据reduce自己的任务逻辑对输入的key2、value2处理,转换成新的key3、value3输出; 3、把reduce的输出保存到hdfs上;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
* 用MapReduce开发一个wordcount
*/
public class WordCountApp {
//ctrl看Mapper源码KEYIN, VALUEIN, KEYOUT, VALUEOUT
/**
* map读取输入数据
*/
public static class MyMapper extends Mapper<LongWritable,Text,Text,LongWritable>{//这里的参数前两个为输入,后两个为输出
LongWritable one = new LongWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//接收到的每一行数据
String line = value.toString();
//按规则拆分
String[] words = line.split("\t");
for (String word : words) {
context.write(new Text(word),one);
}
}
}
/**
* 归并处理数据
*/
public static class MyReducer extends Reducer<Text,LongWritable,Text,LongWritable>{
//Iterable指key的value值,也就是说如果key出现3次,那么就会key对应的values就有多个了
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
long sum = 0;
for (LongWritable value :
values) {
//求key出现的总和
sum+=value.get();
}
//最终统计结果的输出
context.write(key,new LongWritable(sum));
}
}
public static void main(String[] args) throws Exception {
//创建configuration
Configuration configuration = new Configuration();
//判断输出文件夹或者文件是否已经存在
Path outputPath = new Path(args[1]);
FileSystem fileSystem = FileSystem.get(configuration);
if (fileSystem.exists(outputPath)){
fileSystem.delete(outputPath,true);
System.out.println("output path is exist ,but it is deleted!");
}
//创建job
Job job = Job.getInstance(configuration, "wordcount");
//设置job的处理类
job.setJarByClass(WordCountApp.class);
//设置作业处理的输入路径
FileInputFormat.setInputPaths(job,new Path(args[0]));
//设置map相关的参数
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
//设置reduce相关参数
job.setReducerClass(MyReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
//设置作业处理的输出路径
FileOutputFormat.setOutputPath(job,outputPath);
//提交作业
System.exit(job.waitForCompletion(true)?0:1);
}
}
//maven编译
mvn clean package -DskipTests
可以使用xshell软件或者MobaXterm等sftp上传
hadoop jar /root/lib/learnHdfs-1.0-SNAPSHOT.jar com.zero.mapreduce.WordCountApp hdfs://hadoop01:8020/mylove.txt hdfs://hadoop01:8020/hdfsdat/wc/
扫码关注腾讯云开发者
领取腾讯云代金券
Copyright © 2013 - 2025 Tencent Cloud. All Rights Reserved. 腾讯云 版权所有
深圳市腾讯计算机系统有限公司 ICP备案/许可证号:粤B2-20090059 深公网安备号 44030502008569
腾讯云计算(北京)有限责任公司 京ICP证150476号 | 京ICP备11018762号 | 京公网安备号11010802020287
Copyright © 2013 - 2025 Tencent Cloud.
All Rights Reserved. 腾讯云 版权所有