要使自定义时间戳提取器类可用于S3连接器,您将需要以下内容:
添加带有自定义类的jar以及其他连接器的依赖项。例:
保存下 ./share/java/kafka-connect-s3 如果你想要这个 仅适用于S3连接器或中 ./share/java/kafka-connect-storage-common 让它可用 所有存储接收器连接器(当前为S3和HDFS连接器)。
./share/java/kafka-connect-s3
./share/java/kafka-connect-storage-common
io.confluent.connect.storage.partitioner.TimestampExtractor
设置时使用完全限定的类名 timestamp.extractor 连接器配置中的属性,当然要确保它与您在jar中定义和打包的包匹配。例如:
timestamp.extractor
timestamp.extractor=me.connectors.MyTimestampExtractor
最后,您将遵循类似的过程,使连接器可以使用自定义分区程序。