有几种方法可以做到这一点,但这取决于您的系统要求,您的团队技能和基础架构。
您可以使用Apache Cassandra存储事件,并在元组中传递行的键,以便下一个螺栓可以检索它。
如果您的数据本质上属于时间序列,那么您可能希望了解一下 OpenTSDB 要么 InfluxDB 。
你当然可以回到像软件交易记忆这样的东西,但我认为这需要大量的精心设计。
这个问题很适合在微批次的内存计算中演示Apache Spark。但是,您的用例在Storm中实现是微不足道的。
1)确保螺栓使用字段分组。它将一致地将传入的元组哈希到同一个螺栓,这样我们就不会丢失任何元组。
2)在bolt的本地缓存中维护一个Map。此映射将保留“变量”的最后已知值。
class CumulativeDiffBolt extends InstrumentedBolt{ Map<String, Integer> lastKnownVariableValue; @Override public void prepare(){ this.lastKnownVariableValue = new HashMap<>(); .... @Override public void instrumentedNextTuple(Tuple tuple, Collector collector){ .... extract variable from tuple .... extract current value from tuple Integer lastValue = lastKnownVariableValue.getOrDefault(variable, 0) Integer newValue = currValue - lastValue lastKnownVariableValue.put(variable, newValue) emit(new Fields(variable, newValue)); ... }
我担心今天没有这样的内置功能。 但是您可以使用任何类型的分布式缓存,例如memcached或Redis。那些缓存解决方案非常易于使用。
简而言之,您希望在运行元组的风暴中进行微批量计算。 首先,您需要在元组集中定义/查找键。 使用该键在螺栓之间进行字段分组(不要使用随机分组)。这将保证相关元组将始终发送到相同键的下游螺栓的相同任务。 定义类级别集合List / Map以保持旧值并在计算中添加新值,不用担心它们在同一个bolt的不同执行器实例之间是线程安全的。
您可以使用CacheBuilder记住扩展BaseRichBolt中的数据(将其放在prepare方法中):
// init your cache. this.cache = CacheBuilder.newBuilder() .maximumSize(maximumCacheSize) .expireAfterWrite(expireAfterWrite, TimeUnit.SECONDS) .build();
然后在执行中,您可以使用缓存来查看您是否已经看过该密钥条目。从那里你可以添加你的业务逻辑:
// if we haven't seen it before, we can emit it. if(this.cache.getIfPresent(key) == null) { cache.put(key, nearlyEmptyList); this.collector.emit(input, input.getValues()); } this.collector.ack(input);