You can create your own sink and perform an operation on every addBatch() call:
```scala
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.streaming.Sink
import org.apache.spark.sql.functions.sum
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
import org.apache.spark.sql.streaming.OutputMode

class CustomSink extends Sink {
  override def addBatch(batchId: Long, data: DataFrame): Unit = {
    data.groupBy().agg(sum("age") as "sumAge").foreach(v => println(s"RESULT=$v"))
  }
}

class CustomSinkProvider extends StreamSinkProvider with DataSourceRegister {
  def createSink(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      partitionColumns: Seq[String],
      outputMode: OutputMode): Sink = {
    new CustomSink()
  }

  def shortName(): String = "person"
}
```
Set the outputMode to Update and trigger it every X seconds:
```scala
val query = ds.writeStream
  .trigger(Trigger.ProcessingTime("1 second"))
  .outputMode(OutputMode.Update())
  .format("exactlyonce.CustomSinkProvider")
  .start()
```
Does a processing-time trigger meet your requirement? A ProcessingTime trigger fires once per interval, where the interval is defined in the code.
Example trigger code is here:
https://github.com/apache/spark/blob/v2.2.0/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala#L34
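To make the interval behavior concrete, here is a minimal, self-contained sketch (my own simplification, not Spark's actual implementation) of how a processing-time trigger can align batch start times to interval boundaries: if a batch finishes mid-interval, the next one waits until the next multiple of the interval.

```scala
// Simplified illustration of interval-aligned trigger scheduling.
// This is a standalone sketch, not code from Spark itself.
object TriggerAlignment {
  // Given the current time, return the next interval-aligned trigger time.
  def nextTriggerTimeMs(nowMs: Long, intervalMs: Long): Long =
    (nowMs / intervalMs + 1) * intervalMs

  def main(args: Array[String]): Unit = {
    // With a 1000 ms interval, a batch finishing at t=1234 ms
    // waits until the boundary at t=2000 ms.
    println(TriggerAlignment.nextTriggerTimeMs(1234L, 1000L)) // 2000
  }
}
```

A consequence of this alignment is that if a batch takes longer than the interval to process, the next batch starts as soon as the next boundary passes rather than queuing up back-to-back triggers.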