I am trying to create a new Flume agent with a spooldir source and have it put the files into HDFS. This is my configuration file:
agent.sources = file
agent.channels = channel
agent.sinks = hdfsSink
As per my earlier comment, I am now sharing all the steps I followed to spool a header-enabled JSON file, put it into a Hadoop HDFS cluster using Flume, create an external table over the JSON file, and later run DML queries over it:
Create flume-spool.conf
# Flume configuration starts
erum.sources = source-1
erum.channels = file-channel-1
erum.sinks = hdfs-sink-1
erum.sources.source-1.channels = file-channel-1
erum.sinks.hdfs-sink-1.channel = file-channel-1

# Define a file channel called file-channel-1 on erum
erum.channels.file-channel-1.type = file
erum.channels.file-channel-1.capacity = 2000000
erum.channels.file-channel-1.transactionCapacity = 100000

# Define a source for erum
# Note: bind and port are not spooldir settings; they look like leftovers from a netcat source
erum.sources.source-1.type = spooldir
erum.sources.source-1.bind = localhost
erum.sources.source-1.port = 44444
erum.sources.source-1.inputCharset = UTF-8
erum.sources.source-1.bufferMaxLineLength = 100

# Spooldir in my case is /home/arif/practice/flume_sink
erum.sources.source-1.spoolDir = /home/arif/practice/flume_sink/
erum.sources.source-1.fileHeader = true
erum.sources.source-1.fileHeaderKey = file
erum.sources.source-1.fileSuffix = .COMPLETED

# Sink is flume_import under hdfs
erum.sinks.hdfs-sink-1.pathManager = DEFAULT
erum.sinks.hdfs-sink-1.type = hdfs
erum.sinks.hdfs-sink-1.hdfs.filePrefix = common
erum.sinks.hdfs-sink-1.hdfs.fileSuffix = .json
erum.sinks.hdfs-sink-1.hdfs.writeFormat = Text
erum.sinks.hdfs-sink-1.hdfs.fileType = DataStream
erum.sinks.hdfs-sink-1.hdfs.path = hdfs://localhost:9000/user/arif/flume_sink/products/
erum.sinks.hdfs-sink-1.hdfs.batchSize = 1000
erum.sinks.hdfs-sink-1.hdfs.rollSize = 2684354560
erum.sinks.hdfs-sink-1.hdfs.rollInterval = 5
erum.sinks.hdfs-sink-1.hdfs.rollCount = 5000
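A common mistake in a file like this is a source or sink that points at a channel name that was never declared (note that sources use the plural key `channels` while sinks use the singular `channel`). Below is a minimal Python sketch of a sanity check for that wiring. It is not part of Flume; the function names `parse_properties` and `check_channel_wiring` are made up for illustration, and the parser only handles simple `key = value` lines with `#` comments.

```python
def parse_properties(text):
    """Parse simple 'key = value' lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_channel_wiring(props, agent):
    """Return a list of problems in how sources and sinks reference channels."""
    declared = set(props.get(f"{agent}.channels", "").split())
    problems = []
    # Sources may list several channels under the plural key 'channels'.
    for source in props.get(f"{agent}.sources", "").split():
        used = props.get(f"{agent}.sources.{source}.channels", "").split()
        if not used:
            problems.append(f"source {source} has no channels")
        for channel in used:
            if channel not in declared:
                problems.append(f"source {source} uses undeclared channel {channel}")
    # Each sink reads from exactly one channel under the singular key 'channel'.
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel")
        if channel is None:
            problems.append(f"sink {sink} has no channel")
        elif channel not in declared:
            problems.append(f"sink {sink} uses undeclared channel {channel}")
    return problems
```

Calling `check_channel_wiring(parse_properties(open("conf/flume-spool.conf").read()), "erum")` should return an empty list for the configuration above.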
Now we run Flume with the agent erum:
bin/flume-ng agent -n erum -c conf -f conf/flume-spool.conf -Dflume.root.logger=DEBUG,console
Copied the products.json file into the directory specified by erum.sources.source-1.spoolDir in the Flume configuration.
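One thing to watch when copying: the spooldir source expects a file to be complete and unchanged once it appears in the spooling directory, and it expects file names not to be reused. A plain copy of a large file can be picked up while it is still being written. Here is a rough Python sketch of one way around that, which copies into a sibling staging directory first and then renames the file into place. The function name `drop_into_spool` and the `.staging` directory are my own assumptions, and the rename is only a single step if both directories are on the same filesystem.

```python
import os
import shutil


def drop_into_spool(src_path, spool_dir):
    """Copy src_path into a staging directory, then rename it into spool_dir."""
    spool_dir = os.path.abspath(spool_dir)
    # Hypothetical sibling directory, assumed to be on the same filesystem.
    staging_dir = spool_dir + ".staging"
    os.makedirs(staging_dir, exist_ok=True)

    name = os.path.basename(src_path)
    final_path = os.path.join(spool_dir, name)
    # Refuse to reuse a name that is pending or already marked as processed
    # (.COMPLETED is the fileSuffix used in the configuration above).
    if os.path.exists(final_path) or os.path.exists(final_path + ".COMPLETED"):
        raise FileExistsError(f"{name} was already placed in {spool_dir}")

    staged_path = os.path.join(staging_dir, name)
    shutil.copyfile(src_path, staged_path)
    # The file only becomes visible to Flume once it is fully written.
    os.rename(staged_path, final_path)
    return final_path
```

For example, `drop_into_spool("products.json", "/home/arif/practice/flume_sink")` would place the file in the directory from my configuration.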
The contents of the products.json file are as follows:
{"productid":"5968dd23fc13ae04d9000001","product_name":"sildenafilcitrate","mfgdate":"20160719031109","supplier":"WisozkInc","quantity":261,"unit_cost":"$10.47"}
{"productid":"5968dd23fc13ae04d9000002","product_name":"MountainJuniperusashei","mfgdate":"20161003021009","supplier":"Keebler-Hilpert","quantity":292,"unit_cost":"$8.74"}
{"productid":"5968dd23fc13ae04d9000003","product_name":"DextromathorphanHBr","mfgdate":"20161101041113","supplier":"Schmitt-Weissnat","quantity":211,"unit_cost":"$20.53"}
{"productid":"5968dd23fc13ae04d9000004","product_name":"MeophanHBr","mfgdate":"20161101061113","supplier":"Schmitt-Weissnat","quantity":198,"unit_cost":"$18.73"}
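With its default line deserializer, the spooldir source turns each line of the file into one event, so the file works best when every line is a complete JSON object on its own. Before dropping a file into the spool directory it can be worth checking that. This is a small Python sketch of such a check; the function name `invalid_json_lines` is just something I picked for illustration.

```python
import json


def invalid_json_lines(text):
    """Return (line_number, reason) for each non-blank line that is not a JSON object."""
    problems = []
    for number, line in enumerate(text.splitlines(), start=1):
        if not line.strip():
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError as exc:
            # Covers broken JSON as well as two records squeezed onto one line.
            problems.append((number, exc.msg))
            continue
        if not isinstance(record, dict):
            problems.append((number, "not a JSON object"))
    return problems
```

An empty result from `invalid_json_lines(open("products.json").read())` means every line parsed as a standalone object.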
Download hive-serdes-sources-1.0.6.jar from the URL below:
https://www.dropbox.com/s/lsjgk2zaqz8uli9/hive-serdes-sources-1.0.6.jar?dl=0
After spooling the JSON file to the HDFS cluster with flume-spool, we start the Hive server, log in to the Hive shell, and do the following:
hive> add jar /home/arif/applications/hadoop/apache-hive-2.1.1-bin/lib/hive-serdes-sources-1.0.6.jar;
hive> create external table products (productid string, product_name string, mfgdate string, supplier string, quantity int, unit_cost string)
    > row format serde 'com.cloudera.hive.serde.JSONSerDe' location '/user/arif/flume_sink/products/';
OK
Time taken: 0.211 seconds
hive> select * from products;
OK
5968dd23fc13ae04d9000001 sildenafilcitrate 20160719031109 WisozkInc 261 $10.47
5968dd23fc13ae04d9000002 MountainJuniperusashei 20161003021009 Keebler-Hilpert 292 $8.74
5968dd23fc13ae04d9000003 DextromathorphanHBr 20161101041113 Schmitt-Weissnat 211 $20.53
5968dd23fc13ae04d9000004 MeophanHBr 20161101061113 Schmitt-Weissnat 198 $18.73
Time taken: 0.291 seconds, Fetched: 4 row(s)
I completed all of these steps without any errors. I hope this helps. Thanks.
As explained in this post: http://shzhangji.com/blog/2017/08/05/how-to-extract-event-time-in-apache-flume/
The required change is to include an interceptor and a serializer:
# SOURCES CONFIGURATION
agent.sources.file.type = spooldir
agent.sources.file.channels = channel
agent.sources.file.spoolDir = /path/to/json_files
agent.sources.file.interceptors = i1
agent.sources.file.interceptors.i1.type = regex_extractor
agent.sources.file.interceptors.i1.regex = <regex_for_timestamp>
agent.sources.file.interceptors.i1.serializers = s1
agent.sources.file.interceptors.i1.serializers.s1.type = org.apache.flume.interceptor.RegexExtractorInterceptorMillisSerializer
agent.sources.file.interceptors.i1.serializers.s1.name = timestamp
agent.sources.file.interceptors.i1.serializers.s1.pattern = <pattern_that_matches_your_regex>
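To make it clearer what the interceptor and serializer do together: the regex pulls a timestamp string out of the event body, and the millis serializer parses that string with the given pattern and stores the result as epoch milliseconds in the `timestamp` header. Here is a rough Python imitation of that idea, not the Flume code itself. The regex on an `mfgdate` field and the format string are my assumptions for illustration, Python uses `strptime` codes rather than the Java-style pattern Flume expects, and I treat the value as UTC so the result does not depend on the machine's time zone.

```python
import calendar
import re
from datetime import datetime


def extract_timestamp_millis(body, regex, strptime_format):
    """Return epoch milliseconds parsed from the first regex group, or None if no match."""
    match = re.search(regex, body)
    if match is None:
        return None
    parsed = datetime.strptime(match.group(1), strptime_format)
    # Assumption: interpret the extracted value as UTC.
    return calendar.timegm(parsed.timetuple()) * 1000


# Hypothetical values standing in for <regex_for_timestamp> and the pattern.
EXAMPLE_REGEX = r'"mfgdate":"(\d{14})"'
EXAMPLE_FORMAT = "%Y%m%d%H%M%S"
```

In the real configuration the pattern for a value like `20160719031109` would be written Java-style, for example `yyyyMMddHHmmss`, and the regex needs one capturing group for the serializer to read.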
Thanks for pointing out that, besides the link, I needed to include a proper snippet :)