我认为你没有使用ssc.start()启动流
为您的代码添加一个动作可能有所帮助
stream.print()
试一试,祝你好运
我认为日志说明你需要的一切:)
IllegalArgumentException:要求失败:未注册任何输出操作,因此无需执行任何操作
什么是输出操作?例如:
您必须向应用程序添加一些操作,例如save stream.mapToPair 变量然后在这个变量或print()上调用foreachRDD来显示值
stream.mapToPair
使用Kafka创建直接流后,您将创建 JavaPairDStream 。现在你可以迭代了 JavaPairDStream 并打印您的Kafka消息的密钥和值。
JavaPairDStream
JavaPairDStream<String, String> jPairDStream = stream.mapToPair( new PairFunction<ConsumerRecord<String, String>, String, String>() { @Override public Tuple2<String, String> call(ConsumerRecord<String, String> record) throws Exception { return new Tuple2<>(record.key(), record.value()); } }); jPairDStream.foreachRDD(jPairRDD -> { jPairRDD.foreach(rdd -> { System.out.println("key="+rdd._1()+" value="+rdd._2()); }); }); jssc.start(); jssc.awaitTermination();
您正在使用的命令 kafka-console-producer.bat 将产生一个消息,其中键将为null。为了生成主题上具有键和值的消息 test 使用以下命令。在控制台中输入逗号分隔的键和值 key1,value1
kafka-console-producer.bat
test
key1,value1
kafka-console-producer.bat --property parse.key=true --property key.separator=, --broker-list localhost:9092 --topic test
您的pom文件具有不同的spark构件版本。确保对所有工件使用相同的版本。您需要以下依赖项来运行您的程序。
<dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.11</artifactId> <version>2.1.0</version> </dependency> </dependencies>