Horovod

Around 2018, when distributed training in other frameworks was still immature, Horovod was the first to implement ring allreduce on top of NCCL and MPI.

Horovod Timeline

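Horovod Timeline records what every worker is doing over the course of a training job (negotiation, allreduce, memcpy, and so on). It is enabled by pointing the HOROVOD_TIMELINE environment variable at a file path; the resulting JSON trace can be opened in Chrome's chrome://tracing viewer.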

Tensor Fusion

Ring-allreduce utilizes the network in an optimal way if the tensors are large enough, but does not work as efficiently or quickly if they are very small.

One of the unique things about Horovod is its ability to interleave communication and computation coupled with the ability to batch small allreduce operations, which results in improved performance. We call this batching feature Tensor Fusion.

Tensor Fusion works by attempting to combine all the tensors that are ready to be reduced at a given moment in time into one reduction operation. The Tensor Fusion algorithm proceeds as follows:

  1. Determine which tensors are ready to be reduced. Select the first few tensors that fit within HOROVOD_FUSION_THRESHOLD bytes and have the same data type.
  2. Allocate a fusion buffer of size HOROVOD_FUSION_THRESHOLD if it was not allocated before. The default fusion buffer size is 128 MB.
  3. Copy data of selected tensors into the fusion buffer.
  4. Execute the allreduce operation on the fusion buffer.
  5. Copy data from the fusion buffer into the output tensors.
  6. Repeat until there are no more tensors to reduce in this cycle.
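
A minimal Python sketch of this loop, for illustration only (Horovod's real implementation lives in its C++ core and runs asynchronously). The allreduce_buffer callable and FUSION_THRESHOLD constant are assumptions standing in for the backend collective call and HOROVOD_FUSION_THRESHOLD:

import numpy as np

FUSION_THRESHOLD = 128 * 1024 * 1024  # stands in for HOROVOD_FUSION_THRESHOLD (bytes)

def fused_allreduce(ready_tensors, allreduce_buffer):
    # `ready_tensors` is a list of 1-D float32 numpy arrays that are ready to be
    # reduced; `allreduce_buffer` stands in for one backend collective call.
    pending = list(ready_tensors)
    results = []
    while pending:
        # 1. Select a prefix of ready tensors (same dtype assumed) that fits in the threshold.
        batch, used = [], 0
        while pending and used + pending[0].nbytes <= FUSION_THRESHOLD:
            t = pending.pop(0)
            batch.append(t)
            used += t.nbytes
        if not batch:  # a single tensor larger than the threshold is reduced on its own
            batch.append(pending.pop(0))
        # 2./3. Copy the selected tensors into one fusion buffer.
        buf = np.concatenate(batch)
        # 4. Run a single allreduce over the whole buffer instead of many small ones.
        buf = allreduce_buffer(buf)
        # 5. Copy reduced data from the fusion buffer back into per-tensor outputs.
        offset = 0
        for t in batch:
            results.append(buf[offset:offset + t.size].reshape(t.shape))
            offset += t.size
        # 6. Repeat until no ready tensors remain in this cycle.
    return results

For example, fused_allreduce(tensors, lambda buf: buf) exercises the packing and unpacking logic with an identity "reduction" in place of a real collective.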

Elastic Horovod

To use Elastic Horovod:

  1. Place all variables that need to be kept in sync between worker replicas (model parameters, optimizer state, epoch and batch numbers, etc.) into a hvd.elastic.State object.
  2. Periodically call state.commit() to back up a copy of your state in memory.
  3. Register callbacks with the hvd.elastic.State object to respond to changes in worker membership in the job.

import torch
import torch.nn.functional as F
import torch.optim as optim
import horovod.torch as hvd

hvd.init()

# pin this worker to the GPU that matches its local rank on the host
torch.cuda.set_device(hvd.local_rank())

dataset = ...
model = ...

# scale the learning rate by the number of workers, then wrap the optimizer
# so gradients are averaged across all workers
optimizer = optim.SGD(model.parameters(), lr=lr * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)

# hvd.elastic.run restores the last committed state and restarts train()
# whenever the set of workers changes
@hvd.elastic.run
def train(state):
    batch_offset = state.batch
    for state.epoch in range(state.epoch, epochs):
        for state.batch in range(state.batch, batches_per_epoch):
            data, target = get_random_batch()

            optimizer.zero_grad()
            output = model(data)
            loss = F.nll_loss(output, target)
            loss.backward()
            optimizer.step()

            if state.batch % batches_per_commit == 0:
                # back up the state in memory; if a worker fails, training
                # resumes from the last commit and replays the work since then
                state.commit()
        state.batch = 0

def on_state_reset():
    # adjust learning rate on reset
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr * hvd.size()

state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
state.register_reset_callbacks([on_state_reset])
train(state)
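
To actually run this elastically, the script is launched through horovodrun with its elastic options (for example -np together with --min-np, --max-np, and --host-discovery-script, though flag names may vary between Horovod versions), where the host discovery script prints the currently available hosts.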

Both state.check_host_updates() and state.commit() can be used to detect changes in worker membership; the former is lighter-weight because it only checks for host updates without backing up the state.
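
A sketch of that trade-off inside the inner training loop (batches_per_check is a hypothetical constant, analogous to batches_per_commit above): commit rarely because it backs up the state, and check for host changes more often because it is cheap.

if state.batch % batches_per_commit == 0:
    state.commit()                  # backs up the state and checks for host changes (expensive)
elif state.batch % batches_per_check == 0:
    state.check_host_updates()      # only checks for added/removed hosts (cheap)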