我正在尝试的方案如下:
1- Flume TAILDIR从日志文件中读取源代码并将静态拦截器附加到消息的开头。拦截器由主机组成……
所以问题来自Kafka Consumer。它从水槽收到完整的消息
Interceptor + some garbage characters + message
如果其中一个垃圾字符是\ n(Linux系统中的LF),那么它将假设其2条消息,而不是1条消息。
我在Streamsets中使用Kafka Consumer元素,因此更改消息分隔符很简单。我做到了\ r \ n,现在它工作正常。
如果您将完整的消息作为字符串处理并想要在其上应用正则表达式或想要将其写入文件,那么最好用空字符串替换\ r和\ n。
的 可以在此处找到答案的完整演练: 强>
https://community.cloudera.com/t5/Data-Ingestion-Integration/Flume-TAILDIR-Source-to-Kafka-Sink-Static-Interceptor-Issue/m-p/86388#M3508