In the previous tutorial of this series, it has been shown how to use Horovod to train different models in parallel, each on a different GPU. It is convenient because one can achieve the desired result by writing and running one script - Horovod will do the rest. However, it is nothing that couldn't be done before with a little more effort and SSH recipes. The real reason why Horovod has become very popular is that it allows to train a single model on a very large dataset leveraging multiple GPUs located on different nodes. Until very recently, available memory on accelerators was the limiting factor on the number of samples that could be used during training, in contrast with the always increasing availability of data. Technologies had been developed to allow multiple GPUs on a single node to pool their resources together and to be seen as a single device. Scaling to GPUs scattered on several nodes was the next big challenge that Horovod started to address. TensorFlow has native capability to run on a distributed environment, but it is still an experimental feature that arguably is not user friendly. On the other hand, Horovod makes it simple.


What follows is a simple demonstration of distributed training. The objective is training a convolutional neural network to recognise handwritten digits.

In the first part of the program, each process gets assigned a different GPU to manage.

Listing 1.
import tensorflow as tf
import horovod.tensorflow.keras as hvd
# Horovod: initialize Horovod.
# Horovod: pin GPU to be used to process local rank (one GPU per process)
gpus = tf.config.experimental.list_physical_devices('GPU')
for gpu in gpus:
    tf.config.experimental.set_memory_growth(gpu, True)
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

The dataset is then retrieved and properly pre-processed (images' pixel values are scaled in the range [0-1] and labels are made numeric) for it to be used in the training process. Also, samples are repeated, shuffled and partitioned in batches. After that, the code defines the model using Keras layers. The following steps are not different from what you would have performed in a common single-node training script.

Listing 2.
(mnist_images, mnist_labels), _ = tf.keras.datasets.mnist.load_data(path='mnist-{}.npz'.format(hvd.rank()))
dataset =
    (tf.cast(mnist_images[..., tf.newaxis] / 255.0, tf.float32),
             tf.cast(mnist_labels, tf.int64))
dataset = dataset.repeat().shuffle(10000).batch(128)
mnist_model = tf.keras.Sequential([
    tf.keras.layers.Conv2D(32, [3, 3], activation='relu'),
    tf.keras.layers.Conv2D(64, [3, 3], activation='relu'),
    tf.keras.layers.MaxPooling2D(pool_size=(2, 2)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')

Horovod kicks in when defining the optimiser to be used to find model parameters. In particular, any TensorFlow optimiser you select to use must be wrapped using the DistributedOptimizer class provided by Horovod, so that the Data Parallelism strategy is adopted. Data Parallelism is achieved by splitting the training set among GPUs so that it can be elaborated in parallel. In particular, each process computes the local gradients using the optimiser provided through the constructor and its portion of the dataset; subsequently, each process broadcasts these values and combines them with all others (taking the average). One important thing to notice is that the weights' initialisation process must be consistent across all nodes. This is achieved by broadcasting global variables from one node (typically the one with rank 0) to all others. Furthermore, the learning rate must be scaled proportionally to the number of GPUs since the training is faster; this is due to the fact that the number of steps per epoch is inverse proportional to the number of GPUs.

Listing 3.
# Horovod: adjust learning rate based on number of GPUs.
opt = tf.optimizers.Adam(0.001 * hvd.size())
# Horovod: add Horovod DistributedOptimizer.
opt = hvd.DistributedOptimizer(opt)
# Horovod: Specify `experimental_run_tf_function=False` to ensure TensorFlow
# uses hvd.DistributedOptimizer() to compute gradients.
callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
# Horovod: write logs on worker 0.
verbose = 1 if hvd.rank() == 0 else 0
# Train the model.
# Horovod: adjust number of steps based on number of GPUs., steps_per_epoch=500 // hvd.size(), callbacks=callbacks, epochs=24, verbose=verbose)

This is all what is required to train a model on a large dataset. As you can see, Horovod enables distributed training starting from a serial script with minimal changes.

Related pages

  • No labels