我正在尝试将 ZeroMQ 与多进程(multiprocessing)结合使用。我想从 tar 文件中流式传输文件,所以使用了 ZeroMQ 的 streamer 设备。以下是我想要实现的功能的一个示例。
import time import zmq from zmq.devices ….
msgpack 和 msgpack_numpy 是我能找到的最佳选择。 试试这个:
msgpack
msgpack_numpy
"""Fan NumPy arrays out to worker processes through a ZeroMQ streamer device.

A producer process PUSHes msgpack-encoded ``(array, int, str)`` tuples to the
streamer's frontend; the streamer forwards them to worker processes that PULL
from its backend. One ``(STOP, STOP)`` sentinel tuple per worker ends the run.
"""
import time
from multiprocessing import Process

import msgpack
import msgpack_numpy as m
import numpy as np
import zmq
from zmq.devices.basedevice import ProcessDevice

# Sentinel payload: a worker stops when the second tuple element equals this.
STOP_TOKEN = b'STOP'


def _endpoint(port):
    """Return the loopback TCP endpoint string for *port*."""
    return "tcp://127.0.0.1:%d" % port


def server(frontend_port, number_of_workers):
    """Push ten sample messages, then one STOP sentinel per worker.

    Args:
        frontend_port: TCP port of the streamer's inbound (PULL) side.
        number_of_workers: How many STOP sentinels to send afterwards.

    Returns:
        True once everything has been queued and the socket/context closed.
    """
    # The context managers close the socket and terminate the context on the
    # way out, instead of leaving cleanup to garbage collection.
    with zmq.Context() as context, context.socket(zmq.PUSH) as socket:
        socket.connect(_endpoint(frontend_port))

        for i in range(10):
            arr = np.array([[[i, i], [i, i]], [[i, i], [i, i]]])
            file_name = 'image file name or any other srting'
            number = 10  # just an instance of an integer
            msg = msgpack.packb(
                (arr, number, file_name),
                default=m.encode,
                use_bin_type=True,
            )
            socket.send(msg, copy=False)
            time.sleep(1)

        # The sentinel payload never changes, so encode it once.
        stop_msg = msgpack.packb(
            (STOP_TOKEN, STOP_TOKEN),
            default=m.encode,
            use_bin_type=True,
        )
        for _ in range(number_of_workers):
            socket.send(stop_msg, copy=False)
    return True


def worker(work_num, backend_port):
    """Pull and decode messages until a STOP sentinel arrives.

    Args:
        work_num: Label printed with every received message.
        backend_port: TCP port of the streamer's outbound (PUSH) side.

    Returns:
        True after the sentinel has been received and the socket closed.
    """
    with zmq.Context() as context, context.socket(zmq.PULL) as socket:
        socket.connect(_endpoint(backend_port))

        while True:
            task = msgpack.unpackb(
                socket.recv(),
                object_hook=m.decode,
                use_list=False,
                max_bin_len=50000000,
                raw=False,
            )
            # Data tuples carry an int in slot 1; the sentinel carries bytes.
            if task[1] == STOP_TOKEN:
                break
            # Unpacking also checks that the tuple has exactly three fields.
            _arr, _number, file_name = task
            print("Worker ", work_num, 'got message!', file_name)
    return True


def main():
    """Start the streamer device, the workers and the producer, then wait."""
    m.patch()

    frontend_port = 3559
    backend_port = 3560
    number_of_workers = 2

    # NOTE(review): the device is never stopped explicitly. pyzmq documents
    # devices as daemonic by default, so it should end with this process;
    # confirm for the installed pyzmq version.
    streamerdevice = ProcessDevice(zmq.STREAMER, zmq.PULL, zmq.PUSH)
    streamerdevice.bind_in(_endpoint(frontend_port))
    streamerdevice.bind_out(_endpoint(backend_port))
    streamerdevice.setsockopt_in(zmq.IDENTITY, b'PULL')
    streamerdevice.setsockopt_out(zmq.IDENTITY, b'PUSH')
    streamerdevice.start()

    processes = []
    for work_num in range(number_of_workers):
        w = Process(target=worker, args=(work_num, backend_port))
        processes.append(w)
        w.start()

    # Pause before starting the producer so the workers have time to connect.
    time.sleep(1)

    s = Process(target=server, args=(frontend_port, number_of_workers))
    s.start()
    s.join()

    for w in processes:
        w.join()


if __name__ == '__main__':
    main()