In this tutorial, you will see how to write a Horovod-powered distributed TensorFlow computation. More specifically, the final goal is to train different models in parallel by assigning each of them to a different GPU. The discussion is organised in two sections: the first illustrates Horovod's basic concepts and its usage with TensorFlow, and the second uses the MNIST classification task as a test case.

Example: Horovod and TensorFlow Basics

Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. Its computational model is inspired by the MPI specification (and it uses MPI in its implementation too). When a Python program that uses Horovod is launched with srun, the spawned processes become aware of each other and can communicate among themselves. For this communication to happen, the init function must be called at the beginning of the program (similar to the MPI_Init call). Let's see how it works by building an example Python program. Its purpose is to create n processes, distributed across m machines, and to assign each of them a GPU to work with.

Listing 1. Initialisation.
import tensorflow as tf
import horovod.tensorflow as hvd
import logging
# Show our log messages
logging.basicConfig(level=logging.INFO)
# ...but disable TensorFlow's ones except for errors
tf.get_logger().setLevel(logging.ERROR)
# Initialize Horovod - this call must always be made at the beginning of our scripts.
hvd.init()

The first three lines import tensorflow, horovod.tensorflow (which provides functionality that extends TensorFlow's capabilities), and logging. The last one is used to log information during execution, but mostly to keep TensorFlow from printing a very verbose log (only error messages are allowed to show up). Finally, the init function is invoked. So far, so good.

One of the main concepts of MPI is the process rank. Horovod also exposes functions that let a process know its rank within the set of processes created by the srun command. More specifically,

  • the rank of a process is its index within a given ordering of all processes participating in the distributed computation
  • the local rank of a process is its index within a given ordering of all processes participating in the distributed computation that are running on the same node as the process in question

Moreover, each process can invoke functions to know the size and local size of the computation. As you might expect, they are the total number of processes and the number of processes on the caller's node, respectively.

As an example, consider a distributed computation involving 4 processes (size = 4) distributed across 2 nodes, each node running 2 of the 4 processes (both local_size values are equal to 2). The rank of a process can take values in [0, 1, 2, 3], while the local rank can take the value of either 0 or 1. These values can be used to control which processes execute which instructions.
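To make the example above concrete, the following sketch enumerates the values each process would see. It does not use Horovod at all: the layout function is a hypothetical helper, and it assumes that ranks are handed out node by node in contiguous blocks, which depends on how the job launcher places the processes.

```python
def layout(num_nodes, procs_per_node):
    """Return (rank, node, local_rank, size, local_size) for every process.

    Assumption: ranks are assigned node by node in contiguous blocks.
    The actual placement is decided by the job launcher.
    """
    size = num_nodes * procs_per_node
    rows = []
    for rank in range(size):
        node = rank // procs_per_node        # which node hosts this process
        local_rank = rank % procs_per_node   # index of the process within its node
        rows.append((rank, node, local_rank, size, procs_per_node))
    return rows


# 4 processes distributed across 2 nodes, 2 processes per node
for rank, node, local_rank, size, local_size in layout(2, 2):
    print(f"rank={rank} node={node} local_rank={local_rank} "
          f"size={size} local_size={local_size}")
```

Under this assumption, ranks 0 and 1 share the first node and ranks 2 and 3 share the second one, and each pair has local ranks 0 and 1.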

With this information in mind, let's continue with the Python program.

Listing 2. Assigning a different GPU to each process.
# retrieve and print the process's global and local ranks
rank = hvd.rank()
local_rank = hvd.local_rank()
size = hvd.size()
local_size = hvd.local_size()
logging.info(f"This is process with rank {rank} and local rank {local_rank}")

# each process retrieves the list of gpus available on its node
gpus = tf.config.experimental.list_physical_devices('GPU')
if local_rank == 0:
    logging.info(f"This is process with rank {rank} and local rank {local_rank}: gpus available are: {gpus}")
# each process selects a gpu (if any gpu is available)
if local_rank >= len(gpus):
    raise Exception("Not enough gpus.")
tf.config.experimental.set_visible_devices(gpus[local_rank], 'GPU')
# From now on each process has its own gpu to use...

The first four lines are convenience function calls that retrieve the rank, local rank, size and local size of the process; the ranks are then logged for demonstration purposes. Next, each process retrieves the list of GPUs that are available on the node it is running on. Of course, processes on the same node will retrieve the same list, whereas any two processes running on different nodes will have different, non-overlapping sets of GPUs. In the latter case, resource contention is structurally impossible; it is in the former case that the local rank concept comes in handy. Each process uses its local rank as an index to select a GPU in the gpus list and will not share it with any other process because:

  • other processes on other nodes will select a GPU from a different list
  • other processes on the same node will select a different GPU because the local rank is unique within the same node

The caveat is that there must be enough GPUs on each node. Another approach is to allow two or more processes to share the same GPU, taking care of assigning processes to GPUs as evenly as possible (e.g. with a round robin policy).
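The round robin alternative mentioned above can be sketched as follows. This is only an illustration under stated assumptions: pick_gpu_index is a hypothetical helper that is not part of the tutorial's script, and it works on plain integers so that it can be tried without any GPU.

```python
def pick_gpu_index(local_rank, num_gpus):
    """Return the index of the GPU a process should use (round robin).

    Hypothetical helper: local ranks wrap around the GPU list, so processes
    are spread over the GPUs of a node as evenly as possible.
    """
    if num_gpus == 0:
        raise RuntimeError("No GPUs available on this node.")
    return local_rank % num_gpus


# 4 processes on a node that has only 2 GPUs
assignment = {local_rank: pick_gpu_index(local_rank, 2) for local_rank in range(4)}
print(assignment)  # {0: 0, 1: 1, 2: 0, 3: 1}
```

In the script above, this would amount to selecting gpus[local_rank % len(gpus)] instead of gpus[local_rank]. Keep in mind that processes sharing a GPU also share its memory.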

The last function call sets the GPU that TensorFlow will use in each process. Try the script by launching it with srun. You should see an output similar to the following one.

Listing 3. Example job output.
INFO:root:This is process with rank 1 and local rank 1
INFO:root:This is process with rank 0 and local rank 0
INFO:root:This is process with rank 2 and local rank 0
INFO:root:This is process with rank 3 and local rank 1
INFO:root:This is process with rank 2 and local rank 0: gpus available are: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]
INFO:root:This is process with rank 0 and local rank 0: gpus available are: [PhysicalDevice(name='/physical_device:GPU:0', device_type='GPU'), PhysicalDevice(name='/physical_device:GPU:1', device_type='GPU')]

Now we are ready to work with multiple models in parallel!

Example: The MNIST classification task

MNIST is an acronym that stands for Modified National Institute of Standards and Technology. The training set comprises 60,000 small, square 28x28 pixel grayscale images of handwritten single digits between 0 and 9, and a further 10,000 images form the test set. The task is to classify a given image of a handwritten digit into one of 10 classes representing integer values from 0 to 9, inclusive.

Building on the Python program above, let us add the code needed to complete the task. In particular, suppose we want to train 4 models with the same architecture but different training data, each of them on a different GPU. Here is the code to do that.

Listing 4. MNIST classification example
# From now on each process has its own gpu to use.
# We will now train the same model on each gpu independently, and make each of them
# output a prediction for a different input.
mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0
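model = tf.keras.models.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    # output layer: 10 logits, one per digit class (the loss below expects logits)
    tf.keras.layers.Dense(10)
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)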
model.compile(optimizer='adam', loss=loss_fn, metrics=['accuracy'])
# We will partition the training set evenly among processes so that the same model
# is trained by each process on different data.
dataset_size = len(x_train)
from math import ceil
# samples per model - number of samples to train each model with
spm = ceil(dataset_size / size)
# each process trains on its own slice of the training set, selected by its rank
model.fit(x_train[rank*spm:(rank+1)*spm], y_train[rank*spm:(rank+1)*spm], epochs=15)
print(model.evaluate(x_test,  y_test, verbose=2))

First, the dataset is loaded through TensorFlow (MNIST being a standard dataset for test cases, TensorFlow provides a convenient function to retrieve it), already split in two parts, one for training and the other for testing. What follows is the definition of the model and the loss function. Up to this point, every process executes the same code. They diverge when the model.fit function is called: the training dataset is partitioned using the size of the computation and the rank of the process. Each process gets a different portion of the samples because the rank is unique among all processes. Therefore, each trained model differs from the others. To show this, each model is evaluated on the same test set through the model.evaluate call. If you run the Python program with this last part added, you should see that the accuracy reported by each task is slightly different. You can use the rank and size values in if statements to train completely different models and, in general, make each process follow a different execution path.
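The partitioning logic can be checked in isolation, without TensorFlow or Horovod. The sketch below is a minimal illustration: partition_bounds and hidden_units_for_rank are hypothetical helpers, and the hidden layer sizes are arbitrary values chosen only to show how the rank can drive a different execution path in each process.

```python
from math import ceil


def partition_bounds(rank, size, dataset_size):
    """Return the [start, stop) slice of the dataset assigned to a rank."""
    spm = ceil(dataset_size / size)  # samples per model
    start = min(rank * spm, dataset_size)
    stop = min((rank + 1) * spm, dataset_size)
    return start, stop


def hidden_units_for_rank(rank):
    """Pick a different hidden layer width per rank (arbitrary example values)."""
    if rank == 0:
        return 64
    elif rank == 1:
        return 128
    return 256


dataset_size, size = 60000, 4
for rank in range(size):
    start, stop = partition_bounds(rank, size, dataset_size)
    units = hidden_units_for_rank(rank)
    print(f"rank {rank}: samples [{start}, {stop}), hidden units {units}")
```

With 60,000 samples and 4 processes, each rank receives a contiguous, non-overlapping block of 15,000 samples. When the dataset size is not a multiple of the number of processes, the last rank simply receives a shorter block.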
