使用的2020 Spring版的教程和Lab,最新的2021版是由Frans Kaashoek主讲,而不是Robert Morris。再加上Frans的口音都比Morris重很多,手写也比较难认,所以没有使用2021的教程。
个人感觉6.824还是值得听一听的。虽然主要部分在论文里面,但是课堂上的讨论也很有价值。刷课的视频我放在B站上,ID: s09g谷歌摸鱼。
Lab1要求用Go实现一个基本的MapReduce框架。需要先读MapReduce论文。如果英文论文读不下来,我也在B站上做了一期MapReduce论文解读的视频。
6.824所有的Lab都要求用Go实现,所以我先花了点时间去学Go.微信阅读有《Head First Go》,知识结构合理,适合零基础,无限卡可以免费读。看完《Head First Go》之后可以快速过一下Go by Example中文版 ,了解一下还有哪些《Head First Go》没有讲到的地方。然后就可以去写Lab1了,有不清楚的地方直接百度。Lab1最终的实现代码我放在了github.com/s09g/mapreduce-go
根据论文第三节,MapReduce的执行流程分这么几步:
启动MapReduce, 将输入文件切分成大小在16-64MB之间的文件。然后在一组多个机器上启动用户程序
其中一个副本将成为master, 余下成为worker. master给worker指定任务(M个map任务,R个reduce任务)。master选择空闲的worker给予map或reduce任务
Map worker 接收切分后的input,执行Map函数,将结果缓存到内存
缓存后的中间结果会周期性的写到本地磁盘,并切分成R份(reducer数量)。R个文件的位置会发送给master, master转发给reducer
Reduce worker 收到中间文件的位置信息,通过RPC读取。读取完先根据中间<k, v>排序,然后按照key分组、合并。
Reduce worker在排序后的数据上迭代,将中间<k, v> 交给reduce 函数处理。最终结果写给对应的output文件(分片)
所有map和reduce任务结束后,master唤醒用户程序
这一部分对论文的3.2节有所改动,相比于原论文有所简化。
论文提到每个(Map或者Reduce)Task有分为idle, in-progress, completed 三种状态。
type MasterTaskStatus int
const (
Idle MasterTaskStatus = iota
InProgress
Completed
)
Master存储这些Task的信息。与论文不同的是,这里我并没有保留worked的ID,因此master不会主动向worker发送心跳检测
type MasterTask struct {
TaskStatus MasterTaskStatus
StartTime time.Time
TaskReference *Task
}
此外Master存储Map任务产生的R个中间文件的信息。
type Master struct {
TaskQueue chan *Task // 等待执行的task
TaskMeta map[int]*MasterTask // 当前所有task的信息
MasterPhase State // Master的阶段
NReduce int
InputFiles []string
Intermediates [][]string // Map任务产生的R个中间文件的信息
}
Map和Reduce的Task应该负责不同的事情,但是在实现代码的过程中发现同一个Task结构完全可以兼顾两个阶段的任务。
type Task struct {
Input string
TaskState State
NReducer int
TaskNumber int
Intermediates []string
Output string
}
此外我将task和master的状态合并成一个State。task和master的状态应该一致。如果在Reduce阶段收到了迟来MapTask结果,应该直接丢弃。
type State int
const (
Map State = iota
Reduce
Exit
Wait
)
1. 启动master
func MakeMaster(files []string, nReduce int) *Master {
m := Master{
TaskQueue: make(chan *Task, max(nReduce, len(files))),
TaskMeta: make(map[int]*MasterTask),
MasterPhase: Map,
NReduce: nReduce,
InputFiles: files,
Intermediates: make([][]string, nReduce),
}
// 切成16MB-64MB的文件
// 创建map任务
m.createMapTask()
// 一个程序成为master,其他成为worker
//这里就是启动master 服务器就行了,
//拥有master代码的就是master,别的发RPC过来的都是worker
m.server()
// 启动一个goroutine 检查超时的任务
go m.catchTimeOut()
return &m
}
2. master监听worker RPC调用,分配任务
// master等待worker调用
func (m *Master) AssignTask(args *ExampleArgs, reply *Task) error {
// assignTask就看看自己queue里面还有没有task
mu.Lock()
defer mu.Unlock()
if len(m.TaskQueue) > 0 {
//有就发出去
*reply = *<-m.TaskQueue
// 记录task的启动时间
m.TaskMeta[reply.TaskNumber].TaskStatus = InProgress
m.TaskMeta[reply.TaskNumber].StartTime = time.Now()
} else if m.MasterPhase == Exit {
*reply = Task{TaskState: Exit}
} else {
// 没有task就让worker 等待
*reply = Task{TaskState: Wait}
}
return nil
}
3. 启动worker
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
// 启动worker
for {
// worker从master获取任务
task := getTask()
// 拿到task之后,根据task的state,map task交给mapper, reduce task交给reducer
// 额外加两个state,让 worker 等待 或者 直接退出
switch task.TaskState {
case Map:
mapper(&task, mapf)
case Reduce:
reducer(&task, reducef)
case Wait:
time.Sleep(5 * time.Second)
case Exit:
return
}
}
}
4. worker向master发送RPC请求任务
func getTask() Task {
// worker从master获取任务
args := ExampleArgs{}
reply := Task{}
call("Master.AssignTask", &args, &reply)
return reply
}
5. worker获得MapTask,交给mapper处理
func mapper(task *Task, mapf func(string, string) []KeyValue) {
//从文件名读取content
content, err := ioutil.ReadFile(task.Input)
if err != nil {
log.Fatal("fail to read file: "+task.Input, err)
}
//将content交给mapf,缓存结果
intermediates := mapf(task.Input, string(content))
//缓存后的结果会写到本地磁盘,并切成R份
//切分方式是根据key做hash
buffer := make([][]KeyValue, task.NReducer)
for _, intermediate := range intermediates {
slot := ihash(intermediate.Key) % task.NReducer
buffer[slot] = append(buffer[slot], intermediate)
}
mapOutput := make([]string, 0)
for i := 0; i < task.NReducer; i++ {
mapOutput = append(mapOutput, writeToLocalFile(task.TaskNumber, i, &buffer[i]))
}
//R个文件的位置发送给master
task.Intermediates = mapOutput
TaskCompleted(task)
}
6. worker任务完成后通知master
func TaskCompleted(task *Task) {
//通过RPC,把task信息发给master
reply := ExampleReply{}
call("Master.TaskCompleted", task, &reply)
}
7. master收到完成后的Task
func (m *Master) TaskCompleted(task *Task, reply *ExampleReply) error {
//更新task状态
mu.Lock()
if task.TaskState != m.MasterPhase || m.TaskMeta[task.TaskNumber].TaskStatus == Completed {
// 因为worker写在同一个文件磁盘上,对于重复的结果要丢弃
return nil
}
m.TaskMeta[task.TaskNumber].TaskStatus = Completed
mu.Unlock()
defer m.processTaskResult(task)
return nil
}
func (m *Master) processTaskResult(task *Task) {
mu.Lock()
defer mu.Unlock()
switch task.TaskState {
case Map:
//收集intermediate信息
for reduceTaskId, filePath := range task.Intermediates {
m.Intermediates[reduceTaskId] = append(m.Intermediates[reduceTaskId], filePath)
}
if m.allTaskDone() {
//获得所以map task后,进入reduce阶段
m.createReduceTask()
m.MasterPhase = Reduce
}
case Reduce:
if m.allTaskDone() {
//获得所以reduce task后,进入exit阶段
m.MasterPhase = Exit
}
}
}
这里使用一个辅助函数判断是否当前阶段所有任务都已经完成
func (m *Master) allTaskDone() bool {
for _, task := range m.TaskMeta {
if task.TaskStatus != Completed {
return false
}
}
return true
}
8. 转入Reduce阶段,worker获得ReduceTask,交给reducer处理
func reducer(task *Task, reducef func(string, []string) string) {
//先从filepath读取intermediate的KeyValue
intermediate := *readFromLocalFile(task.Intermediates)
//根据kv排序
sort.Sort(ByKey(intermediate))
dir, _ := os.Getwd()
tempFile, err := ioutil.TempFile(dir, "mr-tmp-*")
if err != nil {
log.Fatal("Fail to create temp file", err)
}
// 这部分代码修改自mrsequential.go
i := 0
for i < len(intermediate) {
//将相同的key放在一起分组合并
j := i + 1
for j < len(intermediate) && intermediate[j].Key == intermediate[i].Key {
j++
}
values := []string{}
for k := i; k < j; k++ {
values = append(values, intermediate[k].Value)
}
//交给reducef,拿到结果
output := reducef(intermediate[i].Key, values)
//写到对应的output文件
fmt.Fprintf(tempFile, "%v %v\n", intermediate[i].Key, output)
i = j
}
tempFile.Close()
oname := fmt.Sprintf("mr-out-%d", task.TaskNumber)
os.Rename(tempFile.Name(), oname)
task.Output = oname
TaskCompleted(task)
}
9. master确认所有ReduceTask都已经完成,转入Exit阶段,终止所有master和worker goroutine
func (m *Master) Done() bool {
mu.Lock()
defer mu.Unlock()
ret := m.MasterPhase == Exit
return ret
}
10. 上锁
master跟多个worker通信,master的数据是共享的,其中TaskMeta, Phase, Intermediates, TaskQueue
都有读写发生。TaskQueue
使用channel
实现,自己带锁。只有涉及Intermediates, TaskMeta, Phase
的操作需要上锁。PS.写的糙一点,那就是master每个方法都要上锁,master直接变成同步执行。。。
另外go -race并不能检测出所有的datarace。我曾一度任务Intermediates
写操作发生在map阶段,读操作发生在reduce阶段读,逻辑上存在barrier
,所以不会有datarace. 但是后来想到两个write也可能造成datarace,然而Go Race Detector并没有检测出来。
11. carsh处理
test当中有容错的要求,不过只针对worker。mapreduce论文中提到了:
tricky的在于Lab1是在单机上用多进程模拟了多机器,但并不会因为进程终止导致写好的文件丢失,这也是为什么我前面没有按照论文保留workerID.
针对Lab1修改后的容错设计:
func (m *Master) catchTimeOut() {
for {
time.Sleep(5 * time.Second)
mu.Lock()
if m.MasterPhase == Exit {
mu.Unlock()
return
}
for _, masterTask := range m.TaskMeta {
if masterTask.TaskStatus == InProgress && time.Now().Sub(masterTask.StartTime) > 10*time.Second {
m.TaskQueue <- masterTask.TaskReference
masterTask.TaskStatus = Idle
}
}
mu.Unlock()
}
}
if task.TaskState != m.MasterPhase || m.TaskMeta[task.TaskNumber].TaskStatus == Completed {
// 因为worker写在同一个文件磁盘上,对于重复的结果要丢弃
return nil
}
m.TaskMeta[task.TaskNumber].TaskStatus = Completed
开启race detector,执行test-mr.sh
PS. Lab1要求的Go版本是1.13,如果使用的更新的版本,会报出
rpc.Register: method "Done" has 1 input parameters; needs exactly three
可以直接无视