Dan created KAFKA-4845:
--------------------------

             Summary: KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming
                 Key: KAFKA-4845
                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0
            Reporter: Dan
When integrating with Spark Streaming, the Kafka consumer cannot get the latest offsets for any partition except one. The code snippet is as follows:

    protected def latestOffsets(): Map[TopicPartition, Long] = {
      val c = consumer
      c.poll(0)
      val parts = c.assignment().asScala
      val newPartitions = parts.diff(currentOffsets.keySet)
      currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
      c.pause(newPartitions.asJava)
      c.seekToEnd(currentOffsets.keySet.asJava)
      parts.map(tp => tp -> c.position(tp)).toMap
    }

Each call to consumer.position(topicPartition) invokes updateFetchPositions(Collections.singleton(partition)). The bug lies in updateFetchPositions(Set<TopicPartition> partitions):

    fetcher.resetOffsetsIfNeeded(partitions); // resets the current partition to the latest offset
    if (!subscriptions.hasAllFetchPositions()) { // seekToEnd was called for all partitions beforehand, so this condition is true
        coordinator.refreshCommittedOffsetsIfNeeded();
        fetcher.updateFetchPositions(partitions); // resets the current partition to its committed offset
    }

So eventually only one partition (the last partition in the assignment) gets the latest offset, while all the others get the committed offset.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
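The per-partition sequence described in this report can be reproduced with a small self-contained simulation (plain Java, no Kafka dependency; the partition names "t-0".."t-2" and the offsets 100/500 are made-up illustration values, not from the report):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Simulates the reported behavior: seekToEnd on all partitions followed by
// a per-partition position() loop, per the report's reading of
// updateFetchPositions(Collections.singleton(partition)).
public class SeekToEndBugSim {
    static final long COMMITTED = 100L;  // hypothetical committed offset
    static final long LATEST = 500L;     // hypothetical log-end offset

    // per-partition fetch position; null means "awaiting reset" (set by seekToEnd)
    static Map<String, Long> position = new LinkedHashMap<>();

    static boolean hasAllFetchPositions() {
        return !position.containsValue(null);
    }

    // Mirrors the two steps quoted from updateFetchPositions(Set<TopicPartition>)
    static void updateFetchPositions(String tp) {
        // fetcher.resetOffsetsIfNeeded: resets only the current partition to latest
        if (position.get(tp) == null)
            position.put(tp, LATEST);
        // seekToEnd was called for all partitions, so while any OTHER partition
        // is still awaiting reset this condition holds ...
        if (!hasAllFetchPositions()) {
            // ... and, per the report, the current partition is moved back
            // to its committed offset
            position.put(tp, COMMITTED);
        }
    }

    public static void main(String[] args) {
        for (String tp : new String[] {"t-0", "t-1", "t-2"})
            position.put(tp, null);       // c.seekToEnd(...) on all partitions
        for (String tp : position.keySet())
            updateFetchPositions(tp);     // c.position(tp) for each partition
        System.out.println(position);     // prints {t-0=100, t-1=100, t-2=500}
    }
}
```

Running it shows that only the last partition in the iteration order ends up at the log-end offset; every earlier partition is pulled back to its committed offset, matching the symptom in the report.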