借用了答案 我在Github开的这个问题 ,我已将我的答案更新如下:
消息保存在操作系统的网络缓冲区中。我已经找到 由于这个原因,HWM并没有那么有用。这是修改后的代码 订阅者错过消息的地方: import time import pickle import zmq from threading import Thread import os ctx = zmq.Context() def pub_thread(): pub = ctx.socket(zmq.PUB) pub.setsockopt(zmq.SNDHWM, 2) pub.setsockopt(zmq.SNDBUF, 2*1024) # See: http://api.zeromq.org/4-2:zmq-setsockopt pub.bind('tcp://*:5555') i = 0 while True: time.sleep(0.001) pub.send_string(str(i), zmq.SNDMORE) pub.send(os.urandom(1024)) i += 1 def sub_thread(): sub = ctx.socket(zmq.SUB) sub.setsockopt(zmq.SUBSCRIBE, b'') sub.setsockopt(zmq.RCVHWM, 2) sub.setsockopt(zmq.RCVBUF, 2*1024) sub.connect('tcp://localhost:5555') while True: time.sleep(0.1) msg, _ = sub.recv_multipart() print("Received:", msg.decode()) t_pub = Thread(target=pub_thread) t_pub.start() sub_thread() 输出看起来像这样: Received: 0 Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 Received: 47 Received: 48 Received: 64 Received: 65 Received: 84 Received: 85 Received: 159 Received: 160 Received: 270 错过消息,因为所有队列/缓冲区都已满并且发布者 开始删除消息(请参阅ZMQ_PUB的文档: http://api.zeromq.org/4-2:zmq-socket )。
消息保存在操作系统的网络缓冲区中。我已经找到 由于这个原因,HWM并没有那么有用。这是修改后的代码 订阅者错过消息的地方:
import time import pickle import zmq from threading import Thread import os ctx = zmq.Context() def pub_thread(): pub = ctx.socket(zmq.PUB) pub.setsockopt(zmq.SNDHWM, 2) pub.setsockopt(zmq.SNDBUF, 2*1024) # See: http://api.zeromq.org/4-2:zmq-setsockopt pub.bind('tcp://*:5555') i = 0 while True: time.sleep(0.001) pub.send_string(str(i), zmq.SNDMORE) pub.send(os.urandom(1024)) i += 1 def sub_thread(): sub = ctx.socket(zmq.SUB) sub.setsockopt(zmq.SUBSCRIBE, b'') sub.setsockopt(zmq.RCVHWM, 2) sub.setsockopt(zmq.RCVBUF, 2*1024) sub.connect('tcp://localhost:5555') while True: time.sleep(0.1) msg, _ = sub.recv_multipart() print("Received:", msg.decode()) t_pub = Thread(target=pub_thread) t_pub.start() sub_thread()
输出看起来像这样:
Received: 0 Received: 1 Received: 2 Received: 3 Received: 4 Received: 5 Received: 6 Received: 47 Received: 48 Received: 64 Received: 65 Received: 84 Received: 85 Received: 159 Received: 160 Received: 270
错过消息,因为所有队列/缓冲区都已满并且发布者 开始删除消息(请参阅ZMQ_PUB的文档: http://api.zeromq.org/4-2:zmq-socket )。
[ 的 注意 强> ]:
sock.setsockopt(zmq.CONFLATE, 1)