我想在多个python进程中共享BlockingChannel。为了发送来自其他python进程的basic_ack。
如何跨多个python进程共享BlockingChannel。
…
它是一个在多个线程之间共享通道的反模式,并且您不太可能设法在进程之间共享它。
经验法则是1 connection 每个过程和1 channel 每个帖子。
connection
channel
您可以通过以下链接阅读有关此事的更多信息:
如果要将消息使用与多处理配对,通常的模式是让主进程接收消息,将其有效负载传递到工作进程池并在完成后对其进行确认。
简单的例子使用 pika.BlockingChannel 和 concurrent.futures.ProcessPoolExecutor :
pika.BlockingChannel
concurrent.futures.ProcessPoolExecutor
def ack_message(channel, delivery_tag, _future): """Called once the message has been processed. Acknowledge the message to RabbitMQ. """ channel.basic_ack(delivery_tag=delivery_tag) for message in channel.consume(queue='example'): method, properties, body = message future = pool.submit(process_message, body) # use partial to pass channel and ack_tag to callback function ack_message_callback = functools.partial(ack_message, channel, method.delivery_tag) future.add_done_callback(ack_message_callback)
上面的循环将无休止地消耗来自的消息 example 排队并将它们提交到进程池。您可以通过RabbitMQ控制要同时处理的消息数量 消费者预取 参数。校验 pika.basic_qos 看看如何在Python中完成它。
example
pika.basic_qos