Parallelizing data processing

Here’s a scenario commonly encountered in practice: a big model is trained on a dataset that doesn’t fit in memory (e.g. a deep convolutional neural network trained on ImageNet), with a GPU accelerating training.

In that case, doing data processing and training in a single process is very inefficient: the GPU sits idle while data is read off disk and preprocessed, and the CPU sits idle while the GPU is at work.

An obvious solution is to do the preprocessing and training in parallel: if I/O operations are executed while the GPU is busy, then less time is wasted waiting for data to be available.

In this section, we’ll cover how to spawn a data processing server in a separate process and how to connect to that server and consume that data in a training script.

Toy example

Let’s first create a dummy dataset:

>>> from fuel.datasets import IndexableDataset
>>> dataset = IndexableDataset({'features': [[0] * 128] * 1000})

In practice, the dataset can be whatever you want, but we’ll settle for this one for simplicity.
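For instance, a more realistic experiment might read its data from an HDF5 file through Fuel’s H5PYDataset. Here’s a sketch; the file name is hypothetical and the file would need to be laid out in Fuel’s HDF5 format:

from fuel.datasets import H5PYDataset

# Hypothetical HDF5 file; 'train' is assumed to be one of its splits.
dataset = H5PYDataset('my_dataset.hdf5', which_sets=('train',))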

Since this is a pretty small dataset, we’ll need to simulate slowdowns associated with I/O operations and preprocessing. We’ll create a transformer whose sole purpose is to wait some period of time before returning the requested data:

>>> import time
>>> from fuel.transformers import Transformer
>>> class Bottleneck(Transformer):
...     def __init__(self, *args, **kwargs):
...         self.slowdown = kwargs.pop('slowdown', 0)
...         super(Bottleneck, self).__init__(*args, **kwargs)
...
...     def get_data(self, request=None):
...         if request is not None:
...             raise ValueError('requests are not supported')
...         time.sleep(self.slowdown)
...         return next(self.child_epoch_iterator)

We’ll also create a context manager to time a block of code and print the result:

>>> from contextlib import contextmanager
>>> @contextmanager
... def timer(name):
...     start_time = time.time()
...     yield
...     stop_time = time.time()
...     print('{} took {} seconds'.format(name, stop_time - start_time))

Let’s see how much of a slowdown we’re incurring in our current setup:

>>> from fuel.schemes import ShuffledScheme
>>> from fuel.streams import DataStream
>>> iteration_scheme = ShuffledScheme(examples=1000, batch_size=100)
>>> data_stream = Bottleneck(
...     data_stream=DataStream.default_stream(
...         dataset=dataset, iteration_scheme=iteration_scheme),
...     slowdown=0.005)
>>> with timer('Iteration'): # doctest: +ELLIPSIS
...     for data in data_stream.get_epoch_iterator(): pass
Iteration took ... seconds

Next, we’ll write a small piece of code that simulates some computation being done on our data. Let’s pretend that we’re training for 5 epochs and that training on a batch takes a fixed amount of time.

>>> with timer('Training'): # doctest: +ELLIPSIS
...     for i in range(5):
...         for data in data_stream.get_epoch_iterator(): time.sleep(0.01)
Training took ... seconds

Take note of the time it takes: we’ll cut that down with the data processing server.
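As a back-of-the-envelope check: with 1000 examples and a batch size of 100, each epoch yields 10 batches. Done sequentially, each batch costs roughly 0.005 seconds of simulated I/O plus 0.01 seconds of simulated computation, so the 5 epochs should take about 5 × 10 × 0.015 = 0.75 seconds. If I/O were perfectly overlapped with computation, the lower bound would be the computation alone: 5 × 10 × 0.01 = 0.5 seconds.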

Data processing server

Fuel provides a start_server() function, located in the fuel.server module, which takes a data stream as input and sets up a data processing server that iterates over this data stream. Its signature is the following:

def start_server(data_stream, port=5557, hwm=10):

The data_stream argument is self-explanatory. The port the server listens on defaults to 5557 but can be changed through the port argument. The hwm argument controls the high-water mark, which loosely translates to the buffer size. The default value of 10 usually works well; increasing it enlarges the buffer, which can help when preprocessing times are highly variable, but also increases memory usage. Be sure to set the corresponding high-water mark on the client as well.
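As a minimal sketch, here is how the bottlenecked stream from the toy example could be served on a non-default port with a larger buffer (both values are arbitrary choices, not recommendations):

from fuel.server import start_server

# Serve batches from data_stream on port 5558 with a high-water mark of 20.
# This call blocks and keeps serving until the process is terminated.
start_server(data_stream, port=5558, hwm=20)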

A client can then connect to that server and request data. The ServerDataStream class is what we want to use. Its __init__ method has the following signature:

def __init__(self, sources, host='localhost', port=5557, hwm=10):

The sources argument is how you communicate source names to the data stream. It’s expected to be a tuple of strings with as many elements as there are sources that will be received. The host and port arguments are used to specify where to connect to the data processing server. Note that this allows you to run the server on a completely different machine! The hwm argument should mirror what you passed to start_server().
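A matching client-side sketch, mirroring the arbitrary port and high-water mark used above:

from fuel.streams import ServerDataStream

# 'features' is the only source the toy server sends.
data_stream = ServerDataStream(
    ('features',), host='localhost', port=5558, hwm=20)

for data in data_stream.get_epoch_iterator():
    pass  # train on the received batch here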

Putting it together

You’ll need to split your code into two files: one that spawns the data processing server and one that handles the training loop.

Here are the two files:

"""server.py"""
import time

from fuel.datasets import IndexableDataset
from fuel.schemes import ShuffledScheme
from fuel.server import start_server
from fuel.streams import DataStream
from fuel.transformers import Transformer


class Bottleneck(Transformer):
    """Waits every time data is requested to simulate a bottleneck.

    Parameters
    ----------
    slowdown : float, optional
        Time (in seconds) to wait before returning data. Defaults to 0.

    """
    def __init__(self, *args, **kwargs):
        self.slowdown = kwargs.pop('slowdown', 0)
        super(Bottleneck, self).__init__(*args, **kwargs)

    def get_data(self, request=None):
        if request is not None:
            raise ValueError('requests are not supported')
        time.sleep(self.slowdown)
        return next(self.child_epoch_iterator)


def create_data_stream(slowdown=0):
    """Creates a bottlenecked data stream of dummy data.

    Parameters
    ----------
    slowdown : float
        Time (in seconds) to wait each time data is requested.

    Returns
    -------
    data_stream : fuel.streams.AbstractDataStream
        Bottlenecked data stream.

    """
    dataset = IndexableDataset({'features': [[0] * 128] * 1000})
    iteration_scheme = ShuffledScheme(examples=1000, batch_size=100)
    data_stream = Bottleneck(
        data_stream=DataStream.default_stream(
            dataset=dataset, iteration_scheme=iteration_scheme),
        slowdown=slowdown)
    return data_stream


if __name__ == "__main__":
    start_server(create_data_stream(0.005))
"""train.py"""
import argparse
import time
from contextlib import contextmanager

from fuel.streams import ServerDataStream

from server import create_data_stream


@contextmanager
def timer(name):
    """Times a block of code and prints the result.

    Parameters
    ----------
    name : str
        What this block of code represents.

    """
    start_time = time.time()
    yield
    stop_time = time.time()
    print('{} took {} seconds'.format(name, stop_time - start_time))


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '-p', '--parallel', action='store_true',
        help='run data preprocessing in a separate process')
    args = parser.parse_args()

    if args.parallel:
        data_stream = ServerDataStream(('features',))
    else:
        data_stream = create_data_stream(0.005)

    with timer('Training'):
        for i in range(5):
            for data in data_stream.get_epoch_iterator(): time.sleep(0.01)

We’ve modularized the code to make it a little more convenient to reuse. Save the two files in the same directory and type

$ python train.py

This will run the training and the data processing in the same process.

Now, type

$ python server.py

in a separate terminal window and type

$ python train.py -p

Compare the two running times: you should see a clear gain using the separate data processing server.