Parallelization

This module provides a method of doing parallelization using MPI. It uses mpi4py, but provides convenience functions that make parallelization much easier.

If you have the following file (double.py):

def compute(x):
    return 2 * x

if __name__ == '__main__':
    values = range(20)

    for x in map(compute, values):
        print(x)

You can parallelize the computation by replacing it with the following (double_mpi.py):

import deepdish as dd

def compute(x):
    return 2 * x

if dd.parallel.main(__name__):
    values = range(20)

    for x in dd.parallel.imap(compute, values):
        print(x)

And run it with:

$ mpirun -n 8 python double_mpi.py

The way it works is that deepdish.parallel.main() will be a gate-keeper that only lets through rank 0. The rest of the nodes will stand idly by. Then, when deepdish.parallel.imap() is called, the computation is sent to the worker nodes, computed, and finally sent back to rank 0. Note that the rank 0 node is not given a task since it needs to be ready to execute the body of the for loop (the imap function will yield values as soon as ready). If the body of the for loop is light on computation, you might want to tell mpirun that you want one more job than your cores.

The file double_mpi.py can also be run without mpirun as well.

Note that if your high performance cluster has good MPI support, this will allow you to parallelize not only across cores, but across machines.

Multiple arguments

If you have multiple arguments, you can use deepdish.parallel.starmap() to automatically unpack them. Note that starmap also returns a generator that will yield results as soon as they are done, so to gather all into a list we have to run it through list before giving it to concatenate:

import deepdish as dd
import numpy as np

def compute(batch, x):
    print('Processing batch', batch, 'on node', dd.parallel.rank())
    return 2 * x

if dd.parallel.main(__name__):
    x = np.arange(100)
    batches = np.array_split(x, 10)
    args = ((batch, x) for batch, x in enumerate(batches))

    # Execute and combine results
    y = np.concatenate(list(dd.parallel.starmap(compute, args)))
    print(y)

We are also showing that you can print the rank using deepdish.parallel.rank().

Unordered

Sometimes we don’t care what order the jobs get processed in, in which case we can use deepdish.parallel.imap_unordered() or deepdish.parallel.starmap_unordered(). This is great if we are doing a commutative reduction on the results or if we are processing or testing independent samples. The results will be yielded as soon as any batch is done, which means more responsive output and a smaller memory footprint than its ordered counterpart. However, this means that you won’t necessarily know which batch completed, so you might have to put the batch number in as one of the arguments and return it. In this example, we do something similar which is to run the indices through the compute function:

import deepdish as dd
import numpy as np

def compute(indices, x):
    return indices, 2 * x

if dd.parallel.main(__name__):
    x = np.arange(100) * 10
    index_batches = np.array_split(np.arange(len(x)), 10)
    args = ((indices, x[indices]) for indices in index_batches)

    y = np.zeros_like(x)
    for indices, batch_y in dd.parallel.starmap_unordered(compute, args):
        print('Finished indices', indices)
        y[indices] = batch_y

    print(y)

For more information, see the deepdish.parallel API documentation.