您需要在服务器地址之间使用逗号,而不是分号。
的 编辑 强>
我刚刚运行了一个没有问题的测试:
spring.kafka.bootstrap-servers=localhost:9092,localhost:9093,localhost:9094
和
@SpringBootApplication public class So50804678Application { public static void main(String[] args) { SpringApplication.run(So50804678Application.class, args); } @KafkaListener(id = "foo", topics = "so50804678") public void in(String in) { System.out.println(in); } @Bean public NewTopic topic() { return new NewTopic("so50804678", 1, (short) 3); } }
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678 Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs: Topic: so50804678 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
杀死了领导者,并且
$ kafka-topics --zookeeper localhost:2181 --describe --topic so50804678 Topic:so50804678 PartitionCount:1 ReplicationFactor:3 Configs: Topic: so50804678 Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 1,2
$ kafka-console-producer --broker-list localhost:9092,localhost:9093,localhost:9093 --topic so50804678
发送了一条消息,应用程序收到了该消息;除WARN外,日志中没有错误:
[Consumer clientId = consumer-1,groupId = foo]无法建立与节点0的连接。经纪人可能无法使用。
然后我重新启动了死服务器;停止了我的应用;然后添加了这段代码......
@Bean public ApplicationRunner runner(KafkaTemplate<String, String> template) { return args -> { while(true) { System.out.println(template.send("so50804678", "foo").get().getRecordMetadata()); Thread.sleep(3_000); } }; }
再一次,杀死现任领导人没有任何影响;一切恢复好了。
您可能需要调整服务器道具中的listeners / advertised.listeners属性。由于我的经纪人都在本地主机上,因此我将其保留为默认值。