虽然很明显不应该在多个线程之间共享同一个 Pika 连接,但我们能否在多个线程之间共享该连接上的通道(channel)?
我是 Pika 的维护者之一。答案是不能:连接和通道都不能跨线程使用,这一点在文档中有明确说明。
<sub>注意:RabbitMQ 团队会关注 rabbitmq-users 邮件列表,只是偶尔才会回答 StackOverflow 上的问题。</sub>
rabbitmq-users
我的做法如下:
# Example using PIKA consumer without blocking thread - PIKA and GRPC Streaming
###########
def grpc_test(self, request, context):
    """Stream RabbitMQ message bodies to a gRPC client as ``proto.Data`` messages.

    This is a response-streaming handler written as a generator: every body
    delivered by the consumer is handed over through a thread-safe queue,
    parsed into a ``proto.Data`` message and yielded to the client.

    Args:
        request: The incoming gRPC request (not used by this example).
        context: The gRPC call context (not used by this example).

    Yields:
        A ``proto.Data`` instance parsed from each received message body.
    """
    # A response streaming GRPC implementation - Client gets stream of messages
    # NOTE(review): ``Queue`` is the Python 2 module name; on Python 3 this
    # would be ``queue.Queue()`` -- confirm against the file's imports.
    message_queue = Queue.Queue()

    def rmq_callback(data):
        print("Got a call back from RMQ Client")
        message_queue.put(data)

    # Register with RabbitMQ for Data
    # thread safe - create a connection here and a channel
    pikaconsumer = TestConsumer()
    # Client want to listen on this queue
    # The exchange and routing key are placeholders, just like the queue
    # name: listen_on_queue() requires all four arguments, so substitute the
    # real exchange / routing keys here.
    pikaconsumer.listen_on_queue("xxxx", "xxxx", ["xxxx"], rmq_callback)
    # use the connection and channel in a new thread (and no other thread)
    t = threading.Thread(target=pikaconsumer.start_consuming)
    t.start()
    while True:
        # Block until the consumer thread hands over the next message body.
        data = message_queue.get(True)
        message = proto.Data()
        message.ParseFromString(data)
        yield message


###########
class TestConsumer(object):
    """Minimal blocking pika consumer that owns one connection and one channel."""

    def __init__(self):
        """Open a blocking connection to the local broker and create a channel."""
        amqp_url = 'amqp://guest:guest@127.0.0.1:5672/'
        parameters = pika.URLParameters(amqp_url)
        connection = pika.BlockingConnection(parameters)
        self._channel = connection.channel()

    def listen_on_queue(self, queue_name, exchange, routing_keys, _callback):
        """Declare and bind ``queue_name``, then register ``_callback`` on it.

        Args:
            queue_name: Name of the queue to declare (auto-delete) and consume.
            exchange: Exchange the queue is bound to.
            routing_keys: Iterable of routing keys; one binding is made per key.
            _callback: Callable invoked with the body of every delivered
                message. The message is acknowledged after it returns.
        """
        # In case queue is not there - create a queue
        self._channel.queue_declare(queue=queue_name, auto_delete=True)
        for routing_key in routing_keys:
            self._channel.queue_bind(queue_name, exchange, str(routing_key))
            LOGGER.info('Binding Exchange[%s] to Queue[%s] with RoutingKey[%s]',
                        exchange, queue_name, str(routing_key))

        def _on_message(channel, method_frame, header_frame, body):
            # Hand the body to the user callback first, then ack, so a
            # message is only acknowledged once the callback has returned.
            print(method_frame.delivery_tag)
            _callback(body)
            channel.basic_ack(delivery_tag=method_frame.delivery_tag)

        # NOTE(review): (callback, queue) is the pika 0.x argument order;
        # pika >= 1.0 expects basic_consume(queue, on_message_callback).
        # Confirm against the installed pika version.
        self._consumer_tag = self._channel.basic_consume(_on_message, queue_name)

    def start_consuming(self):
        """Run the channel's blocking consume loop (call from a single thread)."""
        self._channel.start_consuming()