N-proxy-N Pub-Sub
类似于ZeroMQ中的N到N异步模式的问题,但遗憾的是从未收到过工作代码的答案。
我正在尝试实现Pub-Sub网络,如…
一如既往,答案很简单。通过将它分成3个脚本,我们不必使用线程和异步编程,因此这应该可以帮助更多人。
打开6个终端并在终端中运行以下命令:
python proxy_topic.py
python proxy_pub.py
python proxy_pub.py jp
python proxy_sub.py
python proxy_sub.py en
python proxy_sub.py jp
import sys import zmq from zmq import Context class ProxyPub: def __init__(self, address='127.0.0.1', port1='5566', port2='5567'): # get ZeroMQ version print("Current libzmq version is %s" % zmq.zmq_version()) print("Current pyzmq version is %s" % zmq.pyzmq_version()) self.context = Context.instance() # 2 sockets, because we can only bind once to a socket (as opposed to connect) self.url1 = "tcp://{}:{}".format(address, port1) self.url2 = "tcp://{}:{}".format(address, port2) self.xpub_xsub_proxy() # N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers def xpub_xsub_proxy(self): print("Init proxy") # Socket subscribing to publishers frontend_pubs = self.context.socket(zmq.XSUB) frontend_pubs.bind(self.url1) # Socket publishing to subscribers backend_subs = self.context.socket(zmq.XPUB) backend_subs.bind(self.url2) print("Try: Proxy... CONNECT!") zmq.proxy(frontend_pubs, backend_subs) print("CONNECT successful!") if __name__ == '__main__': print("Arguments given: {}".format(sys.argv)) ProxyPub()
import sys import zmq from zmq import Context import time class ProxyPub: def __init__(self, lang='en', address='127.0.0.1', port='5566'): # get ZeroMQ version print("Current libzmq version is %s" % zmq.zmq_version()) print("Current pyzmq version is %s" % zmq.pyzmq_version()) self.context = Context.instance() self.url = "tcp://{}:{}".format(address, port) self.pub_hello_world(lang) def pub_hello_world(self, lang): print("Init pub {}".format(lang)) # connect, because many publishers - 1 subscriber pub = self.context.socket(zmq.PUB) pub.connect(self.url) if lang == 'en': message = "Hello World" sleep = 1 else: message = "Hello Sekai" # Japanese sleep = 2 # wait proxy and subs to b ready time.sleep(.5) # keep publishing "Hello World" / "Hello Sekai" messages print("Pub {}: Going to pub messages!".format(lang)) while True: # publish message to topic 'world' # multipart: topic, message; async always needs `send_multipart()`? pub.send_multipart([lang.encode('ascii'), message.encode('ascii')]) print("Pub {}: Have send msg".format(lang)) # slow down message publication time.sleep(sleep) if __name__ == '__main__': print("Arguments given: {}".format(sys.argv)) if len(sys.argv) == 1: ProxyPub() elif len(sys.argv) == 2: ProxyPub(lang=sys.argv[1]) else: print("Too many arguments")
import sys import zmq from zmq import Context import time class ProxyPub: def __init__(self, lang='', address='127.0.0.1', port='5567'): # get ZeroMQ version print("Current libzmq version is %s" % zmq.zmq_version()) print("Current pyzmq version is %s" % zmq.pyzmq_version()) self.context = Context.instance() self.url = "tcp://{}:{}".format(address, port) self.sub_hello_world(lang) def sub_hello_world(self, lang): print("Init sub {}".format(lang)) # connect, because many subscribers - 1 (proxy) pub sub = self.context.socket(zmq.SUB) sub.connect(self.url) # subscribe to topic 'en' or 'jp' sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii')) # wait proxy to be ready; necessary? time.sleep(.2) # keep listening to all published message, filtered on topic print("Sub {}: Going to wait for messages!".format(lang)) while True: msg_received = sub.recv_multipart() print("sub {}: {}".format(lang, msg_received)) if __name__ == '__main__': print("Arguments given: {}".format(sys.argv)) if len(sys.argv) == 1: ProxyPub() elif len(sys.argv) == 2: ProxyPub(lang=sys.argv[1]) else: print("Too many arguments")
绝对不应该评论代理功能。问题是因为zmq.proxy函数永远阻塞,你用“run_until_complete”事件循环运行它。您应该将事件循环执行类型更改为run_forever。