Project author: soltesz-lab

Project description:
Distributed work queue operations using mpi4py.
Language: Python
Repository: git://github.com/soltesz-lab/distwq.git
Created: 2020-01-28T19:57:14Z
Project home: https://github.com/soltesz-lab/distwq

License: BSD 3-Clause "New" or "Revised" License

distwq

Distributed work queue operations using mpi4py.

Allows for easy parallelization in controller/worker mode with one
controller submitting function or method calls to workers. Supports
multiple ranks per worker (collective workers). Uses mpi4py if
available, otherwise processes calls sequentially in one process.

Based on mpi.py from the pyunicorn project.

EXAMPLE

  # Example of using distributed work queue distwq
  # PYTHONPATH must include the directories in which distwq and this file are located.

  import distwq
  import numpy as np
  from scipy import signal


  def do_work(freq):
      fs = 10e3
      N = 1e5
      amp = 2 * np.sqrt(2)
      freq = float(freq)
      noise_power = 0.001 * fs / 2
      time = np.arange(N) / fs
      x = amp * np.sin(2 * np.pi * freq * time)
      x += np.random.normal(scale=np.sqrt(noise_power), size=time.shape)
      f, pdens = signal.periodogram(x, fs)
      return f, pdens


  def main(controller):
      n = 150
      for i in range(0, n):
          controller.submit_call("do_work", (i + 1,), module_name="example_distwq")
      s = []
      for i in range(0, n):
          s.append(controller.get_next_result())
      print("results length : %d" % len(s))
      print(s)
      controller.info()


  if __name__ == '__main__':
      if distwq.is_controller:
          distwq.run(fun_name="main", verbose=True, nprocs_per_worker=3)
      else:
          distwq.run(fun_name=None, verbose=True, nprocs_per_worker=3)

API

MPIController

submit_call

  submit_call(name_to_call, args=(), kwargs={}, module_name="__main__", time_est=1, task_id=None)

Submit a call for parallel execution.

If called by the controller and workers are available, the call is submitted
to a worker for asynchronous execution.

If called by a worker or if no workers are available, the call is instead
executed synchronously on this MPI node.

Examples:

1. Provide task ids and time estimates explicitly:

     for n in range(0, 10):
         distwq.submit_call("doit", (n, A[n]), task_id=n, time_est=n**2)
     for n in range(0, 10):
         result[n] = distwq.get_result(n)

2. Use generated ids stored in a list:

     for n in range(0, 10):
         ids.append(distwq.submit_call("doit", (n, A[n])))
     for n in range(0, 10):
         results.append(distwq.get_result(ids.pop()))

3. Ignore ids altogether:

     for n in range(0, 10):
         distwq.submit_call("doit", (n, A[n]))
     for n in range(0, 10):
         results.append(distwq.get_next_result())

4. Call a module function and use keyword arguments:

     distwq.submit_call("solve", (), {"a": a, "b": b}, module_name="numpy.linalg")
  • name_to_call (str): name of the callable object (usually a function or
    a static method of a class) as contained in the namespace specified
    by module_name.
  • args (tuple): the positional arguments to provide to the callable
    object. Tuples of length 1 must be written (arg,). Default: ()
  • kwargs (dict): the keyword arguments to provide to the callable
    object. Default: {}
  • module_name (str): optional name of the imported module or submodule in
    whose namespace the callable object is contained. For objects defined
    at script level, this is "__main__"; for objects defined in an
    imported package, this is the package name. Must be a key of the
    dictionary sys.modules (check there after import if in doubt).
    Default: "__main__"
  • time_est (float): estimated relative completion time for this call;
    used to find a suitable worker. Default: 1
  • task_id (int or None): unique id for this call. Must be a possible
    dictionary key. If None, a random id is assigned and returned. Can be
    re-used after get_result() for this id. Default: None
  • worker (int or None): optional rank of the worker to assign the call
    to; if given, must satisfy 0 < worker < comm.size. If None, the call
    is assigned to the worker with the smallest current total time
    estimate. Default: None

Returns:

task_id of the call, to be used in get_result().
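
For reference, a minimal controller-side sketch (the values are
hypothetical; do_work and example_distwq follow the example at the top of
this README):

  def main(controller):
      # Submit one call; note the 1-tuple syntax for a single argument.
      task_id = controller.submit_call(
          "do_work",
          (440.0,),                      # positional args for do_work
          module_name="example_distwq",
          time_est=2,                    # relative cost hint for load balancing
      )
      # Block until the call finishes and fetch its return value.
      f, pdens = controller.get_result(task_id)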

get_result

  get_result(task_id)

Returns result of earlier submitted call.

Can only be called by the controller.

If the call is not yet finished, waits for it to finish.
Results should be collected in the same order as the calls were
submitted: for each worker, the results of calls assigned to that worker
must be collected in the same order as those calls were submitted.
Can only be called once per call.

  • task_id (int): id of an earlier submitted call, as provided to or
    returned by submit_call().

Returns:

the return value of the call.
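
A sketch of the submit-then-collect pattern under this ordering rule
(do_work and example_distwq are borrowed from the example at the top):

  def main(controller):
      # Submit all calls first, keeping task ids in submission order ...
      task_ids = [controller.submit_call("do_work", (i + 1,),
                                         module_name="example_distwq")
                  for i in range(10)]
      # ... then collect the results in the same order.
      results = [controller.get_result(tid) for tid in task_ids]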

get_next_result

  get_next_result()

Returns result of next earlier submitted call whose result has not yet
been obtained.

Can only be called by the controller.

If the call is not yet finished, waits for it to finish.

Returns:

(task_id, return value) of the call, or None if there are no more calls in the queue.
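
A sketch that drains the queue without tracking ids, assuming the
(task_id, value) return shape described above:

  def main(controller):
      for i in range(10):
          controller.submit_call("do_work", (i + 1,),
                                 module_name="example_distwq")
      results = {}
      item = controller.get_next_result()
      while item is not None:
          task_id, value = item
          results[task_id] = value
          item = controller.get_next_result()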

info

  info()

Print processing statistics.

Can only be called by the controller.

exit

  exit()

Tells all workers to exit.

Can only be called by the controller.

abort

  abort()

Abort execution on all MPI nodes immediately.

Can be called by controller and workers.

MPIWorker

serve

  serve()

Serves submitted calls until told to finish.

Call this function if workers need to perform initialization that
differs from the controller's, like this:

  def workerfun(worker):
      # ... whatever initialization is necessary ...
      worker.serve()
      # ... whatever cleanup is necessary ...

If workerfun() is not defined, serve() will be called automatically by run().
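
As a concrete illustration, a sketch with purely hypothetical
initialization:

  import numpy as np

  def workerfun(worker):
      # Hypothetical per-worker initialization, e.g. preparing a random
      # generator that the submitted calls can use via a module global.
      global rng
      rng = np.random.default_rng()
      worker.serve()  # process submitted calls until told to finish
      # Hypothetical per-worker cleanup would go here.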

MPICollectiveBroker

serve

  serve()

Broker and serve submitted calls until told to finish. A task
is received from the controller and sent to all collective
workers associated with this broker via scatter.

Call this function if workers need to perform initialization that
differs from the controller's, like this:

  def workerfun(worker):
      # ... whatever initialization is necessary ...
      worker.serve()
      # ... whatever cleanup is necessary ...

If workerfun() is not defined, serve() will be called automatically by
run().

Procedures

run

  run(fun_name=None, module_name='__main__', verbose=False, worker_grouping_method=None, nprocs_per_worker=1, broker_is_worker=False, args=())

Runs in controller/worker mode until fun_name(controller/worker)
finishes.

Must be called on all MPI nodes.

On the controller, run() calls fun_name() and returns when fun_name()
returns.

On each worker, run() calls fun_name() if it is defined, or serve()
otherwise, and returns when fun_name() returns, when fun_name() returns
on the controller, or when the controller calls exit().

  • fun_name (str): name of the function to call on the controller (and on
    each worker, if defined); it is passed the controller or worker object.
  • module_name (str): name of the module where fun_name is located
  • verbose (bool): whether processing information should be printed.
  • worker_grouping_method (str): whether to separate worker processes into
    groups via MPI_Comm_Spawn ("spawn") or MPI_Comm_Split ("split")
  • nprocs_per_worker (int): how many processes per worker
  • broker_is_worker (bool): applies when worker_grouping_method is "spawn"
    or "split" and nprocs_per_worker > 1. In that case, MPI_Comm_Spawn or
    MPI_Comm_Split is used to create the workers, and an
    MPICollectiveBroker object relays tasks and results between controller
    and workers. When broker_is_worker is true, the broker also
    participates in serving tasks; otherwise it only relays calls.
  • args (tuple): additional args to pass to fun_name
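
Putting it together, a hypothetical launch with collective workers,
assuming this script is example_distwq.py and main(controller) is defined
as in the example at the top:

  # Run with, e.g.: mpirun -n 9 python example_distwq.py
  import distwq

  if __name__ == "__main__":
      fun = "main" if distwq.is_controller else None
      distwq.run(fun_name=fun, verbose=True,
                 worker_grouping_method="split",
                 nprocs_per_worker=2, broker_is_worker=True)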