我正在使用flink来消费kafka并写入redis。
这是我对redis的接收函数:
.addSink(new RichSinkFunction< MobilePageEvent>(){
@覆盖 …
numRecordsIn和numRecordsOut指标仅计算在Flink作业本身内流动的流记录,并且不包括与外部系统的通信。换句话说,消息来源不会报告任何记录,并且接收器不会报告任何记录。
在我看来,你有几个选择:
将显示添加计数器度量标准的模式 这里 。
对于redis接收器,您可以在open()方法中初始化Counter,并在invoke()中递增它。但这似乎毫无意义,因为这只会反映numRecordsIn指标。如果您的redis接收器正在执行缓冲批量写入,那么等待递增指标直到数据实际发送到redis可能更有意义 - 在这种情况下,您可能更倾向于使用Meter而不是Counter。