
《架构师之路:架构设计中的100个知识点》
85.MapReduce架构启示
第一部分:MapReduce究竟解决什么问题。
很多时候,定义清楚问题比解决问题更难。
什么是MapReduce?
它不是一个产品,而是一种解决问题的思路,它有多个工程实现,Google在论文中也给出了它自己的工程架构实现。
MapReduce这个编程模型解决什么问题?
能够用分治法解决的问题,例如:
1. 网页抓取;
2. 日志处理;
3. 索引倒排;
4. 查询请求汇总;
5…
画外音:现实中有许多基于分治的应用需求。
为什么是Google,发明了这个模型?
Google网页抓取,分析,倒排的多个应用场景,当时的技术体系,解决不了Google大数据量高并发量的需求,Google被迫进行技术创新,思考出了这个模型。
画外音:谁痛谁想办法。
为什么MapReduce对“能够用分治法解决的问题”特别有效?
分治法,是将一个大规模的问题,分解成多个小规模的问题(分),多个小规模问题解决,再统筹小问题的解(合),就能够解决大规模的问题。
Google MapReduce为什么能够成功?
Google为了方便用户使用系统,提供给了用户很少的接口,去解决复杂的问题。 1. Map函数接口:处理一个基于key/value(后简称kv)的成对(pair)数据集合,同时也输出基于kv的数据集合; 2. Reduce函数接口:用来合并Map输出的kv数据集合;
画外音:MapReduce系统架构,能在大规模普通PC集群上实现并行处理,和GFS等典型的互联网架构类似。
用户仅仅关注少量接口,不用关心并行、容错、数据分布、负载均衡等细节,又能够解决很多实际的问题,还有这等好事!
能不能举一个例子,说明下MapReduce的Map函数与Reduce函数是如何解决实际问题的?
举例:假设要统计大量文档中单词出现的个数。
Map
输入KV:pair(文档名称,文档内容)
输出KV:pair(单词,1)
画外音:一个单词出现一次,就输出一个1。
Reduce
输入KV:pair(单词,1)
输入KV:pair(单词,总计数)
以下是一段伪代码:Map(list<pair(
foreach(pair in list)
foreach(word in doc_content)
echo pair($word, 1); // 输出list<k,v>
}
画外音:如果有多个Map进程,输入可以是一个pair,不是一个list。
Reduce(list<pair(
map<string,int> result;
foreach(pair in list)
result[word] += count;
foreach($keyin result)
echo pair(key, result[key]); // 输出list<k,v>
}
画外音:即使有多个Reduce进程,输入也是list<pair>,因为它的输入是Map的输出。
最早在单机的体系下计算,输入数据量巨大的时候,处理很慢。如何能够在短时间内完成处理,很容易想到的思路是,将这些计算分布在成百上千的主机上,但此时,会遇到各种复杂的问题,例如:
1. 并行计算
2. 数据分发
3. 错误处理
4. 集群通讯
5…
这些综合到一起,就成为了一个困难的问题,这也是Google MapReduce工程架构要解决的问题。
第二部分:MapReduce的核心优化思路。
为了解决上述场景遇到的各种复杂问题,MapReduce的核心优化思路是:
1. 并行;
2. 先分再合;
下图简述了MR计算“词频统计”的过程。

从左到右四个部分,分别是:
1. 输入文件;
2. 分:M个并行的map计算实例;
3. 合:R个并行的reduce计算实例;
4. 输出结果;
先看最后一步,reduce输出最终结果。

可以看到,R个reduce实例并发进行处理,直接输出最后的计数结果。
实例1输出:(a, 256)(able, 128)(emacs, 1)
实例2输出:(f*ck, 32768) (coding, 65535)
实例3输出:(vim,65535)(x, 16)(zero, 258)
画外音:这就是总结果,可以看到vim比emacs受欢迎很多。
需要理解的是,由于这是业务计算的最终结果,一个单词的计数不会出现在两个实例里。即:如果(a, 256)出现在了实例1的输出里,就一定不会出现在其他实例的输出里。
画外音:否则的话,还需要合并,就不是最终结果了。
再看中间步骤,map到reduce的过程。

可以看到,M个map实例的输出,会作为R个reduce实例的输入。
潜在问题一:每个map都有可能输出(a, 1),而最终结果(a, 256)必须由一个reduce输出,那如何保证每个map输出的同一个key,落到同一个reduce上去呢?
这就是“分区函数”的作用。
什么是分区函数?
分区函数,是使用MapReduce的用户需要实现的,决定map输出的每一个key应当落到哪个reduce上的函数。
画外音:如果用户没有实现,会使用默认分区函数。
以词频统计的应用为例,分区函数可能是:
(1) 以[a-g]开头的key落到第一个reduce实例;
(2) 以[h-n]开头的key落到第二个reduce实例;
(3) 以[o-z]开头的key落到第三个reduce实例;
画外音:有点像数据库水平切分的“范围法”。
分区函数实现要点是什么?
为了保证每一个reduce实例都能够差不多时间结束工作任务,分区函数的实现要点是:尽量负载均衡。
画外音:即数据均匀分摊。
上述词频统计的分区函数,就不是负载均衡的,有些reduce实例处理的单词多,有些reduce处理的单词少,这样就可能出现,所有reduce实例都处理结束,最后等待一个长尾reduce的情况。
对于词频统计,负载更为均衡的分区函数为:
hash(key) % 3
画外音:有点像数据库水平切分的“哈希法”。
潜在问题二:每个map都有可能输出多个(a, 1),这样无形中增大了网络带宽资源,以及reduce的计算资源,有没有办法进行优化呢?
这就是“合并函数”的作用。
什么是合并函数?
有时,map产生的中间key的重复数据比重很大,可以提供给用户一个自定义函数,在一个map实例完成工作后,本地就做一次合并,这样网络传输与reduce计算资源都能节省很多。
合并函数在每个map任务结束前都会执行一次,一般来说,合并函数与reduce函数是一样的,区别是:
1. 合并函数执行map实例本地数据合并;
2. reduce函数执行最终的合并,会收集多个map实例的数据;
对于词频统计应用,合并函数可以将:
一个map实例的多个(a, 1)合并成一个(a, $count)输出。
最后看第一个个步骤,输入文件到map的过程。

潜在问题三:如何确定文件到map的输入呢?
随意即可,只要负载均衡,均匀切分输入文件大小就行,不用管分到哪个map实例。
画外音:无论分到那个map都能正确处理。
结论,Google MapReduce实施了一系列的优化:
1. 分区函数:保证不同map输出的相同key,落到同一个reduce里;
2. 合并函数:在map结束时,对相同key的多个输出做本地合并,节省总体资源;
3. 输入文件到map如何切分:随意,切分均匀就行;
第三部分:MapReduce的工程架构实践。
上述优化后的执行流程,Google MapReduce通过怎样的工程架构实现的呢?

先看下总体架构图,有个直观的印象。
用户使用GoogleMR系统,必须输入的是什么?
1. 输入数据,必选
画外音:否则系统处理啥。
2. map函数,必选
3. reduce函数,必选
画外音:分治法,分与合的业务逻辑。
4. 分区函数,必选
画外音:保证同一个key,在合并阶段,必须落到同一个reduce上,系统提供默认hash(key)法。
5. 合并函数,可选
画外音:看用户是否需要在map结束阶段进行优化。
用户提供各个输入后,GoogleMR的执行流程是什么?
画外音:不妨假设,用户设置了M个map节点,R个reduce节点;例如:M=500,R=200。
(1) 在集群中创建大量可执行实例副本(fork);
(2) 这些副本中有一个master,其他均为worker,任务的分配由master完成, M个map实例和R个reduce实例由worker完成;
(3) 将输入数据分成M份,然后被分配到map任务的worker,从其中一份读取输入数据,执行用户的map函数处理,并在本地内存生成临时数据;
(4) 本地内存临时数据,通过分区函数,被分成R份,周期性的写到本地磁盘,由master调度,传给被分配到reduce任务的worker;
(5) 负责reduce任务的worker,从远程读取多个map输出的数据,执行用户的reduce函数处理,处理结果写入输出文件;
画外音:可能对key要进行外部排序。
(6) 所有map和reduce的worker都结束工作后,master唤醒用户程序,MapReduce调用返回,结果被输出到了R个文件中。
GoogleMR系统里的master和worker是啥?
(1) master:单点master会存储一些元数据,监控所有map与reduce的状态,记录哪个数据要给哪个map,哪个数据要给哪个reduce,掌控全局视野,做中控;
画外音:是不是和GFS的master非常像?
(2) worker:多个worker进行业务逻辑处理,具体一个worker是用来执行map还是reduce,是由master调度的;
画外音:是不是和工作线程池非常像?这里的worker是分布在多台机器上的而已。
master的高可用是如何保证的?
一个简单的方法是,将元数据固化到磁盘上,用一个shadow-master来做高可用。
画外音:GFS不就是这么干的么?
然而现实情况是:没有将元数据固化到磁盘上,元数据被存放在master的内存里用以提高工作效率,当master挂掉后,通知用户“任务执行失败”,让其选择重新执行。
画外音:
(1) 单点master,掌控全局视野,能让系统的复杂性降低非常多;
(2) master挂掉的概率很小;
(3) 不做高可用,能让系统的复杂性降低非常多;
worker的高可用是如何保证的?
master会周期性的ping每个worker,如果超时未返回,master会把对应的worker置为无效,把这个worker的工作任务重新执行:
1. 如果重新执行的是reduce任务,不需要有额外的通知;
2. 如果重新执行的是map任务,需要通知执行reduce的worker节点,输入数据换了一个worker;
随时都可能有map或者reduce挂掉,任务完成前重新被执行,会不会影响MR的最终结果?
在用户输入不变的情况下,MR的输出一定是不变的,这就要求MR系统必须具备幂等性:
1. 对相同的输入,不管哪个负责map的worker执行的结果,一定是不变的,产出的R个本地输出文件内容也一定是不变的;
2. 对于M个map,每个map输出的R个本地文件,只要这些输入不变,对应接收这些数据的reduce的worker执行结果,一定是不变的,输出文件内容也一定是不变的;
长尾效应怎么解决?
一个MR执行时间的最大短板,往往是“长尾worker”。
导致“长尾worker”的原因有很多:
(1) 用户的分区函数设计得不合理,导致某些reduce负载不均,要处理大量的数据;
画外音:
最坏的情况,所有数据最终都落到一个reduce上,分布式并行处理,转变为了单机串行处理;
所以,分区函数的负载均衡性,是用户需要考虑的。
(2) 因为系统的原因,worker所在的机器磁盘坏了,CPU有问题,也可能导致任务执行很慢;
GoogleMR有一个“备用worker”的机制,当某些worker的执行时间超出预期时,会启动另一个worker执行相同的任务,以尝试解决长尾效应。
总结
Google MapReduce架构,体现了很多经典架构实践:
1. 单点master简化系统复杂度;
2. 单点master不高可用,简化系统复杂度;
3. master对worker的监控以及重启,保证worker高可用;
4. 幂等性,保证结果的正确性;
5. 多个worker执行同一个任务优化长尾问题;
参考:《GFS架构启示(第84讲)》
知其然,知其所以然。
思路比结论更重要。
==全文完==
创业ing,一年至少50场活动,欢迎大家加入。