6.824-Lab1

前言

Lab1需要实现一个MapReduce模型,MapReduce是一个分布式的计算框架。一些常见的计算任务可抽象为Map和Reduce两个阶段,如单词计数、倒排索引等,这些任务可以基于MapReduce实现分布式的计算。

对于应用程序员来说,只需要实现好Map和Reduce函数就可以基于MapReduce进行高性能的分布式计算。而Lab1要求我们从MapReduce设计者的角度来思考,实现一个简易版本的MapReduce,即如何将一个可拆分为Map和Reduce两阶段的计算任务分发到不同的物理机器上完成。

思路

首先阅读MapReduce论文,可将Master的任务分配分为MapReduce两个阶段,根据初始计算数据划分为X个Map任务和Y个Reduce任务(Y由用户程序指定)。

完成所有Map任务后即可转换为Reduce阶段。而Master和Worker通过rpc通信,则Master可以在每次Worker请求任务时分配Map任务,并为每个任务启动一个计时器(论文中给出的示例是10s),超时则将已分配的任务放回任务channel中,可再次分配。Worker每完成一个任务都需要通知Master。

对于Map任务需要注意将计算的中间结果保存至本地临时文件(命名格式mr-X-Y),然后由Reduce任务读取临时文件作为输入并将最终计算结果保存为mr-out-Y。

当Master发现所有任务完成后即可退出。

RPC

rpc为单向通信,客户端Worker 传入函数名及参数通过rpc远程调用服务端Master 的方法

1
2
3
// Worker通过rpc调用Master.DispatchTask(req,reply)
// Master执行DispatchTask为调用者Worker分配任务
call("Master.DispatchTask", &req, &reply)

rpc请求、响应数据类型如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 常量值
const (
ASK_TASK = iota
WAIT_TASK
MAP_TASK
REDUCE_TASK
MAP_PHASE
REDUCE_PHASE
MAP_TASK_FIN
REDUCE_TASK_FIN
)

type Request struct {
//
// ID为请求类型,值为自定义常量
// Master通过常量值来了解请求类型
// 如ASK_TASK(请求分配任务)
//
ID int
// Map任务ID
MapID int
// Reduce任务ID
ReduceID int
}

type Reply struct {
//
// ID为响应类型,值为自定义常量
// Worker通过常量值来了解响应类型
// 如MAP_TASK(执行Map)、RECDUCE_TASK(执行Reduce)、WAIT_TASK(等待)
//
ID int
MapTask
ReduceTask
}
// Map任务
type MapTask struct {
FileName string
// Map任务ID
MapID int
// 用户指定,Map任务通过ihash(key) % nReduce进行分区
NReduce int
}
// Reduce任务
type ReduceTask struct {
// lab1在本地直接读取文件,故该字段用不上
FileName string
// Reduce任务ID
ReduceID int
MapNum int
}

Master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
type Master struct {
// Map处理文件集合
FilesName []string
NReduce int
// Map或Reduce任务总数量
MapNum int // == len(FilesName)
ReduceNum int // == nReduce
// 标识不同任务阶段:MAP_PHASE或REDUCE_PHASE
Phase int
// 初始化Master时记得通过make为所有channel分配容量
// 初始化Master时就应将所有任务划分好并分类放入TaskChan
MapTaskChan chan MapTask
ReduceTaskChan chan ReduceTask
// 可根据以下两个FinChan的长度判断是否已完成所有任务
MapFinChan chan int
ReduceFinChan chan int
// 接受到值则轮询Done()时返回true
ExitChan chan bool
// 计时器,为每个已分配Task计时
// 超过10秒未完成则视为Task分配失效,可重新分配
Timers map[int]*time.Timer
// 加锁,实现对Master数据的安全访问
mutex sync.Mutex
}
  • Master通过请求参数的ID执行不同处理逻辑
  • 通过 len(MapFinChan) 判断所有Map任务完成后,Master通过更改Phase字段转换为分配Reduce任务
  • 通过 len(ReduceFinChan) 判断所有Reduce任务完成后,向ExitChan传入true,结束Master进程
  • 可将每个输入文件划分为一个Map任务,则共有 len(files) 个任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
func MakeMaster(files []string, nReduce int) *Master {
m := Master{
FilesName: files,
NReduce: nReduce,
MapNum: len(files),
ReduceNum: nReduce,
Phase: MAP_PHASE,
MapTaskChan: make(chan MapTask, len(files)),
MapFinChan: make(chan int, len(files)),
ReduceTaskChan: make(chan ReduceTask, nReduce),
ReduceFinChan: make(chan int, nReduce),
Timers: make(map[int]*time.Timer),
ExitChan: make(chan bool, 1),
}
// Your code here.
// 根据输入文件数量切分Map任务
for i, file := range files {
task := MapTask{
FileName: file,
MapID: i,
NReduce: nReduce,
}
m.MapTaskChan <- task
}
// 用户指定Reduce输出文件数量
for i := 0; i < nReduce; i++ {
task := ReduceTask{
ReduceID: i,
MapNum: m.MapNum,
}
m.ReduceTaskChan <- task
}

m.server()
return &m
}

crash test

引入计时器

通过维护一个从MapID或ReduceID到*time.Timer的映射,Master每接收一个任务分配请求并分配任务后,就为该任务启动一个计时器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 任务执行计时,每一个任务请求对应启动一个计时器
func (m *Master) StartTimer(reply *Reply) {
// 启动一个计时器
timer := time.NewTimer(10 * time.Second)
switch reply.ID {
case MAP_TASK:
// 建立MapID到计时器的映射
m.mutex.Lock()
m.Timers[reply.MapID] = timer
m.mutex.Unlock()
select {
// 超时
case <-timer.C:
m.mutex.Lock()
m.MapTaskChan <- reply.MapTask // 将失效任务放回,重新分配
m.mutex.Unlock()
}
case REDUCE_TASK:
// 建立ReduceID到计时器的映射
m.mutex.Lock()
m.Timers[reply.ReduceID] = timer
m.mutex.Unlock()
select {
// 超时
case <-timer.C:
m.mutex.Lock()
m.ReduceTaskChan <- reply.ReduceTask // 将失效任务放回,重新分配
m.mutex.Unlock()
}
}
}

完成任务后,由Worker通过rpc调用TaskFin()通知Master

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
func (m *Master) TaskFin(args *Request, reply *Reply) error {
m.mutex.Lock()
defer m.mutex.Unlock()

switch args.ID {
case MAP_TASK_FIN:
// Stop()返回true,停止成功则说明计时器未过期
if m.Timers[args.MapID].Stop() {
m.MapFinChan <- args.MapID // Map任务完成
}
case REDUCE_TASK_FIN:
// Stop()返回true,停止成功则说明计时器未过期
if m.Timers[args.ReduceID].Stop() {
m.ReduceFinChan <- args.ReduceID // Reduce任务完成
}
}

return nil
}

原子化命名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// x号bucket写入缓存 mr-MapID-x
for x, kva := range bucket {
oname := "mr-" + strconv.Itoa(reply.MapTask.MapID) + "-" + strconv.Itoa(x)
// 创建临时文件
tempfile, err := ioutil.TempFile("", "mr-tmp-*")
if err != nil {
log.Fatalf("cannot create temp file %v", err)
}
enc := json.NewEncoder(tempfile)
for _, kv := range kva {
if err := enc.Encode(&kv); err != nil {
fmt.Println(err)
tempfile.Close()
os.Remove(tempfile.Name())
continue
}
}
// 关闭临时文件
if err := tempfile.Close(); err != nil {
log.Fatalf("cannot close %v", tempfile.Name())
}
// 原子化重命名临时文件
if err := os.Rename(tempfile.Name(), oname); err != nil {
log.Fatalf("cannot rename temp file %v to %v: %v", tempfile.Name(), oname, err)
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
# 引入Map、Reduce函数
go build -buildmode=plugin ../mrapps/wc.go
# 清除上次运行结果
rm mr-out*
# 启动Master
go run mrmaster.go pg-*.txt
# 启动Worker
go run mrworker.go wc.so
# 查看结果
cat mr-out-* | sort | more

# 脚本测试
bash test-mr.sh

通过所有测试!!!

image-20240122204901475