我是Apache Storm的新手。
我正在尝试使用Apache Kafka,Storm和ESPER CEP引擎开发实时流处理系统。
为此,我有一个将发射流的KafkaSpout ……
喷口不能这样做,但你可以使用Storm的窗口支持 https://storm.apache.org/releases/2.0.0-SNAPSHOT/Windowing.html ,或者只是编写聚合螺栓并将其放在喷口和拓扑的其余部分之间。
所以你的拓扑应该是 spout -> aggregator -> feature selection -> trend detection 。
spout -> aggregator -> feature selection -> trend detection
我建议你尝试内置的窗口支持,但如果你想编写自己的聚合,你的螺栓实际上只需要接收一些元组(例如3),并发出一个包含所有值的新元组。
聚合器螺栓应该做类似的事情
private List<Tuple> buffered; execute(Tuple input) { if (buffered.size != 2) { buffered.add(input) return } Tuple first = buffered.get(0) Tuple second = buffered.get(1) Values aggregate = new Values(first.getValues(), second.getValues(), input.getValues()) List<Tuple> anchors = List.of(first, second, input) collector.emit(anchors, aggregate) collector.ack(first, second, input) buffered.clear() }
这样你最终会得到一个包含3个输入元组内容的元组。