This module provides a way to parallelize computations using MPI. It is built on mpi4py, but adds convenience functions that make parallelization much easier.
If you have the following file (double.py):
def compute(x):
    return 2 * x

if __name__ == '__main__':
    values = range(20)
    for x in map(compute, values):
        print(x)
You can parallelize the computation by replacing it with the following (double_mpi.py):
import deepdish as dd

def compute(x):
    return 2 * x

if dd.parallel.main(__name__):
    values = range(20)
    for x in dd.parallel.imap(compute, values):
        print(x)
And run it with:
$ mpirun -n 8 python double_mpi.py
The way it works is that deepdish.parallel.main() acts as a gatekeeper that only lets rank 0 through; the rest of the nodes stand idly by. Then, when deepdish.parallel.imap() is called, the tasks are sent to the worker nodes, computed, and the results sent back to rank 0. Note that rank 0 is not given a task, since it needs to be ready to execute the body of the for loop (the imap function yields values as soon as they are ready). If the body of the for loop is light on computation, you might want to tell mpirun to launch one more process than you have cores.
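To get a feel for what is going on under the hood, here is a rough sketch of the gatekeeper pattern written directly against mpi4py. This is only an illustration of the idea, not deepdish's actual implementation; the worker loop and message format here are made up:

# A simplified sketch of the gatekeeper pattern using mpi4py directly.
# This is NOT deepdish's implementation; the message format is invented
# purely for illustration.
from mpi4py import MPI

def main(name):
    if name != '__main__':
        return False
    comm = MPI.COMM_WORLD
    if comm.Get_rank() == 0:
        return True        # rank 0 passes the gate and drives the for loop
    _worker_loop(comm)     # all other ranks block here, serving tasks
    return False

def _worker_loop(comm):
    while True:
        task = comm.recv(source=0)
        if task is None:   # sentinel value: no more work, shut down
            break
        func, args = task
        comm.send(func(*args), dest=0)

For example, on an 8-core machine you could ask mpirun for nine processes, so that eight workers keep the cores busy while rank 0 only collects results and runs the loop body:

$ mpirun -n 9 python double_mpi.py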
The file double_mpi.py can also be run without mpirun.
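When launched directly, there are no worker nodes, so the computation simply runs in a single process, which is convenient for testing and debugging:

$ python double_mpi.py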
Note that if your high-performance cluster has good MPI support, this lets you parallelize not only across cores, but across machines.
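For example, with Open MPI you could launch 32 processes spread across the machines listed in a host file (here hosts is a hypothetical file naming your machines; the exact flag differs between MPI implementations):

$ mpirun -n 32 --hostfile hosts python double_mpi.py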
If you have multiple arguments, you can use deepdish.parallel.starmap() to automatically unpack them. Note that starmap also returns a generator that yields results as soon as they are done, so to gather them all we have to run the generator through list before giving it to concatenate:
import deepdish as dd
import numpy as np

def compute(batch, x):
    print('Processing batch', batch, 'on node', dd.parallel.rank())
    return 2 * x

if dd.parallel.main(__name__):
    x = np.arange(100)
    batches = np.array_split(x, 10)
    args = ((batch, x) for batch, x in enumerate(batches))

    # Execute and combine results
    y = np.concatenate(list(dd.parallel.starmap(compute, args)))
    print(y)
We are also showing that you can print the rank of the current process using deepdish.parallel.rank().
Sometimes we don’t care what order the jobs get processed in, in which case we can use deepdish.parallel.starmap_unordered(). This is great if we are doing a commutative reduction of the results, or if we are processing or testing independent samples. The results are yielded as soon as any batch is done, which means more responsive output and a smaller memory footprint than its ordered counterpart. However, it also means that you won’t necessarily know which batch completed, so you might have to pass the batch number in as one of the arguments and return it with the result. In this example, we do something similar by running the indices through the compute function and returning them, so each result can be written back to the right slice; since the index sets are disjoint, the final array is the same no matter what order the batches complete in:
import deepdish as dd
import numpy as np

def compute(indices, x):
    return indices, 2 * x

if dd.parallel.main(__name__):
    x = np.arange(100) * 10
    index_batches = np.array_split(np.arange(len(x)), 10)
    args = ((indices, x[indices]) for indices in index_batches)

    y = np.zeros_like(x)
    for indices, batch_y in dd.parallel.starmap_unordered(compute, args):
        print('Finished indices', indices)
        y[indices] = batch_y
    print(y)
For more information, see the deepdish.parallel API documentation.