I ran into a similar problem. You are right: using directStream means using the low-level Kafka API directly, which does not update the reader offsets anywhere. There are a couple of examples for Scala/Java, but none for Python. Still, it is easy to do it yourself; here is what you need to do:
For example, I save each partition's offset in redis by doing the following:
```python
stream.foreachRDD(lambda rdd: save_offset(rdd))

def save_offset(rdd):
    ranges = rdd.offsetRanges()
    for rng in ranges:
        rng.untilOffset  # save offset somewhere
```
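If redis is the store, that save step might look like the sketch below; the redis-py client setup and the `offsets:<topic>:<partition>` key scheme are my own assumptions, not part of the original answer:

```python
import redis

# assumed connection details; adjust to your redis instance
r = redis.StrictRedis(host='127.0.0.1', port=6379)

def save_offset(rdd):
    for rng in rdd.offsetRanges():
        # untilOffset is the exclusive end of the batch, i.e. where to resume
        r.set(f"offsets:{rng.topic}:{rng.partition}", rng.untilOffset)
```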
Then at startup you can restore them with:
```python
fromoffset = {}
topic_partition = TopicAndPartition(topic, partition)
fromoffset[topic_partition] = int(value)  # the int value read from wherever you stored it previously
```
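Putting the two halves together, a minimal sketch of the full restore path could look like this; it assumes the key scheme from the save sketch above, that you know the partition count (`num_partitions`) for the topic, and that `ssc` is your StreamingContext:

```python
import redis
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

def restore_and_create_stream(ssc, topic, brokers="127.0.0.1:9092", num_partitions=3):
    r = redis.StrictRedis(host='127.0.0.1', port=6379)
    fromoffset = {}
    for partition in range(num_partitions):
        value = r.get(f"offsets:{topic}:{partition}")  # key written by save_offset
        if value is not None:
            fromoffset[TopicAndPartition(topic, partition)] = int(value)
    # an empty dict falls back to the default starting position
    return KafkaUtils.createDirectStream(
        ssc, [topic], {"metadata.broker.list": brokers},
        fromOffsets=fromoffset)
```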
For tools that track offsets through zk, it is better to save the offsets in zookeeper. This page: https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html describes how to set an offset manually; basically, the zk node is: /consumers/[consumer_name]/offsets/[topic_name]/[partition_id]. Since we are using directStream, you have to make up a consumer name.
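As a rough illustration of that layout, here is how you might set one offset by hand with kazoo; the consumer name my_direct_consumer, the topic test, partition 0 and the offset value 42 are all made up:

```python
from kazoo.client import KazooClient

zk = KazooClient(hosts="127.0.0.1:2181")
zk.start()
path = "/consumers/my_direct_consumer/offsets/test/0"  # made-up consumer name
zk.ensure_path(path)
zk.set(path, b"42")  # znode values are bytes; offsets are stored as ASCII integers
zk.stop()
```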
I wrote some functions to save and read Kafka offsets with Python, using the kazoo library.
First, a function that returns a singleton Kazoo client:
```python
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']
```
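Stashing the client in globals() is what makes this a singleton: save_offsets runs once per batch, and every call reuses the same ZooKeeper connection instead of opening a new one.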
Then, functions to read and save the offsets:
```python
def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        zk.ensure_path(f'/consumers/{topic}')  # avoid NoNodeError on the first run
        for partition in zk.get_children(f'/consumers/{topic}'):
            topic_partition = TopicAndPartition(topic, int(partition))
            offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets


def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{offset.topic}/{offset.partition}"
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())
```
Then, before starting the stream, you can read the offsets back from zookeeper and pass them to createDirectStream through the fromOffsets argument:
```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def main(brokers="127.0.0.1:9092", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)  # 2-second batches

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)

    directKafkaStream.foreachRDD(save_offsets)

    ssc.start()  # start the streaming job
    ssc.awaitTermination()


if __name__ == "__main__":
    main()
```
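One practical note: the 0.8 direct API lives in an external jar, so depending on your Spark version you would launch this with something like spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:&lt;spark-version&gt; script.py; check the exact artifact name for your Spark/Scala versions.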