In this tutorial, you are going to see how to write a Horovod-powered distributed TensorFlow computation. More specifically, the final goal is to train different models in parallel by assigning each of them to a different GPU. The discussion is organised in two sections: the first illustrates Horovod's basic concepts and its usage coupled with TensorFlow, while the second uses the MNIST classification task as a test case.
Example: Horovod and TensorFlow Basics
Horovod is a distributed training framework for TensorFlow, Keras, PyTorch, and Apache MXNet. Its computational model is inspired by the MPI specification (and uses MPI for its implementation too). When Horovod is used in a Python program, which in turn is launched using
srun, it enables the spawned processes to be aware of each other and to communicate among themselves. For this communication to happen, the
init function must be called at the beginning of the program (similar to the
MPI_Init call). Let's see how it works by building an example Python program. Its purpose will be to create
n processes, distributed across
m machines, and to assign each of them a GPU to work with.
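A minimal version of this first fragment might look as follows (a sketch; the exact logging configuration shown here is one reasonable choice, not necessarily the tutorial's original code):

```python
import tensorflow as tf
import horovod.tensorflow as hvd
import logging

# Log our own messages at INFO level, but let only TensorFlow's
# error messages show up
logging.basicConfig(level=logging.INFO)
logging.getLogger("tensorflow").setLevel(logging.ERROR)

# Initialise Horovod (the analogue of MPI_Init)
hvd.init()
```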
In the first three lines, the
tensorflow and
horovod.tensorflow modules, the latter providing the functionality that enhances TensorFlow's capabilities, and
logging are imported. The last one is used to log information during execution, but mostly to prevent TensorFlow from printing a very verbose log (only error messages are allowed to show up). Finally, the
init function is invoked. So far, so good.
One of the main concepts of MPI is process rank. Horovod, too, exposes functionality for a process to learn its rank within the set of processes created by the
srun command. More specifically,
- the rank of a process is its index within a given ordering of all processes participating in the distributed computation
- the local rank of a process is its index within a given ordering of the participating processes that are running on the same node as the process in question
Moreover, each process can invoke functions to learn the size and local size of the computation. As you might expect, these are the total number of processes and the number of processes on the caller's node, respectively.
As an example, consider a distributed computation involving 4 processes (size = 4) distributed across 2 nodes, each node running 2 of the 4 processes (both local_size values are equal to 2). The rank of a process can take values in
[0, 1, 2, 3], while the local rank can take the value of either
0 or 1. These values can be used to control which processes execute which instructions.
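Assuming ranks are assigned contiguously per node (a common layout when launching with srun), the relationship between these quantities can be sketched in a few lines of plain Python (topology is a hypothetical helper name):

```python
def topology(rank, procs_per_node):
    """Map a global rank to (node index, local rank), assuming
    ranks are assigned contiguously, node by node."""
    return rank // procs_per_node, rank % procs_per_node

# 4 processes on 2 nodes, 2 per node (the example above)
for rank in range(4):
    node, local_rank = topology(rank, procs_per_node=2)
    print(rank, node, local_rank)  # e.g. rank 3 -> node 1, local rank 1
```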
With this information in mind, let's continue with the Python program.
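This next fragment could look like the following sketch (it assumes the imports and the hvd.init() call from the first fragment; the tf.config calls are the TensorFlow 2.x API for device selection):

```python
# Ranks identify this process globally and within its node
rank = hvd.rank()
local_rank = hvd.local_rank()
logging.info("rank: %d, local rank: %d", rank, local_rank)

# GPUs available on this node; processes on the same node
# retrieve the same list
gpus = tf.config.list_physical_devices("GPU")

# Pin this process to one GPU, indexed by its local rank
tf.config.set_visible_devices(gpus[local_rank], "GPU")
```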
The first two lines are convenient function calls to retrieve the rank and local rank of the process, which are then logged for demonstration purposes. Next, each process retrieves the list of GPUs that are available on the node it is running on. Of course, processes on the same node will retrieve the same list, whereas any two processes running on different nodes will see different, non-overlapping sets of GPUs. In the latter case, resource contention is structurally impossible; it is in the former case that the local rank concept comes in handy. Each process uses its local rank as an index to select a GPU from the
gpus list and will not share it with any other process because:
- other processes on other nodes will select a GPU from a different list
- other processes on the same node will select a different GPU because the local rank is unique within the same node
The caveat is that there must be enough GPUs on each node. Another approach is to allow two or more processes to share the same GPU, taking care to assign processes to GPUs as evenly as possible (e.g. with a round-robin policy).
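Such a round-robin policy can be sketched in a couple of lines (gpu_for is a hypothetical helper name):

```python
def gpu_for(local_rank, num_gpus):
    # Round-robin assignment: local ranks beyond the GPU count
    # wrap around and share a device with an earlier process.
    return local_rank % num_gpus

# 4 processes per node, 2 GPUs per node: ranks 0 and 2 share
# GPU 0, ranks 1 and 3 share GPU 1.
print([gpu_for(r, 2) for r in range(4)])  # [0, 1, 0, 1]
```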
The last function call sets the GPU TensorFlow will use in each process. Try the script using
distributed_tf.sh. You should see an output similar to the following one.
Now we are ready to work with multiple models in parallel!
Example: The MNIST Classification Task
MNIST is an acronym that stands for the Modified National Institute of Standards and Technology dataset. It comprises 60,000 small square 28x28 pixel grayscale images of handwritten single digits between 0 and 9. The task is to classify a given image of a handwritten digit into one of 10 classes representing the integer values from 0 to 9, inclusive.
Starting from the Python program built above, let us add the code needed to complete the task. In particular, suppose we want to train 4 models with the same architecture but different training data, each of them on a different GPU. Here is the code to do that.
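The following is a sketch of what such a program might look like; the model architecture (a small dense network) and the training hyperparameters are illustrative assumptions, not necessarily the tutorial's exact code:

```python
import logging

import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()
rank, size = hvd.rank(), hvd.size()

# Load MNIST: a training set and a test set
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Model and loss definition: identical in every process
# (the architecture below is an illustrative choice)
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation="relu"),
    tf.keras.layers.Dense(10),
])
loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])

# Here the processes diverge: each one trains on its own slice
# of the training data, selected with its rank and the size
model.fit(x_train[rank::size], y_train[rank::size], epochs=3)

# Every process evaluates its model on the same test set
loss, accuracy = model.evaluate(x_test, y_test)
logging.info("rank %d: test accuracy %.4f", rank, accuracy)
```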
First, the dataset is loaded through the
tensorflow module (MNIST being a standard dataset for test cases, TensorFlow provides a convenient function to retrieve it) and then split into two parts, one for training and the other for testing. What follows is the definition of the model and the loss function. Up to this point, every process executes the same code. The processes diverge when the
model.fit function is called. Indeed, the training dataset is partitioned using the size of the computation and the rank of the process. Each process gets a different portion of samples because its rank is unique among all processes. Therefore, each trained model differs from the others. To prove this, each model is evaluated on the same test set through the
model.evaluate call. If you run the Python program with this last part added, you should see that the accuracy reported by every task is slightly different. You can use the rank and size values in
if statements to train completely different models and, in general, to make each process follow a different execution path.
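For instance, a schematic branch on the rank might look like this (the two architectures are hypothetical, purely illustrative choices):

```python
import tensorflow as tf
import horovod.tensorflow as hvd

hvd.init()

# Hypothetical branching: rank 0 trains a deeper model,
# every other process trains a simpler one
if hvd.rank() == 0:
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(256, activation="relu"),
        tf.keras.layers.Dense(128, activation="relu"),
        tf.keras.layers.Dense(10),
    ])
else:
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(10),
    ])
```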