Dan created KAFKA-4845:
--------------------------
Summary: KafkaConsumer.seekToEnd cannot take effect when
integrating with spark streaming
Key: KAFKA-4845
URL: https://issues.apache.org/jira/browse/KAFKA-4845
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0
Reporter: Dan
When integrating with Spark Streaming, the Kafka consumer cannot get the latest
offsets for any partition except one. The code snippet is as follows:
    protected def latestOffsets(): Map[TopicPartition, Long] = {
      val c = consumer
      c.poll(0)
      val parts = c.assignment().asScala
      val newPartitions = parts.diff(currentOffsets.keySet)
      currentOffsets = currentOffsets ++
        newPartitions.map(tp => tp -> c.position(tp)).toMap
      c.pause(newPartitions.asJava)
      c.seekToEnd(currentOffsets.keySet.asJava)
      parts.map(tp => tp -> c.position(tp)).toMap
    }
When calling consumer.position(topicPartition), it internally calls
updateFetchPositions(Collections.singleton(partition)). The bug lies in
updateFetchPositions(Set<TopicPartition> partitions):

    fetcher.resetOffsetsIfNeeded(partitions);    // resets the current partition to its latest offset
    if (!subscriptions.hasAllFetchPositions()) { // true, since seekToEnd was called for all partitions before
        coordinator.refreshCommittedOffsetsIfNeeded();
        fetcher.updateFetchPositions(partitions); // resets the current partition back to its committed offset
    }
So eventually only one partition (the last one iterated in the assignment) gets
the latest offset, while all the others get the committed offset.
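The control flow above can be sketched as a small simulation (plain Java; all
names are illustrative and model only the reported behavior, not the real
client internals):

```java
import java.util.*;

// Models the reported interaction between seekToEnd and per-partition
// position() calls in updateFetchPositions. Not the real client code.
public class SeekToEndSimulation {
    static Map<String, String> simulate(List<String> partitions) {
        // seekToEnd marks every partition as awaiting a reset to the
        // latest offset; no partition has a valid position afterwards.
        Map<String, String> position = new LinkedHashMap<>();
        for (String tp : partitions) position.put(tp, null);

        Map<String, String> results = new LinkedHashMap<>();
        for (String tp : partitions) {  // parts.map(tp => c.position(tp))
            // fetcher.resetOffsetsIfNeeded(singleton(tp)): resets only tp
            if (position.get(tp) == null) position.put(tp, "latest");

            // subscriptions.hasAllFetchPositions(): false while any other
            // partition still awaits its reset
            boolean hasAll = !position.containsValue(null);
            if (!hasAll) {
                // refreshCommittedOffsetsIfNeeded + updateFetchPositions:
                // the committed offset overwrites the reset just done for tp
                position.put(tp, "committed");
            }
            results.put(tp, position.get(tp));
        }
        return results;
    }

    public static void main(String[] args) {
        // Only the last partition keeps its latest offset
        System.out.println(simulate(Arrays.asList("p0", "p1", "p2")));
        // prints {p0=committed, p1=committed, p2=latest}
    }
}
```

For the last partition, every other partition already has a position by the
time position() is called, so hasAllFetchPositions() is true and the reset to
the latest offset is never overwritten.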
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)