Preface

I am someone with a rather poor memory: experiences big and small leave only a rough impression once they have passed, and the details gradually fade with time. So, starting today, I will post some notes on my studies and my life to this blog, both to share them and to keep a record. I am not much of a writer, and this blog will probably not have many readers, so I am truly grateful to you for reading this article, but I will still try my best to write it well.

Since I chose edge intelligence as my research direction in graduate school, and edge computing is an extension of cloud-native ideas, I felt it necessary to fill in some distributed-systems fundamentals during my first year. MIT6.824 is my first plan: this distributed systems course comes with four widely praised labs, so let me commit here to finishing them within one semester and writing them up. My code is not particularly good, so please read it just for fun, and I would very much like to hear from you if you have any suggestions for improvement.

What is MapReduce

The first lab of MIT6.824 is to implement MapReduce, Google's early distributed computing model for processing data at scale, which provides scalability and fault tolerance for complex computations.

Take a job such as counting words or recording where each word appears. MapReduce divides it into Map and Reduce tasks. In the Map phase, a Map Worker processes the initial input into key/value pairs and, based on each key, partitions the pairs into separate files destined for different Reduce Workers; this partitioning step is the Shuffle. In the Reduce phase, the pairs are merged further: all values belonging to the same key are gathered together to produce the final result. The pseudocode below gives the general form of MapReduce.

map (k1,v1) → list(k2,v2)
reduce (k2,list(v2)) → list(v2)
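
To make this concrete, here is a minimal word-count sketch in Go, close to the word-count app that ships with the lab (the KeyValue type is the one defined later in this post; the tokenization is illustrative):

// Map emits a ("word", "1") pair for every word occurrence in the input.
// (Uses strings, strconv, and unicode from the standard library.)
func Map(filename string, contents string) []KeyValue {
	// split contents into words on any non-letter character
	words := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	kva := []KeyValue{}
	for _, w := range words {
		kva = append(kva, KeyValue{Key: w, Value: "1"})
	}
	return kva
}

// Reduce receives one key and all of its values after the Shuffle;
// for word count every value is "1", so the answer is just the count.
func Reduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}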

Besides describing how MapReduce works, the original paper also proposes many practical optimizations, such as the Combiner Function and Backup Tasks. The paper is available at https://research.google/pubs/pub62/, so I will not go into more detail here.

RPC

The lab uses Golang as the programming language and simulates different nodes by starting separate processes. The simulated nodes communicate across processes over a Unix domain socket; the details of Golang's RPC library are beyond the scope of this post.
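
The Worker code later in this post issues its RPCs through a small call helper; it comes from the lab's skeleton code and looks roughly like the sketch below (coordinatorSock, also from the skeleton, returns a per-user socket path under /var/tmp):

// call sends an RPC request to the Coordinator over a Unix domain
// socket, waits for the response, and reports whether the call succeeded.
func call(rpcname string, args interface{}, reply interface{}) bool {
	sockname := coordinatorSock() // e.g. /var/tmp/824-mr-<uid>
	c, err := rpc.DialHTTP("unix", sockname)
	if err != nil {
		log.Fatal("dialing:", err)
	}
	defer c.Close()

	err = c.Call(rpcname, args, reply)
	if err == nil {
		return true
	}
	fmt.Println(err)
	return false
}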

Next, let's analyze how the simulated nodes communicate. Workers never need to talk to each other; for communication between the Coordinator and the Workers, it suffices to make the Coordinator the RPC server and implement the RPC calls the Workers need to claim tasks and to report back once a task is finished. The interface is defined below: GetTask lets a Worker claim a task from the Coordinator, while MapTaskDone and ReduceTaskDone let a Worker report a completed task.

type GetTaskArgs struct {
}

type TaskType int

const (
	TaskTypeOff    TaskType = 0
	TaskTypeMap    TaskType = 1
	TaskTypeReduce TaskType = 2
	TaskTypeWait   TaskType = 3
)

type GetTaskReply struct {
	JobId           string
	TaskType        TaskType
	TaskId          string
	Content         []string
	NReduce         int64
	ReduceTaskIndex int64
}

type MapTaskDoneArgs struct {
	MapTaskId             string
	IntermediateFileNames []string
}

type MapTaskDoneReply struct {
}

type ReduceTaskDoneArgs struct {
	ReduceTaskId   string
	OutputFileName string
}

type ReduceTaskDoneReply struct {
}

Coordinator

First, look at the Coordinator's struct definition. MapReduce designates a Coordinator for every new job, so a unique JobId is needed to identify the job. A state variable indicates which phase the job has reached, and the struct keeps the state of every MapTask and ReduceTask. Clearly, this struct is modified on behalf of different nodes, so every modification must take a lock to avoid data races; hence a Mutex is also stored as a struct member.

type Coordinator struct {
	JobId            string
	CoordinatorState CoordinatorState
	MapTasks         []*MapTask
	ReduceTasks      []*ReduceTask
	OutputFileNames  []string
	L                *sync.Mutex
}

type CoordinatorState int64

const (
	CoordinatorStateMapping  CoordinatorState = 1
	CoordinatorStateReducing CoordinatorState = 2
	CoordinatorStateFinished CoordinatorState = 3
)

The MapTask and ReduceTask structs are defined according to need, and each likewise carries a state variable. The Id field is designed as a slice to support fault tolerance and Backup Tasks, which will become apparent in the code later; the start time is recorded to detect tasks that have timed out.

type MapTaskState int64

const (
	MapTaskStateWaiting  MapTaskState = 1
	MapTaskStateRunning  MapTaskState = 2
	MapTaskStateExpire   MapTaskState = 3
	MapTaskStateFinished MapTaskState = 4
)

type MapTask struct {
	MapTaskIds   []string
	Split        string
	MapTaskState MapTaskState
	StartTime    int64
}

type ReduceTaskState int64

const (
	ReduceTaskStateWaiting  ReduceTaskState = 1
	ReduceTaskStateRunning  ReduceTaskState = 2
	ReduceTaskStateExpire   ReduceTaskState = 3
	ReduceTaskStateFinished ReduceTaskState = 4
)

type ReduceTask struct {
	ReduceTaskIds     []string
	ReduceTaskIndex   int64
	IntermediateFiles []string
	OutputFile        string
	ReduceTaskState   ReduceTaskState
	StartTime         int64
}

When a new job arrives, a Coordinator is created for it, and the original input is divided into splits, with one MapTask per split. Both the MapTasks and the ReduceTasks are fully set up when the Coordinator is created: the number of ReduceTasks equals the number of output files the user asked for, and each ReduceTask carries an index identifying which intermediate results it is responsible for.

func MakeCoordinator(files []string, nReduce int) *Coordinator {
	c := Coordinator{}
	c.JobId = uuid.NewString()
	c.L = new(sync.Mutex)
	mapTasks := make([]*MapTask, len(files))
	for i, file := range files {
		mapTask := &MapTask{
			Split:        file,
			MapTaskState: MapTaskStateWaiting,
			StartTime:    time.Now().Unix(),
		}
		mapTasks[i] = mapTask
	}
	c.MapTasks = mapTasks

	reduceTasks := make([]*ReduceTask, nReduce)
	for i := 0; i < nReduce; i++ {
		reduceTask := &ReduceTask{
			ReduceTaskIndex: int64(i),
			ReduceTaskState: ReduceTaskStateWaiting,
		}
		reduceTasks[i] = reduceTask
	}
	c.ReduceTasks = reduceTasks
	c.CoordinatorState = CoordinatorStateMapping
	c.server()
	go c.coordinatorMoniter()
	return &c
}
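
The c.server() call at the end is the lab skeleton's function that registers the Coordinator with Go's net/rpc package and starts listening on the Unix domain socket; roughly:

// server starts a goroutine that listens for RPCs from Workers.
func (c *Coordinator) server() {
	rpc.Register(c)
	rpc.HandleHTTP()
	sockname := coordinatorSock()
	os.Remove(sockname) // remove any stale socket left by a previous run
	l, e := net.Listen("unix", sockname)
	if e != nil {
		log.Fatal("listen error:", e)
	}
	go http.Serve(l, nil)
}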

MakeCoordinator returns the newly created Coordinator, and the caller polls c.Done() in a loop to check whether the job has finished.

func (c *Coordinator) Done() bool {
	c.L.Lock()
	ok := c.CoordinatorState == CoordinatorStateFinished
	c.L.Unlock()
	return ok
}
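
For reference, the driving process (the lab's mrcoordinator) is just a polling loop around Done(); a sketch, with the input files taken from the command line and nReduce fixed at 10 as in the lab:

// create a Coordinator for the given splits, then wait for the job to finish
c := MakeCoordinator(os.Args[1:], 10)
for !c.Done() {
	time.Sleep(time.Second)
}
time.Sleep(time.Second) // give Workers a moment to observe TaskTypeOff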

Once the Coordinator exists, a monitoring goroutine must watch its current state. Here I use polling: it checks whether all tasks have finished and whether any task has timed out. A timed-out task may end up assigned to more than one Worker, but that causes no problems: a MapTask's intermediate files remain private to its Worker until the Coordinator records a completion report (duplicate reports are simply ignored), and for a ReduceTask, the atomicity of the operating system's rename guarantees that the final output file is unique. This is what makes MapReduce highly fault tolerant.

func (c *Coordinator) coordinatorMoniter() {
	c.mapMoniter()
	c.reduceMoniter()
}

var ExpireTime int64 = 3

func (c *Coordinator) mapMoniter() {
	for {
		rolling := false
		c.L.Lock()
		for _, mapTask := range c.MapTasks {
			if mapTask.MapTaskState != MapTaskStateFinished {
				rolling = true
			}
			if mapTask.MapTaskState == MapTaskStateRunning &&
				time.Now().Unix()-mapTask.StartTime > ExpireTime {
				mapTask.MapTaskState = MapTaskStateExpire
			}
		}
		c.L.Unlock()
		if !rolling {
			fmt.Printf("mapMonitor: map finish.\n")
			c.L.Lock()
			c.CoordinatorState = CoordinatorStateReducing
			c.L.Unlock()
			break
		}
		fmt.Printf("mapMonitor: map still running.\n")
		time.Sleep(10 * time.Second)
	}
}

func (c *Coordinator) reduceMoniter() {
	for {
		rolling := false
		c.L.Lock()
		for _, reduceTask := range c.ReduceTasks {
			if reduceTask.ReduceTaskState != ReduceTaskStateFinished {
				rolling = true
			}
			if reduceTask.ReduceTaskState == ReduceTaskStateRunning &&
				time.Now().Unix()-reduceTask.StartTime > ExpireTime {
				reduceTask.ReduceTaskState = ReduceTaskStateExpire
			}
		}
		c.L.Unlock()
		if !rolling {
			fmt.Printf("reduceMonitor: reduce finish.\n")
			c.L.Lock()
			c.CoordinatorState = CoordinatorStateFinished
			c.L.Unlock()
			break
		}
		fmt.Printf("reduceMonitor: reduce still running.\n")
		time.Sleep(10 * time.Second)
	}
}

Next comes the RPC call with which a Worker claims work.

In the Map phase, every time a Worker claims a task, the Coordinator picks a task that has either not started or has timed out, and appends a new Id to its MapTaskIds. The Coordinator sends this Id to the claiming Worker together with the input data and other information; when the Worker later submits the task, it carries this Id, so the Coordinator can record the MapTask's intermediate results in the right place. The Reduce phase uses the same design.

In the Map phase, the number of ReduceTasks equals the number of intermediate files each MapTask produces, so it must be passed to the Worker as a parameter. In the Reduce phase, ReduceTaskIndex determines which of each MapTask's intermediate files the Worker assigned to that ReduceTask must consume, so it is passed as a parameter as well. For example, if a ReduceTask's ReduceTaskIndex is 0, its Worker reads the first intermediate file (index 0) produced by every MapTask.

func (c *Coordinator) GetTask(args *GetTaskArgs, reply *GetTaskReply) error {
	c.L.Lock()
	defer c.L.Unlock()
	if c.CoordinatorState == CoordinatorStateMapping {
		for _, mapTask := range c.MapTasks {
			if mapTask.MapTaskState == MapTaskStateWaiting ||
				mapTask.MapTaskState == MapTaskStateExpire {
				reply.JobId = c.JobId
				reply.TaskId = uuid.NewString()
				reply.TaskType = TaskTypeMap
				reply.NReduce = int64(len(c.ReduceTasks))
				reply.Content = []string{mapTask.Split}
				mapTask.MapTaskIds = append(mapTask.MapTaskIds, reply.TaskId)
				mapTask.MapTaskState = MapTaskStateRunning
				mapTask.StartTime = time.Now().Unix()
				return nil
			}
		}
		reply.TaskType = TaskTypeWait
		return nil
	} else if c.CoordinatorState == CoordinatorStateReducing {
		for _, reduceTask := range c.ReduceTasks {
			if reduceTask.ReduceTaskState == ReduceTaskStateWaiting ||
				reduceTask.ReduceTaskState == ReduceTaskStateExpire {
				reply.JobId = c.JobId
				reply.TaskId = uuid.NewString()
				reply.TaskType = TaskTypeReduce
				reply.Content = reduceTask.IntermediateFiles
				reply.ReduceTaskIndex = reduceTask.ReduceTaskIndex
				reduceTask.ReduceTaskIds = append(reduceTask.ReduceTaskIds, reply.TaskId)
				reduceTask.ReduceTaskState = ReduceTaskStateRunning
				reduceTask.StartTime = time.Now().Unix()
				return nil
			}
		}
		reply.TaskType = TaskTypeWait
		return nil
	} else if c.CoordinatorState == CoordinatorStateFinished {
		reply.TaskType = TaskTypeOff
		return nil
	}
	return nil
}

The RPC calls with which Workers report completed tasks only need to locate the matching task, save the result information, and update the state. For a MapTask the result is the list of intermediate files; for a ReduceTask it is the final output file name, although to satisfy the test script the final output name is fixed in the Worker as mr-out-i, where i is the ReduceTaskIndex.

func (c *Coordinator) MapTaskDone(args *MapTaskDoneArgs, reply *MapTaskDoneReply) error {
	c.L.Lock()
	defer c.L.Unlock()
	for _, mapTask := range c.MapTasks {
		for _, id := range mapTask.MapTaskIds {
			if args.MapTaskId == id {
				if mapTask.MapTaskState == MapTaskStateFinished {
					// a duplicate (backup/expired) Worker already reported this
					// task; ignore it so each ReduceTask records only one copy
					// of the intermediate files
					return nil
				}
				mapTask.MapTaskState = MapTaskStateFinished
				for i, n := range args.IntermediateFileNames {
					c.ReduceTasks[i].IntermediateFiles = append(c.ReduceTasks[i].IntermediateFiles, n)
				}
				return nil
			}
		}
	}
	return fmt.Errorf("who are you?")
}

func (c *Coordinator) ReduceTaskDone(args *ReduceTaskDoneArgs, reply *ReduceTaskDoneReply) error {
	c.L.Lock()
	defer c.L.Unlock()
	for _, reduceTask := range c.ReduceTasks {
		for _, id := range reduceTask.ReduceTaskIds {
			if args.ReduceTaskId == id {
				if reduceTask.ReduceTaskState == ReduceTaskStateFinished {
					return nil // duplicate report from a backup Worker; ignore it
				}
				reduceTask.ReduceTaskState = ReduceTaskStateFinished
				c.OutputFileNames = append(c.OutputFileNames, args.OutputFileName)
				return nil
			}
		}
	}
	return fmt.Errorf("who are you?")
}

Worker

The Worker does not need to do any coordinating; like a blindfolded donkey turning a millstone, it just keeps on working.

All a Worker has to do is claim tasks over and over, dispatch each one to the function that handles its task type, and notify the Coordinator once the task is done.

func Worker(mapf func(string, string) []KeyValue,
	reducef func(string, []string) string) {
	for {
		reply, ok := CallGetTask()
		if !ok {
			fmt.Printf("CallGetWork wrong.")
			return
		}
		switch reply.TaskType {
		case TaskTypeOff:
			fmt.Printf("Work done.")
			return
		case TaskTypeMap:
			intermediateFileNames, ok := doMap(reply.JobId, reply.TaskId, reply.Content[0], reply.NReduce, mapf)
			if !ok {
				fmt.Printf("mapf wrong.")
				break
			}
			fmt.Printf("doMap done. Calling MapTaskDone.\n")
			_, ok = CallMapTaskDone(reply.TaskId, intermediateFileNames)
			if !ok {
				fmt.Printf("CallMapTaskDone wrong.")
			}
		case TaskTypeReduce:
			outputFileName, ok := doReduce(reply.JobId, reply.TaskId, reply.ReduceTaskIndex, reply.Content, reducef)
			if !ok {
				fmt.Printf("reducef wrong.\n")
				break
			}
			fmt.Printf("doReduce done. Calling ReduceTaskDone.\n")
			_, ok = CallReduceTaskDone(reply.TaskId, outputFileName)
			if !ok {
				fmt.Printf("CallReduceTaskDone wrong.")
			}
		case TaskTypeWait:
			fmt.Printf("No work to do. Worker waits.\n")
			time.Sleep(10 * time.Second)
		default:
			fmt.Printf("WorkType wrong.")
		}
	}
}
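
On the other side, the lab's mrworker process loads the application's Map and Reduce functions from a Go plugin and hands them to this loop; roughly:

// load mapf/reducef from a plugin built with `go build -buildmode=plugin`;
// loadPlugin comes from the lab skeleton
mapf, reducef := loadPlugin(os.Args[1])
Worker(mapf, reducef)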

func CallGetTask() (*GetTaskReply, bool) {
	args := &GetTaskArgs{}
	reply := &GetTaskReply{}
	ok := call("Coordinator.GetTask", &args, &reply)
	if !ok {
		return nil, false
	}
	return reply, true
}

A Worker executing a MapTask creates as many intermediate files as there are ReduceTasks, reads the input file, and runs it through the Map function to obtain key/value pairs; hashing the key modulo nReduce decides which intermediate file each pair is saved to.

//
// Map functions return a slice of KeyValue.
//
type KeyValue struct {
	Key   string
	Value string
}

//
// use ihash(key) % NReduce to choose the reduce
// task number for each KeyValue emitted by Map.
//
func ihash(key string) int {
	h := fnv.New32a()
	h.Write([]byte(key))
	return int(h.Sum32() & 0x7fffffff)
}

func doMap(jobId string, mapTaskId string, splitName string, nReduce int64,
	mapf func(string, string) []KeyValue) ([]string, bool) {
	intermediateFileNames := make([]string, nReduce)
	for i := 0; i < int(nReduce); i++ {
		intermediateFileNames[i] = fmt.Sprintf(".intermediate-%s-%v-%s.txt", jobId, i, mapTaskId)
	}
	split, err := os.Open(splitName)
	if err != nil {
		log.Fatalf("cannot open %v", splitName)
	}
	content, err := ioutil.ReadAll(split)
	if err != nil {
		log.Fatalf("cannot read %v", splitName)
	}
	split.Close()
	kva := mapf(splitName, string(content))

	fmt.Println("mapf done.")

	intermediateFiles := make([]*os.File, 0)
	for _, fileName := range intermediateFileNames {
		file, err := os.Create(fileName)
		if err != nil {
			fmt.Printf("%+v", err)
			return nil, false
		}
		intermediateFiles = append(intermediateFiles, file)
	}

	for _, kv := range kva {
		fmt.Fprintf(intermediateFiles[ihash(kv.Key)%int(nReduce)], "%v %v\n", kv.Key, kv.Value)
	}

	for _, file := range intermediateFiles {
		file.Close()
	}

	return intermediateFileNames, true
}

func CallMapTaskDone(mapTaskId string, intermediateFileNames []string) (*MapTaskDoneReply, bool) {
	args := &MapTaskDoneArgs{}
	args.MapTaskId = mapTaskId
	args.IntermediateFileNames = intermediateFileNames
	reply := &MapTaskDoneReply{}
	ok := call("Coordinator.MapTaskDone", &args, &reply)
	if !ok {
		return nil, false
	}
	return reply, true
}

A Worker executing a ReduceTask reads the intermediate files belonging to that ReduceTask, processes them, writes the result to a temporary output file, and then renames it to the final output file name. If the final output file already exists, rename atomically replaces it (on POSIX systems), which guarantees that each ReduceTask ends up with exactly one complete final output file.

func doReduce(jobId string, reduceTaskId string, index int64, intermediateFileNames []string,
	reducef func(string, []string) string) (string, bool) {
	// gather all values for each key across this task's intermediate files
	kvs := make(map[string][]string)
	for _, intermediateFileName := range intermediateFileNames {
		intermediateFile, err := os.Open(intermediateFileName)
		if err != nil {
			log.Fatalf("cannot open %v", intermediateFileName)
		}
		content, err := ioutil.ReadAll(intermediateFile)
		if err != nil {
			log.Fatalf("cannot read %v", intermediateFileName)
		}
		intermediateFile.Close()
		lines := strings.Split(string(content), "\n")
		lines = lines[:len(lines)-1] // drop the empty string after the trailing newline
		for _, l := range lines {
			kv := strings.Split(l, " ")
			// appending to a nil slice allocates it, so no existence check is needed
			kvs[kv[0]] = append(kvs[kv[0]], kv[1])
		}
	}

	output := ""
	for k, vs := range kvs {
		o := reducef(k, vs)
		output = fmt.Sprintf("%s%v %v\n", output, k, o)
	}

	outputFileName := fmt.Sprintf("mr-out-%v.%s.tmp", index, reduceTaskId)
	outputFile, err := os.Create(outputFileName)
	if err != nil {
		fmt.Printf("%+v", err)
		return "", false
	}
	fmt.Fprintf(outputFile, "%s", output)
	outputFile.Close()
	os.Rename(outputFileName, fmt.Sprintf("mr-out-%v", index))
	return outputFileName, true
}

func CallReduceTaskDone(reduceTaskId string, outputFileName string) (*ReduceTaskDoneReply, bool) {
	args := &ReduceTaskDoneArgs{}
	args.ReduceTaskId = reduceTaskId
	args.OutputFileName = outputFileName
	reply := &ReduceTaskDoneReply{}
	ok := call("Coordinator.ReduceTaskDone", &args, &reply)
	if !ok {
		return nil, false
	}
	return reply, true
}

Postscript

And with that, a barely usable MapReduce that ignores performance is born, and so is my first blog post. I thought about too many things before writing the code, which left quite a few redundant designs in it; if you notice that some struct members serve no purpose, you are very likely right. 😝