MQTT插件发布到 amq.topic 使用mqtt主题名作为路由键。
amq.topic
在消费者方面,它使用路由密钥将自动删除队列绑定到该交换机;在以下示例中,队列已命名 mqtt-subscription-mqttConsumerqos1 。
mqtt-subscription-mqttConsumerqos1
要通过AMQP接收MQTT消息,您需要将自己的队列绑定到交换机。这是一个例子:
@SpringBootApplication public class So54995261Application { public static void main(String[] args) { SpringApplication.run(So54995261Application.class, args); } @Bean @ServiceActivator(inputChannel = "toMQTT") public MqttPahoMessageHandler sendIt(MqttPahoClientFactory clientFactory) { MqttPahoMessageHandler handler = new MqttPahoMessageHandler("clientId", clientFactory); handler.setAsync(true); handler.setDefaultTopic("so54995261"); return handler; } @Bean public MqttPahoClientFactory mqttClientFactory() { DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory(); MqttConnectOptions options = new MqttConnectOptions(); options.setServerURIs(new String[] { "tcp://localhost:1883" }); options.setUserName("guest"); options.setPassword("guest".toCharArray()); factory.setConnectionOptions(options); return factory; } @Bean public MessageProducerSupport mqttInbound() { MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("mqttConsumer", mqttClientFactory(), "so54995261"); adapter.setCompletionTimeout(5000); return adapter; } @Bean public IntegrationFlow flow() { return IntegrationFlows.from(mqttInbound()) .handle(System.out::println) .get(); } @RabbitListener(queues = "so54995261") public void listen(byte[] in) { System.out.println(new String(in)); } @Bean public Queue queue() { return new Queue("so54995261"); } @Bean public Binding binding() { return new Binding("so54995261", DestinationType.QUEUE, "amq.topic", "so54995261", null); } @Bean public ApplicationRunner runner(MessageChannel toMQTT) { return args -> toMQTT.send(new GenericMessage<>("foo")); } }