Ray

前言 Ray主要是想做一个集成分布式训练、推理和环境模拟于一身,但是彼此又不耦合的分布式框架。经过合理的模块化,每个环节还可以接入不同的系统,例如为训练接入Horovod/torch.distributed,为推理服务在Kubernetes上运行Ray等,让用户可以在一个分布式应用上组合多个库。 Ray更关注的是horizontal scalability和low overhead,当然scalability一般还会要求reliability。Ray的存储是个分布式的内存共享机制,通信基于gRPC。 一个简单的Ray使用示例: Application concepts Task - A remote function invocation. This is a single function invocation that executes on a process different from the caller, and potentially on a different machine. A task can be stateless (a @ray.remote function) or stateful (a method of a @ray.remote class - see Actor below). A task is executed asynchronously with the caller: the .remote() call immediately returns one or more ObjectRefs (futures) that can be used to retrieve the return value(s)....

June 3, 2022 · Yihong Li

动手实现一个Raft

前言 10月份没有更新过博客,一个原因是自己太懒,没有养成写博客的习惯,但主要的原因还是自己太摸,没有什么工作值得记录的。这个月断断续续花了两周时间,把6.824的Lab2给实现了,稍微总结一下水过这个月吧。第一篇博客发完之后我意识到贴代码可能不太合适,因为这是别人学校的作业,但是转念一想不会有什么人会看到我的博客吧(哈哈哈)。无论怎样,之后的Lab还是不要把我的菜鸡代码贴上来了。 6.824的Lab2主要是实现Raft论文里除了configuration change之外的所有内容,包括Leader Election、Log Replication和Snapshot,其中Log Replication包括了减少AppendEntries RPC匹配错误的优化。这篇博客的侧重点更多是在实现方案上的讨论,Raft的核心主要就在这两张精简图里面了,关于Raft的算法本身,请直接看作者的论文:https://www.usenix.org/system/files/conference/atc14/atc14-paper-ongaro.pdf Leader Election 首先,我们需要一个计时器去控制Election Timeout的计时,在Election Timeout被触发时server成为候选人,进入选举流程。如果server接收到leader的请求或者投票给一个候选人时,需要重置计时器,所以计时器还需要随时检查自己是否已经被重置。这种情况下使用channel和信号量都不是很方便,因为阻塞的时候想要重置的话实现起来很麻烦,所以我用的是一个保存在server元数据结构体中的状态变量,在一个循环中“锁-检查状态-释放锁-sleep”的笨办法。 server成为候选人后,需要给其他server发送投票请求,但是如果使用并发发送请求的策略,有时候我们不知道其他server有没有回复,如何统计大多数人的回复呢?这里我在每个发送投票请求的函数中带了一个计数变量的指针,在投票请求回复同意后对这个计数变量进行原子性的自增操作,如果检查到计数变量超过大多数,就进入leader初始化的阶段。在我的实现里,其他server是有“一票否决权”的,被一票否决后server会从候选人变回follower并重置计时器,所以在投票请求回复后还需要检查自己还是不是当前term的候选人。 其他server在决定要不要投票的时候,需要确定对方是不是"at least up-to-date",也就是对方的term必须至少跟自己一样,如果对方的term跟自己一样则最后一个log entry的索引要至少和自己一样,否则就直接一票否决。 server在成为leader之后,需要进行一个no-op,即马上增加一个当前term的空日志并发给其他server,否则会导致存在一致性的隐患。实验中会因为实现了no-op而导致测试不通过,因为部分测试对log的索引顺序有要求,需要魔改一下测试。 我选择的Heartbeat Timeout是50ms,Election Timeout是150ms到300ms,使用助教的并行测试脚本,进程数等于逻辑核数量时测试可以稳定通过。 Log Replication AppendEntries leader发送AppendEntries RPC请求主要是根据对应follower的next index来决定是发送log entry还是仅发送heartbeat,当接收到超过半数follower的确认后,leader就认为这个log entry是可以commit的,便会提交apply并在下一次AppendEntries RPC请求通知其他follower也提交。有时候leader可能因为经历了网络隔离后是落后的,这时follower对leader的AppendEntries RPC请求会回复自己的term,leader收到后会回到follower状态。如果是leader本身接收到了来自更高term的leader的AppendEntries RPC请求,那么他应该直接成为新leader的follower,因为这里隐藏的含义是有超过半数的server在一个更高的term里投出了一个leader。 对于follower,AppendEntries有很多需要注意的地方。如果请求的term比自己大,需要把自己的last log index回复给leader;如果请求的term比自己小,需要把自己的term回复给leader;如果不是自己投出来的leader,忽略请求;如果匹配错误(last log index < prev log index或者prev log index对应的term不对),也返回不成功。写入log时要注意,如果发生了重写,last log index不一定是log slice的最后一个元素,需要进行一些处理来分成rewrite和append两类操作。 在我的实现中,无论是leader还是follower,在commit index发生改变时都会马上提交apply,并且等所有apply都推到apply channel后才继续执行之后的代码。相比于计时器定时apply,状态变化的复杂性会少一点。 在我的实现中,last log term和last log index都是有额外的变量保存的,并且和log保持一致,因为有时候AppendEntries RPC匹配错误之后,如果不作额外标识,很难直接定位到最后一个有效的log entry。不过如果只保存last log index也是可以的。 Persistence 需要持久化的变量如下,每次变量发生变化的时候都要进行持久化。其中offset会在后文Snapshot中使用到。 Log []*LogEntry Offset int Term int VoteFor int LastLogIndex int LastLogTerm int LastApplied int Snapshot snapshot在实验框架中的实现是这样的:由上层服务向raft发起snapshot的请求,raft会收到一个snapshot的数据,raft只需要将它持久化即可。在follower需要得到一个snapshot时,leader将自己持久化的这个snapshot的数据取出来发给follower,follower如果确定这个snapshot是最新的,就将它发送到apply channel,并在上层的applier中再次调用follower的接口实现snapshot的安装。...

October 25, 2021 · Yihong Li

动手实现一个MapReduce

前言 我是一个记忆力不怎么好的人,很多大大小小的经历,过去了之后只会留下一个大概的印象,细节则会随着时间的流逝逐渐淡去。于是从今天开始,我会将一些学习和生活上的心得发在这个博客上,既为了分享也为了记录。虽然我不是一个爱码字的人,我的博客也不会有多少人看——所以非常感激正在看这篇文章的你——但是我也会尽量写得认真一点。 由于读研选择了边缘智能这个方向,而边缘计算又是从云原生的概念中延伸出来的,所以我觉得有必要在第一年对一些分布式系统相关的知识进行补全,MIT6.824是我的第一个计划。MIT6.824这个分布式系统课程有四个广受好评的实验,这里先挖个坑,希望能在一个学期内完成它们并记录下来。当然我的代码写得并不好,大家看看图一乐就好,有什么改进的建议也非常希望能够联系我。 什么是MapReduce MIT6.824第一个实验是实现一个MapReduce,这是谷歌最早的用于处理大规模数据的分布式计算模型,为复杂的计算任务提供了可扩展性和容错性。 对于一个工作,可能是统计单词数量或者统计单词出现位置,MapReduce将它分为Map和Reduce两个任务,Map过程是Map Worker将初始内容按要求处理成一些键值对,并且根据键的不同,将这些键值对分类保存起来,以提供给不同的Reduce Worker,这个分类保存的过程也就是Shuffle;Reduce过程则是将这些键值对进行进一步归并,按照键将所有的值聚集在一起,得到最后的结果。下面的伪代码是MapReduce的一般形式。 map (k1,v1) → list(k2,v2) reduce (k2,list(v2)) → list(v2) 在原论文中,除了阐述了MapReduce的工作过程,还提出了很多实际实现上的优化,如Combiner Function,Backup Task等。这是它的论文链接:https://research.google/pubs/pub62/,在这里就不再细说了。 RPC部分 实验使用Golang作为编程语言,并通过开启不同的进程来模拟不同的节点。而对于模拟节点之间的通信,则使用Unix domain socket的跨进程通信,Golang的RPC库的具体细节就不再本文中赘述了。 接下来分析模拟节点的通信。首先Worker之间并不需要通信,而对于Coordinator和Worker之间的通信,只需要以Coordinator作为RPC服务器,并实现Worker的RPC调用,用于Worker申领工作和Worker完成工作后汇报。接口定义如下,GetTask用于Worker向Coordinator申领工作,MapTaskDone和ReduceTaskDone则是Worker完成工作后向Coordinator汇报。 type GetTaskArgs struct { } type TaskType int var ( TaskTypeOff TaskType = 0 TaskTypeMap TaskType = 1 TaskTypeReduce TaskType = 2 TaskTypeWait TaskType = 3 ) type GetTaskReply struct { JobId string TaskType TaskType TaskId string Content []string NReduce int64 ReduceTaskIndex int64 } type MapTaskDoneArgs struct { MapTaskId string IntermediateFileNames []string } type MapTaskDoneReply struct { } type ReduceTaskDoneArgs struct { ReduceTaskId string OutputFileName string } type ReduceTaskDoneReply struct { } Coordinator 首先关注Coordinator的结构体定义,MapReduce对于每一个新的工作都需要指定Coordinator,所以需要有一个唯一的JobId来标识该工作;同时需要一个状态变量来指示工作现在进行到什么阶段,并保存每个MapTask和ReduceTask的状态;显然这个结构体需要被来自不同的节点修改,所以需要在每次修改时加锁,防止出现Data Race,因此Mutex变量也作为结构体成员保存。...

September 23, 2021 · Yihong Li