在你的替代方法中,你可能会得到 CommitFailedException 由于:
CommitFailedException
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
风暴2.0.0-SNAPSHOT(以及1.0.6) - KafkaConsumer自动提交不受支持
来自文档:
请注意,KafkaConsumer自动提交不受支持。该 如果是,KafkaSpoutConfig构造函数将抛出异常 设置“enable.auto.commit”属性,并使用该消费者 spout将始终将该属性设置为false。你可以配置 通过setProcessingGuarantee自动提交的类似行为 KafkaSpoutConfig构建器上的方法。
参考文献:
Storm-kafka使用普通的kafka客户端在zookeeper中将消费者信息写入不同的位置和不同的格式。所以你无法在kafkamanager ui中看到它。
你可以找到一些其他的监控工具,比如 https://github.com/keenlabs/capillary 。