我正在运行一个python脚本来从新闻提供者收集数据并在flume.conf文件中获取此脚本。
我的flume.conf文件:
newsAgent.sources = r1newsAgent.sinks = sparknewsAgent.channels = …
我一定看过和你一样的教程。我尝试了很多不同的选择。大多数都没有成功。但是我找到了一个解决方法:在你的flume.conf中使用一个exec源代码,然后像你一样调用脚本。但是在python脚本中,将数据写入文件。然后在脚本(data_collector.py)停止执行之前“cat”该文件。
我认为这是因为exec源需要“流”数据,而只是打印输出将无法正常工作。
我的设置与您的设置非常相似:
stream.py(为了便于理解,删除了逻辑):
from pyspark import SparkContext from pyspark.streaming import StreamingContext from pyspark.streaming.flume import FlumeUtils if __name__ == "__main__": sc = SparkContext(appName="test"); ssc = StreamingContext(sc, 30) stream = FlumeUtils.createStream(ssc, "127.0.0.1", 55555) stream.pprint()
这是我的data_collector.py(注意带有“cat”命令的最后一行):
#! /usr/bin/python import requests import random class RandResp(): def __init__(self): self.url = "https://swapi.co/api/people/" self.rand = str(random.randint(0, 17)) self.r = requests.get(self.url + self.rand) def get_r(self): return(self.r.text) if __name__ == "__main__": import os with open("exec.txt", "w") as file_in: file_in.write(RandResp().get_r()) os.system("cat exec.txt")
这是我的flume.conf:
# list sources, sinks and channels in the agent agent.sources = tail-file agent.channels = c1 agent.sinks=avro-sink # define the flow agent.sources.tail-file.channels = c1 agent.sinks.avro-sink.channel = c1 agent.channels.c1.type = memory agent.channels.c1.capacity = 1000 # define source and sink agent.sources.tail-file.type = exec agent.sources.tail-file.command = python /home/james/Desktop/testing/data_collector.py agent.sources.tail-file.channels = c1 agent.sinks.avro-sink.type = avro agent.sinks.avro-sink.hostname = 127.0.0.1 agent.sinks.avro-sink.port = 55555
所以基本上在我的data_collector.py中,我只需要做任何需要完成的逻辑,将它写入一个名为exec.txt的文件,然后立即“cat”该文件。它有效...祝你好运