我没有亲自使用过
ParquetFormat
,但要么
必须来自Avro数据(因为原生的Parquet-Avro项目)。所以,
AvroConverter
必须设置,然后添加
value.converter.schema.registry.url
属性,它要求您必须运行并安装Confluent Schema Registry,是的。
您必须使用Kafka Connect的特殊JSON格式,其中包含记录中的架构
。它不能是“普通的JSON”。即你现在有
“value.converter.schemas.enable”: “true”
,我猜你的连接器不工作,因为你的记录不是上述格式。
</醇>
基本上,没有模式,JSON解析器无法知道Parquet需要写什么“列”。
每日分区程序不会每天创建一个文件,只创建一个目录。你将得到一个文件
flush.size
并且还有用于刷新文件的预定旋转间隔的配置。此外,每个Kafka分区将有一个文件。
也,
“consumer.auto.offset.reset”: “earliest”,
只适用于
connect-distribtued.properties
文件,而不是基于每个连接器的基础,AFAIK。
由于我没有亲自使用过
ParquetFormat
,这是我能给出的所有建议,但我已经习惯了
其他工具如NiFi
对于类似的目标,这将允许您不更改现有的Kafka生产者代码。
或者,使用
JSONFormat
相反,Hive集成将无法自动运行,并且必须预先定义表(这将要求您拥有主题的架构)。
另一个选择就是
配置Hive直接从Kafka读取