Preface
Lab 1 is about implementing a MapReduce. The markdown has been sitting around for quite a while; I just never got around to uploading it to the blog.
RPC definitions

```go
type HeartbeatArgs struct {
}

type HeartbeatReply struct {
	Filename string
	JobType  JobType
	Id       int
	NReduce  int
	NMap     int
}

type JobFinishArgs struct {
	Id       int
	JobPhase CurrentPhase
}

type JobFinishReply struct {
}
```
Task structure

Coordinator

The Coordinator maintains a task list. Initially it holds the map tasks; once the map phase finishes, the list is replaced with reduce tasks.

```go
type Task struct {
	taskId     int
	startTime  time.Time
	taskStatus TaskStatus
	fileName   string
	jobType    JobType
}
```
Coordinator structure

```go
type Coordinator struct {
	nMap         int
	nReduce      int
	files        []string
	currentPhase CurrentPhase
	tasks        []Task

	mu sync.Mutex
}
```
Coordinator initialization:

```go
func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{
		mu:      sync.Mutex{},
		files:   files,
		nMap:    len(files),
		nReduce: nReduce,
	}
	c.currentPhase = MapPhase
	c.tasks = make([]Task, c.nMap)
	for index, file := range c.files {
		c.tasks[index] = Task{
			fileName:   file,
			taskId:     index,
			taskStatus: Idle,
		}
	}

	c.server()
	return &c
}
```
RPC interface implementation

I do not track per-Worker state in the Coordinator. Whenever the Coordinator receives a request from a Worker, it first determines which phase it is in and checks how far the tasks have progressed. It then walks the task list; when there is an idle task, or one that has timed out without finishing, that task is assigned to the requesting worker.

1. When all reduce tasks are finished, return a CompleteJob; on receiving it, the worker returns and the MapReduce job is over.
2. If every task is currently being processed, check whether any of them has timed out. If one has, reset its start time and hand that task to the requesting worker; if none has, return a WaitJob, and the worker sleeps for a while before sending another request.
3. When an idle task exists, simply assign it.
```go
func (c *Coordinator) Request(args *HeartbeatArgs, reply *HeartbeatReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	switch c.currentPhase {
	case AllDonePhase:
		reply.JobType = CompleteJob
	case MapPhase:
		c.selectTask(reply, MapPhase)
	case ReducePhase:
		c.selectTask(reply, ReducePhase)
	}
	return nil
}
```
```go
func (c *Coordinator) selectTask(reply *HeartbeatReply, phase CurrentPhase) {
	hasNewJob := false
	var taskLen int
	if phase == MapPhase {
		taskLen = len(c.files)
	} else if phase == ReducePhase {
		taskLen = c.nReduce
	}
	for i := 0; i < taskLen; i++ {
		task := &c.tasks[i]
		if task.taskStatus == Idle {
			hasNewJob = true
			task.taskStatus = Processing
			task.startTime = time.Now()

			reply.Id = task.taskId
			reply.NMap = c.nMap
			reply.NReduce = c.nReduce
			reply.JobType = task.jobType
			reply.Filename = task.fileName
			break
		} else if task.taskStatus == Processing {
			// Reassign a task that has been in flight for more than 10s.
			if time.Since(task.startTime) > 10*time.Second {
				hasNewJob = true

				reply.Id = task.taskId
				reply.NMap = c.nMap
				reply.NReduce = c.nReduce
				reply.JobType = task.jobType
				reply.Filename = task.fileName

				task.startTime = time.Now()
				break
			}
		} else if task.taskStatus == Finished {
			continue
		} else {
			log.Printf("selectTask: unexpected task status")
		}
	}
	if !hasNewJob {
		reply.JobType = WaitJob
	}
}
```
When a Worker finishes a job it notifies the Coordinator, which marks the corresponding task as finished and checks whether every task of the current phase is done; if so, it initializes the next phase.

```go
func (c *Coordinator) Report(args *JobFinishArgs, reply *JobFinishReply) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.currentPhase == AllDonePhase {
		return nil
	}
	if args.JobPhase == c.currentPhase {
		c.tasks[args.Id].taskStatus = Finished
		if c.IsAllTaskFinish() {
			if c.currentPhase == MapPhase {
				c.ReducePhaseInit()
			} else if c.currentPhase == ReducePhase {
				c.currentPhase = AllDonePhase
			} else {
				log.Printf("Report: unexpected phase")
			}
		}
	}
	return nil
}

func (c *Coordinator) IsAllTaskFinish() bool {
	for i := 0; i < len(c.tasks); i++ {
		if c.tasks[i].taskStatus != Finished {
			return false
		}
	}
	return true
}

func (c *Coordinator) ReducePhaseInit() {
	c.currentPhase = ReducePhase
	c.tasks = make([]Task, c.nReduce)
	for i := 0; i < c.nReduce; i++ {
		c.tasks[i].taskId = i
		c.tasks[i].taskStatus = Idle
		c.tasks[i].jobType = ReduceJob
	}
}
```
Worker implementation

A job on a Worker corresponds to one task on the Coordinator (several timed-out jobs may end up mapped to the same task, but it is enough that one Worker's job eventually completes and reports successfully).
After receiving a reply, the Worker performs the corresponding action based on JobType. Taking the map phase as an example:
```go
func doMapJob(reply *HeartbeatReply, mapf func(string, string) []KeyValue) {
	fileName := reply.Filename
	content, err := os.ReadFile(fileName)
	if err != nil {
		log.Fatalf("cannot read %v", fileName)
	}
	kva := mapf(fileName, string(content))
	// Partition the intermediate key/value pairs into nReduce buckets.
	intermediates := make([][]KeyValue, reply.NReduce)
	for _, kv := range kva {
		index := ihash(kv.Key) % reply.NReduce
		intermediates[index] = append(intermediates[index], kv)
	}
	var wg sync.WaitGroup
	for index, intermediate := range intermediates {
		wg.Add(1)
		go func(index int, intermediate []KeyValue) {
			defer wg.Done()
			intermediateFilePath := generateMapResultFileName(reply.Id, index)
			// Write to a temp file first and rename it atomically, so a
			// crashed worker never leaves a partially written file behind.
			tmpFile, err := os.CreateTemp(".", intermediateFilePath)
			if err != nil {
				log.Fatalf("cannot create tmp file %v", intermediateFilePath)
			}
			enc := json.NewEncoder(tmpFile)
			for _, kv := range intermediate {
				if err := enc.Encode(&kv); err != nil {
					log.Fatalf("cannot encode json %v", kv.Key)
				}
			}
			tmpFile.Close()
			if err := os.Rename(tmpFile.Name(), intermediateFilePath); err != nil {
				log.Fatalf("cannot replace %q with tempfile %q: %v", intermediateFilePath, tmpFile.Name(), err)
			}
		}(index, intermediate)
	}
	wg.Wait()
	reportTask(JobFinishArgs{reply.Id, MapPhase})
}
```
The MapReduce in this lab runs on a single machine, so all files actually live locally. There is no need to track intermediate file names: a reduce task simply reads the files matching the mr-x-x naming scheme when it starts.
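The generateMapResultFileName and generateReduceResultFileName helpers used by doMapJob and doReduceJob are not shown in this post. A plausible implementation (an assumption on my part, following the mr-X-Y intermediate and mr-out-X output naming conventions from the lab) might be:

```go
package main

import "fmt"

// generateMapResultFileName names the intermediate file produced by map
// task mapId for reduce partition reduceId: mr-<mapId>-<reduceId>.
func generateMapResultFileName(mapId, reduceId int) string {
	return fmt.Sprintf("mr-%d-%d", mapId, reduceId)
}

// generateReduceResultFileName names the final output of reduce task
// reduceId: mr-out-<reduceId>, the name the lab's test script expects.
func generateReduceResultFileName(reduceId int) string {
	return fmt.Sprintf("mr-out-%d", reduceId)
}

func main() {
	fmt.Println(generateMapResultFileName(3, 1)) // mr-3-1
	fmt.Println(generateReduceResultFileName(1)) // mr-out-1
}
```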
```go
func doReduceJob(reply *HeartbeatReply, reducef func(string, []string) string) {
	var kva []KeyValue
	// Read every intermediate file produced for this reduce partition.
	for i := 0; i < reply.NMap; i++ {
		filePath := generateMapResultFileName(i, reply.Id)
		file, err := os.Open(filePath)
		if err != nil {
			log.Fatalf("cannot open %v", filePath)
		}
		dec := json.NewDecoder(file)
		for {
			var kv KeyValue
			if err := dec.Decode(&kv); err != nil {
				break
			}
			kva = append(kva, kv)
		}
		file.Close()
	}
	// Group values by key.
	results := make(map[string][]string)
	for _, kv := range kva {
		results[kv.Key] = append(results[kv.Key], kv.Value)
	}

	reduceFileName := generateReduceResultFileName(reply.Id)
	tmpFile, err := os.CreateTemp(".", reduceFileName)
	if err != nil {
		log.Fatalf("cannot create tmp file %v", reduceFileName)
	}
	for key, values := range results {
		output := reducef(key, values)
		fmt.Fprintf(tmpFile, "%v %v\n", key, output)
	}
	tmpFile.Close()
	if err := os.Rename(tmpFile.Name(), reduceFileName); err != nil {
		log.Fatalf("cannot replace %q with tempfile %q: %v", reduceFileName, tmpFile.Name(), err)
	}
	reportTask(JobFinishArgs{reply.Id, ReducePhase})
}
```