6.824 lab1
目录
- 课程内容
- 基本结构
- 模块合并
- RPC 请求
- 部分思路
- 结果
- 解锁大师
- 感言
- 晒一下我的 coordinator 的struct
写lab1已经是几个月前了,但是感觉值得写一篇博客,记录一点coding的思路
课程内容
lab1 网址:https://pdos.csail.mit.edu/6.824/labs/lab-mr.html
用git clone下来后,通过测试脚本来确定最后的结果
基本结构
主要完成两个模块coordinator 和 worker 的代码
- coordinator:调度器,负责通过RPC请求分配任务、确认任务的完成情况
- worker:实际执行器,负责执行mapf和reducef 两类任务
模块合并
对coordinator和worker来说 map任务和reduce任务都极为一致——需要确定当前是否有任务存在、都需要发送心跳确定worker状态、都需要由coordinator来确认任务是否成功。
因此对coordinator和worker来说这两类任务可以抽象成同一个struct,我在里面加了一个 isMap 的参数来区分二者
RPC 请求
Func | 作用 |
---|---|
initWorker | 从 coordinator处获取 reduce任务的数量(worker在产生中间文件时需要此值)、心跳间隔 等等初始化值 |
allocateTask | allocate 旨在申请任务,获取成功会返回任务信息,不成功会返回 Wait指令,假如已经全部跑完,会返回 Exit 指令 |
heartBeat | 接受worker task 的响应(失败/成功/运行中),相对应更新任务信息,coordinator接收到心跳会重置allocate中的超时定时器 |
部分思路
- coordinator将所有任务id放入chan中,在 allocate中获取任务,获取成功时同步开启一个goroutine执行计时器(超时重新入chan)
- 确认map任务全部成功后才能开始reduce任务,最开始是上锁后遍历map status均为running来确认,后面使用 waitGroup,完成一个就wg.Done,最后触发 ctx 的canal,通过 ctx.Done 来确认map任务是否全部完成
- worker 先将文件放在tmp类文件内,通过心跳与coordinator达成一致以后才会rename文件为正式版文件名
结果
单次运行时间 1m22s,连续运行100次无报错
解锁大师
考虑到锁该unlock却没有的窘境,我在所有的unlock地方都用了defer,函数内部则用匿名函数来处理(好消息是,随着程序的后续优化,匿名函数都被我处理掉了)
taskFailed := func() {task.TasksExecResult.T.Lock()defer task.TasksExecResult.T.Unlock()nowStatus, _ := task.TasksExecResult.nolockGet(taskId)if nowStatus == TaskStatusSuccess { // 已经是running, 不用更新logx.Infof("taskId %d is success, dont try again", taskId)return}task.TasksExecResult.nolockSet(taskId, TaskStatusInit, "")task.TasksChan <- taskId // 写回taskId}
感言
- 被mutex折腾了好久,经常被锁了但是不知道是哪里,打了非常多的log
- early_exit 一直没过,逆向研究了一下 ut 脚本,发现得所有任务都结束以后才能发 Exit 指令,而我是map运行完就给map任务
- 我使用心跳一个RPC完成了 同步运行状态、发送成功结果、coodinator确认该结果等多个目标,我觉得这个RPC的作用非常机智
- 用了 context.Done 和 canal 来确定是否结束,非常优雅
晒一下我的 coordinator 的struct
// 记录map或者reduce的任务状态
type TaskStatus struct {Ctx context.Context // 用于快速查看该类型任务是否全部完成T *sync.MutexStatus []stringTaskUuid []stringWg *sync.WaitGroup // 通过wg.Done 和 wg.Wait 确认是否完成, 之后触发canal
}// 对每个小任务的计时器, 用于心跳超时
type TimeoutCon struct {TickerDuration time.DurationTimer *time.TickerStopChan chan struct{}
}// map或者reduce任务的所有成员
type Task struct {TasksChan chan intTasksExecResult *TaskStatusTimeoutCon []*TimeoutCon
}type Coordinator struct {Context context.ContextCanal context.CancelFuncNReduce int // reduce 任务的数量MapTaskSize int // fileNames 分为多少组 - 对应的 taskId: 0~(SplitSize-1)FilesList [][]string // fileNames 拆分后的内容MapTask *Task // map taskReduceTask *Task // reduce taskHeartbeatDuration time.DurationExitPolicy string // cooperator 退出时才退出 / 所有任务都完成就退出
}