Here is an example of producing to Kafka from a streaming query; the batch version is almost identical.

Streaming from a source to Kafka:
val ds = df
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .start()
Writing a static DataFrame (not streaming from a source) to Kafka:
df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
  .option("topic", "topic1")
  .save()
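Both writers expect the DataFrame to already contain string (or binary) key and value columns. As a minimal sketch of shaping an arbitrary DataFrame before the batch write (the column names id and payload, and the sample data, are assumptions for illustration; local mode is used only so the sketch runs standalone):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, struct, to_json}

object KafkaSinkPrep {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]") // local mode for illustration only
      .appName("kafka-sink-prep")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical source data: id becomes the Kafka key,
    // the payload column is serialized to JSON as the value.
    val src = Seq((1, "a"), (2, "b")).toDF("id", "payload")

    val forKafka = src.select(
      col("id").cast("string").as("key"),          // Kafka message key
      to_json(struct(col("payload"))).as("value")  // Kafka message value
    )

    // forKafka now has exactly the key/value schema the Kafka sink
    // expects and can be passed to .write.format("kafka") as above.
    forKafka.show(false)
    spark.stop()
  }
}
```

The same select works unchanged for the streaming case before .writeStream.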
Also take a look at the basic documentation: https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html
It sounds like you have a static DataFrame that is not streaming from a source, so the second example is the one you want.