我发现后端消息处理中出现了常见的模式:
ServiceA生成大量消息。
ServiceB一次处理一条消息。
ServiceC发出对数据库的调用或……
Upfront / Disclaimer:我为特别软件公司工作,这是NServiceBus的制造商。我也写了 学习NServiceBus 。
在我为特工工作之前,我曾经发现自己处于你的确切状况。我有一种分析类型的情况,其中12个Web服务器通过MSMQ发送相同类型的命令以指示文章被查看。需要在数据库中跟踪这些计数,以便可以基于视图的数量生成“最流行的”列表。但是每个页面视图的插入都不能很好地执行,所以我介绍了服务总线。
插入器可以通过使用表值参数一次插入多达50-100的好处,但NServiceBus在事务中一次只给出一条消息。
在NServiceBus中,对多个消息进行操作的任何内容通常都需要使用Saga。 (Saga基本上是一堆相关的消息处理程序,它们在处理每条消息之间保持一些存储状态。)
但是Saga必须将其数据存储在某个地方,这通常意味着数据库。那么让我们比较一下:
因此,Saga使“持久性负载”更加糟糕。
当然,您可以选择使用Saga中的内存持久性。这将为您提供批处理而无需额外的持久性开销,但如果Saga端点崩溃,您可能会丢失部分批处理。因此,如果您不愿意丢失数据,那么这不是一种选择。
所以即使在几年前,我也想象过这样的事情:
// Not a real NServiceBus thing! Only exists in my imagination! public interface IHandleMessageBatches<TMessage> { void Handle(TMessage[] messages); int MaxBatchSize { get; } }
我们的想法是,如果消息传输可以提前查看并看到许多消息可用,它可以开始接收MaxBatchSize并且您可以立即获取所有消息。当然,如果队列中只有一条消息,那么你将得到一个包含1条消息的数组。
几年前我和NServiceBus代码库坐了下来,以为我会尝试实现它。好吧,我失败了。当时,尽管MSMQ是唯一的传输(在NServiceBus V3中),但API的架构使得传输代码在队列中查看并一次拉出一条消息,从而将消息处理逻辑的内存事件提升到如果没有大规模的改变,就不可能改变它。
更新版本中的代码更加模块化,这在很大程度上是因为现在支持多个消息传输。但是,仍然存在一次处理一条消息的假设。
进入V6的当前实现是在 IPushMessages 接口。在里面 Initialize 方法,核心推动了 Func<PushContext, Task> pipe 进入运输的实施 IPushMessages 。
IPushMessages
Initialize
Func<PushContext, Task> pipe
或者用英语,“嘿运输,当你有消息可用时,执行此操作将其移交给Core,我们将从那里接收它。”
简而言之,这是因为NServiceBus适用于一次可靠地处理一条消息。从更详细的角度来看,有很多理由说明批量接收会很困难:
SuperMessage
BaseMessage
Handle(BaseMessage[] batch)
总而言之,将NServiceBus更改为接受批次将需要针对批次优化整个管道。单个消息(当前规范)将是一个专门的批处理,其中数组大小为1。
从本质上讲,这对于它所提供的有限商业价值的变化来说风险太大了。
我发现每个消息单个插入并不像我想象的那么昂贵。有害的是,多个Web服务器上的多个线程尝试一次写入数据库,并在该RPC操作中停留,直到完成为止。
当这些操作被序列化为一个队列,并且有限的,设定数量的线程处理这些消息并以数据库可以处理的速率执行数据库插入时,事情往往在大多数情况下都非常顺利地运行。
另外,请仔细考虑您在数据库中执行的操作。现有行的更新比插入更便宜。在我的情况下,我真的只关心计数,并且不需要每个页面视图的记录。因此,根据内容ID和5分钟时间窗口更新记录更便宜,并更新该记录的读取计数,而不是每次读取插入记录并迫使自己进入大量的聚合查询。
如果这绝对不起作用,您需要考虑可以在可靠性方面做出哪些权衡。您可以使用具有内存持久性的Saga,但随后您可以(并且最有可能最终)丢失整个批次。根据您的使用情况,这很可能是可以接受的。
您还可以使用消息处理程序写入Redis,这将比数据库便宜,然后让Saga更像调度程序,将批量数据迁移到数据库。你可以用Kafka或其他一些技术做类似的事情。在这些情况下,您可以自行决定确保所需的可靠性,并设置可以实现这一目标的工具。