以下类包含在几个消费者应用程序中:
@零件@组态公共类HealthListener {
public static final String HEALTH_CHECK_QUEUE_NAME =“healthCheckQueue”…
该 (convertS|s)endAndReceive.*() 方法不是为处理多个回复而设计的;它们是严格的一个请求/一个回复方法。
(convertS|s)endAndReceive.*()
你需要使用一个 (convertAndS|s)end() 发送请求的方法,并实现自己的回复机制,可能使用回复的侦听器容器,以及一些组件来聚合回复。
(convertAndS|s)end()
您可以使用类似Spring Integration Aggregator的东西,但是您需要一些机制( ReleaseStrategy )知道什么时候收到所有预期的答复。
ReleaseStrategy
或者您可以简单地接收离散回复并单独处理它们。
的 编辑 强>
@SpringBootApplication public class So54207780Application { public static void main(String[] args) { SpringApplication.run(So54207780Application.class, args); } @Bean public ApplicationRunner runner(RabbitTemplate template) { return args -> template.convertAndSend("fanout", "", "foo", m -> { m.getMessageProperties().setReplyTo("replies"); return m; }); } @RabbitListener(queues = "queue1") public String listen1(String in) { return in.toUpperCase(); } @RabbitListener(queues = "queue2") public String listen2(String in) { return in + in; } @RabbitListener(queues = "replies") public void replyHandler(String reply) { System.out.println(reply); } @Bean public FanoutExchange fanout() { return new FanoutExchange("fanout"); } @Bean public Queue queue1() { return new Queue("queue1"); } @Bean public Binding binding1() { return BindingBuilder.bind(queue1()).to(fanout()); } @Bean public Queue queue2() { return new Queue("queue2"); } @Bean public Binding binding2() { return BindingBuilder.bind(queue2()).to(fanout()); } @Bean public Queue replies() { return new Queue("replies"); } }
和
FOO foofoo