我正在尝试为使用Spring Boot 2.x开发的Kafka侦听器编写单元测试。作为单元测试,我不想启动Zookeeper实例的完整Kafka服务器。因此,我决定使用Spring Embedded Kafka。
我的侦听器的定义非常基本。
@Component public class Listener { private final CountDownLatch latch; @Autowired public Listener(CountDownLatch latch) { this.latch = latch; } @KafkaListener(topics = "sample-topic") public void listen(String message) { latch.countDown(); } }
latch接收到消息后验证计数器等于零的测试也很容易。
@RunWith(SpringRunner.class) @SpringBootTest @DirtiesContext @EmbeddedKafka(topics = { "sample-topic" }) @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" }) public class ListenerTest { @Autowired private KafkaEmbedded embeddedKafka; @Autowired private CountDownLatch latch; private KafkaTemplate<Integer, String> producer; @Before public void setUp() { this.producer = buildKafkaTemplate(); this.producer.setDefaultTopic("sample-topic"); } private KafkaTemplate<Integer, String> buildKafkaTemplate() { Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka); ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps); return new KafkaTemplate<>(pf); } @Test public void listenerShouldConsumeMessages() throws InterruptedException { // Given producer.sendDefault(1, "Hello world"); // Then assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue(); } }
不幸的是,测试失败了,我不明白为什么。是否可以使用的实例KafkaEmbedded来测试带有注释的方法@KafkaListener?
所有代码都在我的GitHub存储库kafka-listener中共享。