前言
Lab1需要实现一个MapReduce模型,MapReduce是一个分布式的计算框架。一些常见的计算任务可抽象为Map和Reduce两个阶段,如单词计数、倒排索引等,这些任务可以基于MapReduce实现分布式的计算。
对于应用程序员来说,只需要实现好Map和Reduce函数就可以基于MapReduce进行高性能的分布式计算。而Lab1要求我们从MapReduce设计者的角度来思考,实现一个简易版本的MapReduce,即如何将一个可拆分为Map和Reduce两阶段的计算任务分发到不同的物理机器上完成。
思路
首先阅读MapReduce论文,可将Master的任务分配分为Map和Reduce两个阶段,根据初始计算数据划分为X个Map任务和Y个Reduce任务(Y由用户程序指定)。
完成所有Map任务后即可转换为Reduce阶段。而Master和Worker通过rpc通信,则Master可以在每次Worker请求任务时分配Map任务,并为每个任务启动一个计时器(论文中给出的示例是10s),超时则将已分配的任务放回任务channel中,可再次分配。Worker每完成一个任务都需要通知Master。
对于Map任务需要注意将计算的中间结果保存至本地临时文件(命名格式mr-X-Y),然后由Reduce任务读取临时文件作为输入并将最终计算结果保存为mr-out-Y。
当Master发现所有任务完成后即可退出。
RPC
rpc为单向通信,客户端Worker 传入函数名及参数通过rpc远程调用服务端Master 的方法
1 2 3
|
call("Master.DispatchTask", &req, &reply)
|
rpc请求、响应数据类型如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| const ( ASK_TASK = iota WAIT_TASK MAP_TASK REDUCE_TASK MAP_PHASE REDUCE_PHASE MAP_TASK_FIN REDUCE_TASK_FIN )
type Request struct { ID int MapID int ReduceID int }
type Reply struct { ID int MapTask ReduceTask }
type MapTask struct { FileName string MapID int NReduce int }
type ReduceTask struct { FileName string ReduceID int MapNum int }
|
Master
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| type Master struct { FilesName []string NReduce int MapNum int ReduceNum int Phase int MapTaskChan chan MapTask ReduceTaskChan chan ReduceTask MapFinChan chan int ReduceFinChan chan int ExitChan chan bool Timers map[int]*time.Timer mutex sync.Mutex }
|
- Master通过请求参数的ID执行不同处理逻辑
- 通过 len(MapFinChan) 判断所有Map任务完成后,Master通过更改Phase字段转换为分配Reduce任务
- 通过 len(ReduceFinChan) 判断所有Reduce任务完成后,向ExitChan传入true,结束Master进程
- 可将每个输入文件划分为一个Map任务,则共有 len(files) 个任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| func MakeMaster(files []string, nReduce int) *Master { m := Master{ FilesName: files, NReduce: nReduce, MapNum: len(files), ReduceNum: nReduce, Phase: MAP_PHASE, MapTaskChan: make(chan MapTask, len(files)), MapFinChan: make(chan int, len(files)), ReduceTaskChan: make(chan ReduceTask, nReduce), ReduceFinChan: make(chan int, nReduce), Timers: make(map[int]*time.Timer), ExitChan: make(chan bool, 1), } for i, file := range files { task := MapTask{ FileName: file, MapID: i, NReduce: nReduce, } m.MapTaskChan <- task } for i := 0; i < nReduce; i++ { task := ReduceTask{ ReduceID: i, MapNum: m.MapNum, } m.ReduceTaskChan <- task }
m.server() return &m }
|
crash test
引入计时器
通过维护一个从MapID或ReduceID到*time.Timer
的映射,Master每接收一个任务分配请求并分配任务后,就为该任务启动一个计时器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func (m *Master) StartTimer(reply *Reply) { timer := time.NewTimer(10 * time.Second) switch reply.ID { case MAP_TASK: m.mutex.Lock() m.Timers[reply.MapID] = timer m.mutex.Unlock() select { case <-timer.C: m.mutex.Lock() m.MapTaskChan <- reply.MapTask m.mutex.Unlock() } case REDUCE_TASK: m.mutex.Lock() m.Timers[reply.ReduceID] = timer m.mutex.Unlock() select { case <-timer.C: m.mutex.Lock() m.ReduceTaskChan <- reply.ReduceTask m.mutex.Unlock() } } }
|
完成任务后,由Worker通过rpc调用TaskFin()通知Master
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| func (m *Master) TaskFin(args *Request, reply *Reply) error { m.mutex.Lock() defer m.mutex.Unlock()
switch args.ID { case MAP_TASK_FIN: if m.Timers[args.MapID].Stop() { m.MapFinChan <- args.MapID } case REDUCE_TASK_FIN: if m.Timers[args.ReduceID].Stop() { m.ReduceFinChan <- args.ReduceID } }
return nil }
|
原子化命名
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| for x, kva := range bucket { oname := "mr-" + strconv.Itoa(reply.MapTask.MapID) + "-" + strconv.Itoa(x) tempfile, err := ioutil.TempFile("", "mr-tmp-*") if err != nil { log.Fatalf("cannot create temp file %v", err) } enc := json.NewEncoder(tempfile) for _, kv := range kva { if err := enc.Encode(&kv); err != nil { fmt.Println(err) tempfile.Close() os.Remove(tempfile.Name()) continue } } if err := tempfile.Close(); err != nil { log.Fatalf("cannot close %v", tempfile.Name()) } if err := os.Rename(tempfile.Name(), oname); err != nil { log.Fatalf("cannot rename temp file %v to %v: %v", tempfile.Name(), oname, err) } }
|
测试
1 2 3 4 5 6 7 8 9 10 11 12 13
| # 引入Map、Reduce函数 go build -buildmode=plugin ../mrapps/wc.go # 清除上次运行结果 rm mr-out* # 启动Master go run mrmaster.go pg-*.txt # 启动Worker go run mrworker.go wc.so # 查看结果 cat mr-out-* | sort | more
# 脚本测试 bash test-mr.sh
|
通过所有测试!!!