Horovod
在18年其他框架的分布式训练不成熟的时候,率先用NCCL和MPI实现了ring allreduce。
Horovod Timeline
Tensor Fusion
Ring-allreduce utilizes the network in an optimal way if the tensors are large enough, but does not
work as efficiently or quickly if they are very small.
One of the unique things about Horovod is its ability to interleave communication and computation coupled with the ability to batch small allreduce operations, which results in improved performance. We call this batching feature Tensor Fusion.
Tensor Fusion works by attempting to combine all the tensors that are ready to be reduced at given moment of time into one reduction operation. The algorithm of Tensor Fusion is as follows:
- Determine which tensors are ready to be reduced. Select first few tensors that fit in
HOROVOD_FUSION_THRESHOLD
bytes and have the same data type. - Allocate fusion buffer of size
HOROVOD_FUSION_THRESHOLD
if it was not allocated before. Default fusion buffer size is 128 MB. - Copy data of selected tensors into the fusion buffer.
- Execute the allreduce operation on the fusion buffer.
- Copy data from the fusion buffer into the output tensors.
- Repeat until there are no more tensors to reduce in this cycle.
Elastic Horovod
Place all variables that need to be kept in sync between worker replicas (model parameters, optimizer state, epoch and batch numbers, etc.) into a hvd.elastic.State
object.
Periodically call state.commit()
to backup a copy of your state in memory.
Register callbacks with the hvd.elastic.State
object to respond to changes in the worker membership in the job.
import torch
import horovod.torch as hvd
hvd.init()
torch.cuda.set_device(hvd.local_rank())
dataset = ...
model = ...
optimizer = optim.SGD(model.parameters(), lr * hvd.size())
optimizer = hvd.DistributedOptimizer(optimizer)
@hvd.elastic.run
def train(state):
batch_offset = state.batch
for state.epoch in range(state.epoch, epochs):
for state.batch in range(state.batch, batches_per_epoch):
data, target = get_random_batch()
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if state.batch % batches_per_commit == 0:
state.commit()
state.batch = 0
def on_state_reset():
# adjust learning rate on reset
for param_group in optimizer.param_groups:
param_group['lr'] = lr * hvd.size()
state = hvd.elastic.TorchState(model, optimizer, batch=0, epoch=0)
state.register_reset_callbacks([on_state_reset])
train(state)
state.check_host_updates() 和 state.commit()
都可用于检查同步,前者更轻量