Preface
My memory is not great: most experiences, big or small, leave only a rough impression, and the details fade as time passes. So starting today I will post some notes on study and life here, both to share and to keep a record. I am not much of a writer, and this blog will not have many readers (so thank you sincerely for reading this), but I will try to write carefully all the same.
Since I chose edge intelligence as my graduate research direction, and edge computing grew out of cloud-native ideas, I felt I should spend my first year filling in the fundamentals of distributed systems. MIT6.824 is my first project: this distributed systems course has four widely praised labs, and I am planting a flag here, hoping to finish all of them within one semester and write them up. My code is nothing special, so please read it for fun, and I would love to hear any suggestions for improvement.
What Is MapReduce
The first lab of MIT6.824 is to implement MapReduce, Google's early distributed computing model for processing large-scale data, which gives complex computations scalability and fault tolerance.
Given a job, say counting words or recording where each word occurs, MapReduce splits the work into Map and Reduce tasks. In the Map phase, a Map Worker turns the raw input into key/value pairs and, based on each key, partitions the pairs into separate files destined for different Reduce Workers; this partition-and-store step is the Shuffle. In the Reduce phase, the pairs are merged: all values sharing a key are gathered together to produce the final result. The pseudocode below is the general form of MapReduce.
map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
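To make these shapes concrete, here is a minimal word-count pair in Go with the same mapf/reducef signatures the lab uses; this is a sketch for illustration, not the lab's own wc plugin:

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// KeyValue mirrors the pair type used throughout the lab.
type KeyValue struct {
	Key   string
	Value string
}

// mapf: (filename, contents) -> list of ("word", "1") pairs.
func mapf(filename string, contents string) []KeyValue {
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// reducef: ("word", ["1", "1", ...]) -> the count as a string.
func reducef(key string, values []string) string {
	return fmt.Sprintf("%d", len(values))
}

func main() {
	kva := mapf("doc.txt", "the quick the fox")
	fmt.Println(len(kva))                           // 4 pairs
	fmt.Println(reducef("the", []string{"1", "1"})) // "2"
}
```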
Besides describing how MapReduce works, the original paper proposes several practical optimizations, such as the Combiner Function and Backup Tasks. The paper is here: https://research.google/pubs/pub62/; I will not go into further detail.
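As a taste of one of those optimizations, a word-count combiner pre-aggregates pairs on the map side before the shuffle, shrinking the intermediate data. This is a sketch of the paper's idea and is not part of the lab code:

```go
package main

import "fmt"

type KeyValue struct{ Key, Value string }

// combine merges duplicate keys emitted by one map task, so "the"
// appears once with a summed count instead of as many ("the","1") pairs.
func combine(kva []KeyValue) []KeyValue {
	counts := map[string]int{}
	for _, kv := range kva {
		counts[kv.Key]++ // word count: every value is "1"
	}
	out := []KeyValue{}
	for k, c := range counts {
		out = append(out, KeyValue{Key: k, Value: fmt.Sprint(c)})
	}
	return out
}

func main() {
	kva := []KeyValue{{"the", "1"}, {"fox", "1"}, {"the", "1"}}
	fmt.Println(len(combine(kva))) // 2 combined pairs instead of 3
}
```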
The RPC Layer
The lab uses Golang and simulates different nodes by launching separate processes. The simulated nodes communicate across processes over Unix domain sockets; the details of Golang's RPC library are beyond the scope of this post.
Now consider how the simulated nodes communicate. Workers never need to talk to one another, and for Coordinator/Worker communication it suffices to make the Coordinator the RPC server and implement the Workers' RPC calls: one to claim work, and one to report work done. The interface is defined below: GetTask lets a Worker claim a task from the Coordinator, while MapTaskDone and ReduceTaskDone let a Worker report completion.
type GetTaskArgs struct {
}
type TaskType int
const (
	TaskTypeOff    TaskType = 0
	TaskTypeMap    TaskType = 1
	TaskTypeReduce TaskType = 2
	TaskTypeWait   TaskType = 3
)
type GetTaskReply struct {
JobId string
TaskType TaskType
TaskId string
Content []string
NReduce int64
ReduceTaskIndex int64
}
type MapTaskDoneArgs struct {
MapTaskId string
IntermediateFileNames []string
}
type MapTaskDoneReply struct {
}
type ReduceTaskDoneArgs struct {
ReduceTaskId string
OutputFileName string
}
type ReduceTaskDoneReply struct {
}
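These argument and reply structs travel over Go's net/rpc. The snippet below is a self-contained sketch of that pattern over a Unix domain socket; the lab's real helper differs (it uses rpc.DialHTTP and a per-user socket path), and Coordinator.Ping here is a made-up method just for the demo:

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"os"
)

type PingArgs struct{ Msg string }
type PingReply struct{ Echo string }

// Coordinator is the RPC server type; in the lab it would expose
// GetTask, MapTaskDone and ReduceTaskDone instead of this demo method.
type Coordinator struct{}

func (c *Coordinator) Ping(args *PingArgs, reply *PingReply) error {
	reply.Echo = "pong: " + args.Msg
	return nil
}

// serveAndCall starts an RPC server on a Unix domain socket and issues
// one call against it, mirroring the Coordinator/Worker split.
func serveAndCall(msg string) string {
	sock := "/tmp/mr-rpc-demo.sock"
	os.Remove(sock) // clear a stale socket from a previous run
	server := rpc.NewServer()
	if err := server.Register(&Coordinator{}); err != nil {
		log.Fatal(err)
	}
	l, err := net.Listen("unix", sock)
	if err != nil {
		log.Fatal(err)
	}
	defer l.Close()
	go server.Accept(l) // serve connections in the background

	client, err := rpc.Dial("unix", sock)
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()
	reply := &PingReply{}
	if err := client.Call("Coordinator.Ping", &PingArgs{Msg: msg}, reply); err != nil {
		log.Fatal(err)
	}
	return reply.Echo
}

func main() {
	fmt.Println(serveAndCall("hi")) // prints "pong: hi"
}
```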
Coordinator
Start with the Coordinator struct. MapReduce assigns a Coordinator to each new job, so a unique JobId identifies the job. A state variable tracks which phase the job is in, alongside the state of every MapTask and ReduceTask. Since the struct is modified on behalf of different nodes, every modification must take a lock to prevent data races, so a Mutex is also kept as a struct member.
type Coordinator struct {
JobId string
CoordinatorState CoordinatorState
MapTasks []*MapTask
ReduceTasks []*ReduceTask
OutputFileNames []string
L *sync.Mutex
}
type CoordinatorState int64
const (
	CoordinatorStateMapping  CoordinatorState = 1
	CoordinatorStateReducing CoordinatorState = 2
	CoordinatorStateFinished CoordinatorState = 3
)
The MapTask and ReduceTask structs hold whatever the Coordinator needs. Each again has a state variable for the task's status. The Ids field is a slice rather than a single id to support fault tolerance and Backup Tasks (one id per attempt), which the later code will show. The start time is recorded to detect timed-out tasks.
type MapTaskState int64
const (
	MapTaskStateWaiting  MapTaskState = 1
	MapTaskStateRunning  MapTaskState = 2
	MapTaskStateExpire   MapTaskState = 3
	MapTaskStateFinished MapTaskState = 4
)
type MapTask struct {
MapTaskIds []string
Split string
MapTaskState MapTaskState
StartTime int64
}
type ReduceTaskState int64
const (
	ReduceTaskStateWaiting  ReduceTaskState = 1
	ReduceTaskStateRunning  ReduceTaskState = 2
	ReduceTaskStateExpire   ReduceTaskState = 3
	ReduceTaskStateFinished ReduceTaskState = 4
)
type ReduceTask struct {
ReduceTaskIds []string
ReduceTaskIndex int64
IntermediateFiles []string
OutputFile string
ReduceTaskState ReduceTaskState
StartTime int64
}
When a new job arrives, a Coordinator is created for it and the raw input is divided into splits, one MapTask per split. Both MapTasks and ReduceTasks are fully configured at Coordinator creation time: the number of ReduceTasks equals the user-specified number of output files, one ReduceTask per output file, and each ReduceTask carries an index identifying which intermediate results it will process.
func MakeCoordinator(files []string, nReduce int) *Coordinator {
c := Coordinator{}
c.JobId = uuid.NewString()
c.L = new(sync.Mutex)
mapTasks := make([]*MapTask, len(files))
for i, file := range files {
mapTask := &MapTask{
Split: file,
MapTaskState: MapTaskStateWaiting,
StartTime: time.Now().Unix(),
}
mapTasks[i] = mapTask
}
c.MapTasks = mapTasks
reduceTasks := make([]*ReduceTask, nReduce)
for i := 0; i < nReduce; i++ {
reduceTask := &ReduceTask{
ReduceTaskIndex: int64(i),
ReduceTaskState: ReduceTaskStateWaiting,
}
reduceTasks[i] = reduceTask
}
c.ReduceTasks = reduceTasks
c.CoordinatorState = CoordinatorStateMapping
c.server()
go c.coordinatorMoniter()
return &c
}
MakeCoordinator returns the Coordinator to its caller, which polls c.Done() in a loop to check whether the job has finished.
func (c *Coordinator) Done() bool {
c.L.Lock()
ok := c.CoordinatorState == CoordinatorStateFinished
c.L.Unlock()
return ok
}
Once the Coordinator is created, a monitoring goroutine polls its state, checking whether all tasks have finished and whether any running task has timed out. A timed-out task may end up assigned to more than one Worker, but that causes no trouble: each MapTask attempt writes intermediate files whose names embed its unique task id, so attempts never clobber each other, while a ReduceTask relies on the atomicity of the operating system's rename to guarantee a single final output file. This re-execution scheme is what makes MapReduce fault-tolerant.
func (c *Coordinator) coordinatorMoniter() {
c.mapMoniter()
c.reduceMoniter()
}
var ExpireTime int64 = 3 // seconds a task may run before it is considered lost
func (c *Coordinator) mapMoniter() {
for {
rolling := false
c.L.Lock()
for _, mapTask := range c.MapTasks {
if mapTask.MapTaskState != MapTaskStateFinished {
rolling = true
}
if mapTask.MapTaskState == MapTaskStateRunning &&
time.Now().Unix()-mapTask.StartTime > ExpireTime {
mapTask.MapTaskState = MapTaskStateExpire
}
}
c.L.Unlock()
if !rolling {
fmt.Printf("mapMonitor: map finish.\n")
c.L.Lock()
c.CoordinatorState = CoordinatorStateReducing
c.L.Unlock()
break
}
fmt.Printf("mapMonitor: map still running.\n")
time.Sleep(10 * time.Second)
}
}
func (c *Coordinator) reduceMoniter() {
for {
rolling := false
c.L.Lock()
for _, reduceTask := range c.ReduceTasks {
if reduceTask.ReduceTaskState != ReduceTaskStateFinished {
rolling = true
}
if reduceTask.ReduceTaskState == ReduceTaskStateRunning &&
time.Now().Unix()-reduceTask.StartTime > ExpireTime {
reduceTask.ReduceTaskState = ReduceTaskStateExpire
}
}
c.L.Unlock()
if !rolling {
fmt.Printf("reduceMonitor: reduce finish.\n")
c.L.Lock()
c.CoordinatorState = CoordinatorStateFinished
c.L.Unlock()
break
}
fmt.Printf("reduceMonitor: reduce still running.\n")
time.Sleep(10 * time.Second)
}
}
Next comes the RPC handler through which Workers claim work.
In the Map phase, whenever a Worker requests a task, the Coordinator picks one that is not yet started or has timed out, and appends a fresh id to its MapTaskIds. The Coordinator sends this id to the claiming Worker along with the input data and other fields; the Worker includes the id when it later reports completion, so the Coordinator can record the MapTask's intermediate results in the right place. The Reduce phase uses the same scheme.
Also in the Map phase, the number of ReduceTasks equals the number of intermediate files each MapTask emits, so it is passed to the Worker. In the Reduce phase, ReduceTaskIndex tells the Worker which of every MapTask's intermediate files to consume, so it is passed as well: a ReduceTask with ReduceTaskIndex 0, for example, reads the first intermediate file produced by every MapTask.
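The reason index r can simply read file r from every MapTask is that the partitioning hash is deterministic: every map attempt, on any Worker, sends a given key to the same bucket. A quick demonstration using the same ihash the lab suggests:

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihash(key) % nReduce picks the reduce bucket for a key, exactly as in
// the lab skeleton.
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func main() {
	nReduce := 3
	// the same key always maps to the same bucket, no matter who hashes it
	fmt.Println(ihash("apple")%nReduce == ihash("apple")%nReduce) // true
	for _, k := range []string{"apple", "banana", "cherry"} {
		fmt.Printf("%s -> intermediate file %d of each map task\n", k, ihash(k)%nReduce)
	}
}
```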
func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
c.L.Lock()
defer c.L.Unlock()
if c.CoordinatorState == CoordinatorStateMapping {
for _, mapTask := range c.MapTasks {
if mapTask.MapTaskState == MapTaskStateWaiting ||
mapTask.MapTaskState == MapTaskStateExpire {
reply.JobId = c.JobId
reply.TaskId = uuid.NewString()
reply.TaskType = TaskTypeMap
reply.NReduce = int64(len(c.ReduceTasks))
reply.Content = []string{mapTask.Split}
mapTask.MapTaskIds = append(mapTask.MapTaskIds, reply.TaskId)
mapTask.MapTaskState = MapTaskStateRunning
mapTask.StartTime = time.Now().Unix()
return nil
}
}
reply.TaskType = TaskTypeWait
return nil
} else if c.CoordinatorState == CoordinatorStateReducing {
for _, reduceTask := range c.ReduceTasks {
if reduceTask.ReduceTaskState == ReduceTaskStateWaiting ||
reduceTask.ReduceTaskState == ReduceTaskStateExpire {
reply.JobId = c.JobId
reply.TaskId = uuid.NewString()
reply.TaskType = TaskTypeReduce
reply.Content = reduceTask.IntermediateFiles
reply.ReduceTaskIndex = reduceTask.ReduceTaskIndex
reduceTask.ReduceTaskIds = append(reduceTask.ReduceTaskIds, reply.TaskId)
reduceTask.ReduceTaskState = ReduceTaskStateRunning
reduceTask.StartTime = time.Now().Unix()
return nil
}
}
reply.TaskType = TaskTypeWait
return nil
} else if c.CoordinatorState == CoordinatorStateFinished {
reply.TaskType = TaskTypeOff
return nil
}
return nil
}
The completion RPCs just locate the matching task, record the result, and update its state. For a MapTask the result is the list of intermediate files; for a ReduceTask it is the final output file name, which the Worker fixes as mr-out-i (i being the ReduceTaskIndex) so the test script can find it.
func (c *Coordinator) MapTaskDone(args *MapTaskDoneArgs, reply *MapTaskDoneReply) error {
	c.L.Lock()
	defer c.L.Unlock()
	for _, mapTask := range c.MapTasks {
		for _, id := range mapTask.MapTaskIds {
			if args.MapTaskId == id {
				if mapTask.MapTaskState == MapTaskStateFinished {
					// another attempt already finished; ignore this report so
					// the reduce tasks don't receive duplicate input files
					return nil
				}
				mapTask.MapTaskState = MapTaskStateFinished
				for i, n := range args.IntermediateFileNames {
					c.ReduceTasks[i].IntermediateFiles = append(c.ReduceTasks[i].IntermediateFiles, n)
				}
				return nil
			}
		}
	}
	return fmt.Errorf("who are you?")
}
func (c *Coordinator) ReduceTaskDone(args *ReduceTaskDoneArgs, reply *ReduceTaskDoneReply) error {
	c.L.Lock()
	defer c.L.Unlock()
	for _, reduceTask := range c.ReduceTasks {
		for _, id := range reduceTask.ReduceTaskIds {
			if args.ReduceTaskId == id {
				if reduceTask.ReduceTaskState == ReduceTaskStateFinished {
					// a duplicate attempt already reported; nothing more to record
					return nil
				}
				reduceTask.ReduceTaskState = ReduceTaskStateFinished
				c.OutputFileNames = append(c.OutputFileNames, args.OutputFileName)
				return nil
			}
		}
	}
	return fmt.Errorf("who are you?")
}
Worker
The Worker does none of the coordination; it just keeps its head down and grinds away like a blindfolded donkey at a millstone.
All a Worker does is repeatedly claim a task, dispatch to the function that handles that task type, and tell the Coordinator when the task is done.
func Worker(mapf func(string, string) []KeyValue,
reducef func(string, []string) string) {
for {
reply, ok := CallGetTask()
if !ok {
fmt.Printf("CallGetWork wrong.")
return
}
switch reply.TaskType {
case TaskTypeOff:
fmt.Printf("Work done.")
return
case TaskTypeMap:
intermediateFileNames, ok := doMap(reply.JobId, reply.TaskId, reply.Content[0], reply.NReduce, mapf)
if !ok {
fmt.Printf("mapf wrong.")
break
}
fmt.Printf("doMap done. Calling MapTaskDone.\n")
_, ok = CallMapTaskDone(reply.TaskId, intermediateFileNames)
if !ok {
fmt.Printf("CallMapTaskDone wrong.")
}
case TaskTypeReduce:
outputFileName, ok := doReduce(reply.JobId, reply.TaskId, reply.ReduceTaskIndex, reply.Content, reducef)
if !ok {
fmt.Printf("reducef wrong.\n")
break
}
fmt.Printf("doReduce done. Calling ReduceTaskDone.\n")
_, ok = CallReduceTaskDone(reply.TaskId, outputFileName)
if !ok {
fmt.Printf("CallReduceTaskDone wrong.")
}
case TaskTypeWait:
fmt.Printf("No work to do. Worker waits.\n")
time.Sleep(10 * time.Second)
default:
fmt.Printf("WorkType wrong.")
}
}
}
func CallGetTask() (*GetTaskReply, bool) {
args := &GetTaskArgs{}
reply := &GetTaskReply{}
ok := call("Coordinator.GetTask", args, reply)
if !ok {
return nil, false
}
return reply, true
}
A Worker executing a MapTask creates as many intermediate files as there are ReduceTasks, reads the input file, runs it through the Map Function to obtain key/value pairs, and hashes each key modulo nReduce to decide which intermediate file the pair is saved to.
//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
Key string
Value string
}
//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
h := fnv.New32a()
h.Write([]byte(key))
return int(h.Sum32() & 0x7fffffff)
}
func doMap(jobId string, mapTaskId string, splitName string, nReduce int64,
mapf func(string, string) []KeyValue) ([]string, bool) {
intermediateFileNames := make([]string, nReduce)
for i := 0; i < int(nReduce); i++ {
intermediateFileNames[i] = fmt.Sprintf(".intermediate-%s-%v-%s.txt", jobId, i, mapTaskId)
}
split, err := os.Open(splitName)
if err != nil {
log.Fatalf("cannot open %v", splitName)
}
content, err := ioutil.ReadAll(split)
if err != nil {
log.Fatalf("cannot read %v", splitName)
}
split.Close()
kva := mapf(splitName, string(content))
fmt.Println("mapf done.")
intermediateFiles := make([]*os.File, 0)
for _, fileName := range intermediateFileNames {
file, err := os.Create(fileName)
if err != nil {
fmt.Printf("%+v", err)
return nil, false
}
intermediateFiles = append(intermediateFiles, file)
}
for _, kv := range kva {
fmt.Fprintf(intermediateFiles[ihash(kv.Key)%int(nReduce)], "%v %v\n", kv.Key, kv.Value)
}
for _, file := range intermediateFiles {
file.Close()
}
return intermediateFileNames, true
}
func CallMapTaskDone(mapTaskId string, intermediateFileNames []string) (*MapTaskDoneReply, bool) {
args := &MapTaskDoneArgs{}
args.MapTaskId = mapTaskId
args.IntermediateFileNames = intermediateFileNames
reply := &MapTaskDoneReply{}
ok := call("Coordinator.MapTaskDone", args, reply)
if !ok {
return nil, false
}
return reply, true
}
A Worker executing a ReduceTask reads the intermediate files assigned to it, processes them, writes the result to a temporary output file, and then renames it to the final output name. If the final file already exists because another attempt got there first, rename replaces it atomically, so exactly one complete output file survives for each ReduceTask.
func doReduce(jobId string, reduceTaskId string, index int64, intermediateFileNames []string,
	reducef func(string, []string) string) (string, bool) {
	kvs := make(map[string][]string)
	for _, intermediateFileName := range intermediateFileNames {
		intermediateFile, err := os.Open(intermediateFileName)
		if err != nil {
			log.Fatalf("cannot open %v", intermediateFileName)
		}
		content, err := ioutil.ReadAll(intermediateFile)
		if err != nil {
			log.Fatalf("cannot read %v", intermediateFileName)
		}
		intermediateFile.Close()
		lines := strings.Split(string(content), "\n")
		lines = lines[:len(lines)-1] // drop the trailing empty line
		for _, l := range lines {
			kv := strings.Split(l, " ")
			kvs[kv[0]] = append(kvs[kv[0]], kv[1]) // append allocates on first use
		}
	}
	output := ""
	for k, vs := range kvs {
		o := reducef(k, vs)
		output = fmt.Sprintf("%s%v %v\n", output, k, o)
	}
	tmpFileName := fmt.Sprintf("mr-out-%v.%s.tmp", index, reduceTaskId)
	tmpFile, err := os.Create(tmpFileName)
	if err != nil {
		fmt.Printf("%+v", err)
		return "", false
	}
	fmt.Fprintf(tmpFile, "%s", output)
	tmpFile.Close()
	outputFileName := fmt.Sprintf("mr-out-%v", index)
	if err := os.Rename(tmpFileName, outputFileName); err != nil {
		fmt.Printf("%+v", err)
		return "", false
	}
	return outputFileName, true
}
func CallReduceTaskDone(reduceTaskId string, outputFileName string) (*ReduceTaskDoneReply, bool) {
args := &ReduceTaskDoneArgs{}
args.ReduceTaskId = reduceTaskId
args.OutputFileName = outputFileName
reply := &ReduceTaskDoneReply{}
ok := call("Coordinator.ReduceTaskDone", args, reply)
if !ok {
return nil, false
}
return reply, true
}
Afterword
And there it is: a MapReduce that, performance aside, barely works, and my first blog post along with it. I thought about too many things before writing the code, so it ended up with plenty of redundant design; if you notice struct members that serve no purpose, you are probably right. 😝