Scaling Distributed Machine Learning with In-Network Aggregation

这是一个将programmable switch当成一个分布式机器学习的parameter server,从而实现line rate的梯度聚集的工作。它是第一个提出将programmable switch用在分布式机器学习的梯度聚集上的工作,引用量也比较多。和parameter server相比,可以避免end-host processing,达到“sub-RTT” latency;和allreduce相比,可以将传输数据量减少大约2倍。和一些梯度压缩的技术相比也不一定慢,同时还有无损梯度聚集的优点。 它有三个创新点: 为了适应programmable switch的弱算力,文章提出将参数的更新分成小块,从而可以被programmable switch流水线处理。这个严格来说可能不算创新,而是一种adaptation。逻辑上的设计都是非常简单的。 这个算法虽然很简单,但是其实很巧妙。将需要聚集的分块后,worker和switch的分块是一个有序的直接相联映射,它隐含了多个worker之间的共识:收到switch的返回就说明梯度聚集完成,并且下一个需要聚集的分块的位置也是确定且一致的。 The coordination is implicit because the mapping between model updates, slots, and packets is deterministic. 为了保证worker之间的同步,以及检测和恢复丢包的问题,控制面需要能够区分是回传正确结果时的丢包还是worker传数据的时的丢包。文章的解决方案是保存两个switch状态:上一次slot里面的结果的shadow copy,以及当前的slot接收到的worker发来的数据。这个机制只能应对worker落后一个chunk的情况,如果worker持续掉线的话是会阻塞的,而分布式机器学习也不会像一般的分布式系统那样考虑太多的可用性,就简单阻塞住就行。它给的switch端算法逻辑9-17行好像有点问题,大概理解就行。 为了适应programmable switch不支持浮点运算,文章提出在switch上做定点数加法,然后在worker端做scaling。worker首先要对这个scaling的幂值达成共识,文章用的方法是在发一个block的时候带上下一个block的梯度的最大幂值,然后在switch返回的时候带上全局最大的幂值给每个worker。第一个block的幂值怎么办文章没说。 implementation部分还有一些P4和RDMA的具体实现的讨论,但我暂时不太懂,这部分比较复杂,可能实际用到的时候才有动力去学。extention部分讨论了这个框架可以怎样扩展到多机架(跨交换机)的场景,引入拥塞控制模块,不同的硬件设备以及多租户的集群环境。

November 12, 2022 · Yihong Li

In-network aggregation for shared machine learning clusters

这篇文章认为in-network aggregation对减少训练时间的提升很有限,它的作用应该在于减少在一个共享ML集群中的网络带宽的使用。 文章refer了另一篇文章An In-Network Architecture for Accelerating Shared-Memory Multiprocessor Collectives的分析,in-network aggregation对ring allreduce的理论最大提升只有2倍,考虑ring allreduce分为scatter-reduce和Allgather两个阶段,每个都需要传输和接收一次和设备数量n成比例的信息,in-network aggregation可以看成一个超快的parameter server,可以做到需要一个阶段,所以理想是2倍,现实往往还没有2倍。文章构造了很多communication bound的场景去说明,即使通信被优化,实际加速效果也没有那么好。 为了解决可编程交换机的浮点数限制,作者将可编程与交换机解耦,设计了一个新的支持浮点数运算的加速器硬件。这样看好像智能网卡一类的硬件的应用要更general一点,不一定要交换机。 为了更好地共享网络带宽,提出了一些在共享ML集群中用可编程交换机需要解决的问题(负载均衡和拥塞控制)的方案。 考虑fat tree的拓扑结构,文章提出的框架假设使用什么节点是其他算法给定的,而且不假设是否所有任务都使用in-network aggregation,流量超过某个大小的流才用ING。 文章提出的load balencing方法就是对每个任务规定多棵聚集树,每棵树负责相同大小的gadient块(round robin分配),树内用标号大小来区分顺序。 对于拥塞控制,文章是在TCP的ECN拥塞控制上面做了一些算法上的适配,我的理解是它的算法在逐层聚合梯度的时候,顺便将ECN位聚合起来,如果ECN不为0,则整颗聚集树一起降低传输速率,同时会有一个由加速器硬件指定的可用buffer空间,防止溢出,总体来说都是简单的heuristic。 作者用FPGA实现了它设计的加速器硬件,因为对FPGA不是很了解,这部分看不懂的名词特别多(只看懂个lookup-table和flip-flop),感觉之后做FPGA的可能性不大,有需要再看。PS: 想看有没有开源代码看看整个FPGA项目是怎么实现的,发现只有两个空的代码仓库。https://github.com/nadeengebara

November 12, 2022 · Yihong Li

Ekko: A Large-Scale Deep Learning Recommender System with Low-Latency Model Update (OSDI 2022)

用了很多分布式系统的思想。 在线更新推荐推理系统,优化了P2P的通信,而且考虑了模型更新的优先级和差模型对SLO的影响。 差模型的影响通过一个 inference model state manager 监控,它有一个几分钟前的baseline模型,它会接收部分的用户流量作为ground truth去评估现在的模型。 对模型的parameter分片传输,达到最终一致性,作者认为一个模型的不同参数版本不一致,对推理结果影响不大,只要最终一致就可以;用version vectors去做replica之间的P2P同步,物理时间+id作为一个parameter的version number。 用一个Dominator Version Vector去维护cache,保证大于Dominator Version Vector的版本都在cache里面,删除过期cache会更新Dominator Version Vector的计数 (merge)。 如果Version Vector比Dominator Version Vector大说明所有东西都在缓存中。 Shard Version是和replica绑定的,Shard Version少很多,是为了减少Version Vector做的二级数据结构;Shard Version大,Version Vector肯定大。 update priorities考虑的因素是freshness,gradient magnitude和request rates的多项式。 为每一个模型准备一个baseline模型,是几分钟前的模型参数,用于监控模型更新有没有变坏。 witness servers用于记录update,但是不会即时flush也没有update priorities,当infernece server需要rollback的时候才flush。

October 28, 2022 · Yihong Li

A Generic Communication Scheduler for Distributed DNN Training Acceleration (SOSP 19)

分布式机器学习的梯度同步的通讯调度,先做前向传播的参数考虑先同步。 一个有限制的preemption。 用Bayesian Optimization去搜超参。

October 22, 2022 · Yihong Li

PipeSwitch: Fast Pipelined Context Switching for Deep Learning Applications

OSDI: Elastic Resource Sharing for Distributed Deep Learning虽然说是elastic,但是在分析的时候是只考虑任务本身资源伸缩的开销,而没有考虑context switch的开销。 active-standby用内存空间换switching overhead。 不同的context之间: Multi-Instance GPU(并行)只有NVIDIA H100, A100, and A30支持; time slicing(并发)则从Pascal架构开始支持,而且提出MPS之后,将多个进程的CUDA Context,合并到一个CUDA Context 中,流处理器就可以被不同的kernel函数共享,可以做到和CPU的多进程并发的效果,但他们需要所有的数据都preload到显存中。 多个stream里面的kernel的并行是一直支持的。 switching overhead由四个部分组成: old task cleaning, new task initialization, GPU memory allocation, and model transmission via PCIe from CPU to GPU. observation: DNN models have a layered structure and a layer-by-layer computation pattern 文章的idea就是模型一层层地传,然后边传变算。 the core idea is pipelining model transmission over the PCIe and model computation in the GPU...

October 19, 2022 · Yihong Li

Ray

前言 Ray主要是想做一个集成分布式训练、推理和环境模拟于一身,但是彼此又不耦合的分布式框架。经过合理的模块化,每个环节还可以接入不同的系统,例如为训练接入Horovod/torch.distributed,为推理服务在Kubernetes上运行Ray等,让用户可以在一个分布式应用上组合多个库。 Ray更关注的是horizontal scalability和low overhead,当然scalability一般还会要求reliability。Ray的存储是个分布式的内存共享机制,通信基于gRPC。 一个简单的Ray使用示例: Application concepts Task - A remote function invocation. This is a single function invocation that executes on a process different from the caller, and potentially on a different machine. A task can be stateless (a @ray.remote function) or stateful (a method of a @ray.remote class - see Actor below). A task is executed asynchronously with the caller: the .remote() call immediately returns one or more ObjectRefs (futures) that can be used to retrieve the return value(s)....

June 3, 2022 · Yihong Li