
6.824 Lab 1 Summary

Preface

Lab 1 implements a MapReduce. I wrote this up in markdown quite a while ago but never got around to posting it to the blog.

RPC Definitions

```go
type HeartbeatArgs struct {
}

type HeartbeatReply struct {
	Filename string // used in the map phase
	JobType  JobType
	Id       int // carries the mapId in the map phase, the reduceId in the reduce phase
	NReduce  int // used in the map phase
	NMap     int // used in the reduce phase
}

type JobFinishArgs struct {
	Id       int
	JobPhase CurrentPhase
}

type JobFinishReply struct {
}
```

Task Structure

The Coordinator maintains a single task list: initially it holds the map tasks, and once the map phase finishes, the list is replaced with the reduce tasks.

```go
type Task struct {
	taskId     int // mapId in the map phase, reduceId in the reduce phase
	startTime  time.Time
	taskStatus TaskStatus
	fileName   string
	jobType    JobType
}
```

Coordinator Structure

```go
type Coordinator struct {
	// Your definitions here.
	nMap         int          // number of input files
	nReduce      int          // passed in as a parameter
	files        []string     // files to process; handed to workers via the reply in the map phase
	currentPhase CurrentPhase // whether we are in the map phase, the reduce phase, or all done
	tasks        []Task

	mu sync.Mutex
}
```

Coordinator initialization:

```go
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		mu:      sync.Mutex{},
		files:   files,
		nMap:    len(files),
		nReduce: nReduce,
	}
	c.currentPhase = MapPhase
	c.tasks = make([]Task, c.nMap)
	for index, file := range c.files {
		c.tasks[index] = Task{
			fileName:   file,
			taskId:     index,
			taskStatus: Idle,
		}
	}

	c.server()
	return &c
}
```

RPC Interface Implementation

I don't track per-worker state in the Coordinator. On each worker request, the Coordinator first checks the current phase and the completion status of its tasks, then scans the task list; if there is an idle task, or one that has timed out without finishing, it assigns that task to the requesting worker.

1. When all reduce tasks are finished, reply with a CompleteJob; the worker returns on receipt and the MapReduce job is over.

2. If every task is currently being processed, check whether any has timed out. If so, reset its timeout and reassign the task to the requesting worker; if not, reply with a WaitJob, and the worker sleeps for a while before asking again.

3. If a task is idle, simply assign it.

```go
func (c *Coordinator) Request(args *HeartbeatArgs, reply *HeartbeatReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch c.currentPhase {
	case AllDonePhase:
		reply.JobType = CompleteJob
	case MapPhase:
		c.selectTask(reply, MapPhase)
	case ReducePhase:
		c.selectTask(reply, ReducePhase)
	}
	// log.Printf("Coordinator: assign a task %v to worker", reply)
	return nil
}
```
```go
func (c *Coordinator) selectTask(reply *HeartbeatReply, phase CurrentPhase) {
	hasNewJob := false
	var taskLen int
	if phase == MapPhase {
		taskLen = len(c.files)
	} else if phase == ReducePhase {
		taskLen = c.nReduce
	}
	for i := 0; i < taskLen; i++ {
		task := &c.tasks[i]
		if task.taskStatus == Idle {
			hasNewJob = true
			task.taskStatus = Processing
			task.startTime = time.Now()

			reply.Id = task.taskId
			reply.NMap = c.nMap
			reply.NReduce = c.nReduce
			reply.JobType = task.jobType
			// In the reduce phase the worker assembles the mr-X-Y filenames
			// itself, so this field is only meaningful in the map phase.
			reply.Filename = task.fileName
			break
		} else if task.taskStatus == Processing {
			// tasks[i] has timed out without finishing; hand it to another worker
			if time.Since(task.startTime) > time.Second*10 {
				hasNewJob = true

				reply.Id = task.taskId
				reply.NMap = c.nMap
				reply.NReduce = c.nReduce
				reply.JobType = task.jobType

				reply.Filename = task.fileName

				task.startTime = time.Now()
				// log.Printf("Coordinator: a worker timeout, assign this task %v to another worker", reply)
				break
			}
		} else if task.taskStatus == Finished {
			continue
		} else {
			log.Printf("selectTask: unexpected task status")
		}
	}

	if !hasNewJob {
		reply.JobType = WaitJob
	}
}
```

When a worker finishes a job it notifies the Coordinator, which marks the corresponding task as finished and checks whether every task in the current phase is done; if so, it initializes the next phase.

```go
func (c *Coordinator) Report(args *JobFinishArgs, reply *JobFinishReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.currentPhase == AllDonePhase {
		return nil
	}
	if args.JobPhase == c.currentPhase {
		// log.Printf("Coordinator: Worker has completed a %v task %v \n", args.JobPhase, args)
		c.tasks[args.Id].taskStatus = Finished
		if c.IsAllTaskFinish() {
			if c.currentPhase == MapPhase {
				// log.Printf("Coordinator: MapPhase finish, start init ReducePhase")
				c.ReducePhaseInit()
				return nil
			} else if c.currentPhase == ReducePhase {
				// log.Printf("Coordinator: ReducePhase finish, nothing need to do!")
				c.currentPhase = AllDonePhase
				return nil
			} else {
				log.Printf("Report: unexpected phase")
			}
		}
	}
	return nil
}

func (c *Coordinator) IsAllTaskFinish() bool {
	allFinish := true
	for i := 0; i < len(c.tasks); i++ {
		if c.tasks[i].taskStatus != Finished {
			allFinish = false
			break
		}
	}
	return allFinish
}

func (c *Coordinator) ReducePhaseInit() {
	c.currentPhase = ReducePhase
	c.tasks = make([]Task, c.nReduce)
	for i := 0; i < c.nReduce; i++ {
		c.tasks[i].taskId = i
		c.tasks[i].taskStatus = Idle
		c.tasks[i].jobType = ReduceJob
	}
}
```

Worker Implementation

A job on a worker corresponds to one Coordinator task (a task may map to more than one job when some time out, but it's enough that one worker's job eventually finishes and reports successfully).

When a worker receives a reply, it acts according to the JobType. Taking the map phase as an example:

```go
/*
After the worker receives the master's reply, if it got a map job, it opens the
file named in the reply and reads its contents. Taking word count as an example,
it calls mapf on the contents to get a slice of key/value pairs ([]KeyValue) like
  add 1
  apple 1
  bad 1
and then writes the pairs into intermediate files mr-X-Y by hash: X is the mapId
given by the reply, Y is ihash(key) % NReduce, where NReduce is a fixed size set
by the master.
*/
func doMapJob(reply *HeartbeatReply, mapf func(string, string) []KeyValue) {
	fileName := reply.Filename
	content, err := os.ReadFile(fileName)
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
	}
	kva := mapf(fileName, string(content))
	intermediates := make([][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		index := ihash(kv.Key) % reply.NReduce
		intermediates[index] = append(intermediates[index], kv)
	}
	var wg sync.WaitGroup
	for index, intermediate := range intermediates {
		wg.Add(1)
		go func(index int, intermediate []KeyValue) {
			defer wg.Done()
			intermediateFilePath := generateMapResultFileName(reply.Id, index)
			tmpFile, err := os.CreateTemp(".", intermediateFilePath)
			if err != nil {
				log.Fatalf("cannot create tmp file %v", intermediateFilePath)
			}
			enc := json.NewEncoder(tmpFile)
			for _, kv := range intermediate {
				err := enc.Encode(&kv)
				if err != nil {
					log.Fatalf("cannot encode json %v", kv.Key)
				}
			}
			if err := os.Rename(tmpFile.Name(), intermediateFilePath); err != nil {
				log.Fatalf("cannot replace %q with tempfile %q: %v",
					intermediateFilePath, tmpFile.Name(), err)
			}
		}(index, intermediate)
	}
	wg.Wait()
	reportTask(JobFinishArgs{reply.Id, MapPhase})
}
```

This lab's MapReduce runs on a single machine, so all the files live locally and there is no need to track intermediate file names: the reducer just reads the files matching the mr-X-Y pattern when it starts.

```go
func doReduceJob(reply *HeartbeatReply, reducef func(string, []string) string) {
	var kva []KeyValue
	for i := 0; i < reply.NMap; i++ {
		filePath := generateMapResultFileName(i, reply.Id)
		file, err := os.Open(filePath)
		if err != nil {
			log.Fatalf("cannot open %v", filePath)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	results := make(map[string][]string)
	for _, kv := range kva {
		results[kv.Key] = append(results[kv.Key], kv.Value)
	}

	reduceFileName := generateReduceResultFileName(reply.Id)
	tmpFile, err := os.CreateTemp(".", reduceFileName)
	if err != nil {
		log.Fatalf("cannot create tmp file %v", reduceFileName)
	}
	for key, values := range results {
		output := reducef(key, values)
		fmt.Fprintf(tmpFile, "%v %v\n", key, output)
	}
	if err := os.Rename(tmpFile.Name(), reduceFileName); err != nil {
		log.Fatalf("cannot replace %q with tempfile %q: %v",
			reduceFileName, tmpFile.Name(), err)
	}
	reportTask(JobFinishArgs{reply.Id, ReducePhase})
}
```