Distributed work queue operations using mpi4py.
Allows for easy parallelization in controller/worker mode with one
controller submitting function or method calls to workers. Supports
multiple ranks per worker (collective workers). Uses mpi4py if
available, otherwise processes calls sequentially in one process.
Based on mpi.py from the pyunicorn project.
# Example of using distributed work queue distwq
# PYTHONPATH must include the directories in which distwq and this file are located.
import distwq
import numpy as np
import scipy
from scipy import signal

def do_work(freq):
    fs = 10e3
    N = 1e5
    amp = 2*np.sqrt(2)
    freq = float(freq)
    noise_power = 0.001 * fs / 2
    time = np.arange(N) / fs
    x = amp*np.sin(2*np.pi*freq*time)
    x += np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
    f, pdens = signal.periodogram(x, fs)
    return f, pdens

def main(controller):
    n = 150
    for i in range(0, n):
        controller.submit_call("do_work", (i+1,), module_name="example_distwq")
    s = []
    for i in range(0, n):
        s.append(controller.get_next_result())
    print("results length : %d" % len(s))
    print(s)
    controller.info()

if __name__ == '__main__':
    if distwq.is_controller:
        distwq.run(fun_name="main", verbose=True, nprocs_per_worker=3)
    else:
        distwq.run(fun_name=None, verbose=True, nprocs_per_worker=3)
submit_call(name_to_call, args=(), kwargs={}, module_name="__main__", time_est=1, task_id=None)
Submit a call for parallel execution.
If called by the controller and workers are available, the call is submitted
to a worker for asynchronous execution.
If called by a worker or if no workers are available, the call is instead
executed synchronously on this MPI node.
Examples:
    # Collect results by explicit task id.
    for n in range(0,10):
        distwq.submit_call("doit", (n,A[n]), task_id=n, time_est=n**2)
    for n in range(0,10):
        result[n] = distwq.get_result(n)

    # Collect results by the ids returned from submit_call(), in submission order.
    for n in range(0,10):
        ids.append(distwq.submit_call("doit", (n,A[n])))
    for n in range(0,10):
        results.append(distwq.get_result(ids.pop(0)))

    # Collect results without tracking ids.
    for n in range(0,10):
        distwq.submit_call("doit", (n,A[n]))
    for n in range(0,10):
        results.append(distwq.get_next_result())

    # Call a function from another module with keyword arguments.
    distwq.submit_call("solve", (), {"a":a, "b":b},
                       module_name="numpy.linalg")
Returns:
id of call, to be used in get_result().
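Because submit_call() takes the target as a string plus a module name, the receiving process has to turn that pair back into a callable. The following is a minimal sketch of one plausible way to do that, using only the standard library. The helper call_by_name() is hypothetical and not part of distwq; the lookup distwq actually performs may differ.

```python
import importlib


def call_by_name(name_to_call, args=(), kwargs=None, module_name="__main__"):
    """Look up name_to_call inside module_name and invoke it.

    Hypothetical helper for illustration only; not part of distwq.
    """
    kwargs = {} if kwargs is None else kwargs
    module = importlib.import_module(module_name)
    # Walk dotted names (e.g. "SomeClass.method") one attribute at a time.
    target = module
    for part in name_to_call.split("."):
        target = getattr(target, part)
    return target(*args, **kwargs)


# Positional arguments travel as a tuple, keyword arguments as a dict,
# mirroring the args and kwargs parameters of submit_call().
root = call_by_name("sqrt", (16.0,), module_name="math")
text = call_by_name("dumps", ({"b": 2, "a": 1},), {"sort_keys": True},
                    module_name="json")
```

Passing names instead of function objects means that only strings and arguments need to be sent between processes, at the cost that the named module must be importable on every worker.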
get_result(task_id)
Returns result of earlier submitted call.
Can only be called by the controller.
If the call is not yet finished, waits for it to finish.
Results should be collected in the same order as calls were submitted.
For each worker, the results of calls assigned to that worker must be
collected in the same order as those calls were submitted.
Can only be called once per call.
Returns:
return value of call.
get_next_result()
Returns result of next earlier submitted call whose result has not yet
been obtained.
Can only be called by the controller.
If the call is not yet finished, waits for it to finish.
Returns:
id, return value of call, or None if there are no more calls in the queue.
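The calling pattern described above (submit, then drain until None comes back) can be tried without MPI. The sketch below is a single-process stand-in that runs each call synchronously at submission time, in the spirit of the sequential fallback. The class SequentialQueue is hypothetical and is not distwq's implementation. As a simplification it accepts a callable rather than a function name.

```python
from collections import OrderedDict


class SequentialQueue:
    """Hypothetical single-process stand-in; not distwq's implementation."""

    def __init__(self):
        # task_id -> return value, kept in submission order.
        self._results = OrderedDict()
        self._next_id = 0

    def submit_call(self, fun, args=(), kwargs=None, task_id=None):
        # With no workers available, the call runs synchronously right here.
        if task_id is None:
            task_id = self._next_id
        self._next_id += 1
        self._results[task_id] = fun(*args, **(kwargs or {}))
        return task_id

    def get_result(self, task_id):
        # pop() models "can only be called once per call":
        # asking for the same id a second time raises KeyError.
        return self._results.pop(task_id)

    def get_next_result(self):
        # Oldest uncollected (id, value) pair, or None when nothing is left.
        if not self._results:
            return None
        return self._results.popitem(last=False)


queue = SequentialQueue()
for n in range(3):
    queue.submit_call(pow, (n, 2))

# Drain the queue without tracking ids; stop at the None sentinel.
collected = []
while True:
    item = queue.get_next_result()
    if item is None:
        break
    collected.append(item)
```

Testing the returned item against None, rather than testing the call's return value, keeps the loop correct even when a submitted function itself returns None.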
info()
Print processing statistics.
Can only be called by the controller.
exit()
Tells all workers to exit.
Can only be called by the controller.
abort()
Abort execution on all MPI nodes immediately.
Can be called by controller and workers.
serve()
Serves submitted calls until told to finish.
Call this function if workers need to perform initialization
different from the controller, like this:
    def workerfun(worker):
        do = whatever + initialization - is * necessary
        worker.serve()
        do = whatever + cleanup - is * necessary
If workerfun() is not defined, serve() will be called automatically by run().
serve()
Broker and serve submitted calls until told to finish. A task
is received from the controller and sent to all collective
workers associated with this broker via scatter.
Call this function if workers need to perform initialization
different from the controller, like this:
    def workerfun(worker):
        do = whatever + initialization - is * necessary
        worker.serve()
        do = whatever + cleanup - is * necessary
If workerfun() is not defined, serve() will be called automatically by
run().
run(fun_name=None, module_name='__main__', verbose=False, worker_grouping_method=None, nprocs_per_worker=1, broker_is_worker=False, args=())
Runs in controller/worker mode until fun(controller/worker) finishes.
Must be called on all MPI nodes.
On the controller, run() calls fun_name() and returns when fun_name() returns.
On each worker, run() calls fun_name() if it is defined, or serve()
otherwise, and returns when fun_name() returns on that worker, when
fun_name() returns on the controller, or when the controller calls exit().