我们有一个工作的rabbitmq。实现,由于数量,我们计划切换到kafka。
我有一点怀疑。
在RabbitMQ中,当消费者从Q消息中消息时…
我也遇到了同样的问题。如果我想以简单的方式放置,RabbitMQ会保留每个
卡夫卡没有,所以你不能把它做好,你必须自己实施。
虽然有可用的选项,使用kmq,性能将低于50%,看看
https://softwaremill.com/kafka-with-selective-acknowledgments-performance/
除非您轮询新邮件,否则不会增加您的邮件偏移量。所以你必须担心重新处理你的消息。
如果要将数据处理结果存储到Kafka群集,可以使用 卡夫卡的交易特征 。这样您就可以支持一次交付。您的所有更改都将被保存,或者不会存储任何更改。
另一种方法是使您的处理方案具有幂等性。您将为Kafka中的每封邮件分配唯一的ID。处理消息时,将ID存储在数据库中。崩溃后,通过查看数据库检查您的消息ID是否已处理。
您应该阅读一下卡夫卡的消息消费方式。这是官方Kafka文档的消费者部分的链接: https://kafka.apache.org/documentation/#theconsumer
基本上,在Kafka中,只有在经过足够的时间后才会删除消息,并且使用 log.retention.hours , log.retention.minutes 和 log.retention.ms 就像@Amin说的那样。
log.retention.hours
log.retention.minutes
log.retention.ms
在Kafka中,任何数量的消费者都可以随时开始使用来自任何主题的消息,无论其他消费者是否已经从同一主题消费。 Kafka使用存储在Kafka本身的偏移来跟踪每个主题/分区上每个消费者的位置。因此,如果您的消费者需要消费消息100,就像您在问题中描述的那样,您可以简单地“回放”到所需的消息,并再次开始正常消费。如果您以前使用它,或者其他消费者是否正在阅读该主题,则无关紧要。
来自官方Kafka文档:
消费者可以故意回退到旧的偏移量 重新消费数据。这违反了队列的通用合同,但是 对许多消费者来说,这是一个必不可少的功能。例如, 如果消费者代码有错误并在某些消息后被发现 如果被消费,消费者可以在错误消失后重新使用这些消息 是固定的。
Kafka不会删除主题中的消息,除非它到达其中一个 log.retention.bytes log.retention.hours log.retention.minutes log.retention.ms CONFIGS。因此,如果偏移量增加,则不会丢失先前的消息,您只需将偏移量更改为所需的位置即可。
log.retention.bytes