感谢@jkff的解决方案,这是一个实现示例:
ConsumerFactoryFn实现示例:
private static class ConsumerFactoryFn
implements SerializableFunction
并且不要忘记在你的KafkaIO.read()调用中使用.withConsumerFactoryFn,应该是这样的:
Map configMap = new HashMap();
configMap.put(“security.protocol”, (Object) “SSL”);
configMap.put(“ssl.truststore.password”, (Object) “clientpass”);
p.apply(“ReadFromKafka”, KafkaIO.read()
.withBootstrapServers(“ip:9093”)
.withTopic(“pageviews”)
.withKeyDeserializer(StringDeserializer.class)
.withValueDeserializer(StringDeserializer.class)
.updateConsumerProperties(configMap)
.withConsumerFactoryFn(new ConsumerFactoryFn()) … etc.
</code>