[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vahid Hashemian resolved KAFKA-4845.
------------------------------------
    Resolution: Duplicate

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-4845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>            Reporter: Dan
>            Assignee: Vahid Hashemian
>
> When integrating with Spark Streaming, the Kafka consumer cannot get the latest offsets for any partition except one. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
>   val c = consumer
>   c.poll(0)
>   val parts = c.assignment().asScala
>   val newPartitions = parts.diff(currentOffsets.keySet)
>   currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
>   c.pause(newPartitions.asJava)
>   c.seekToEnd(currentOffsets.keySet.asJava)
>   parts.map(tp => tp -> c.position(tp)).toMap
> }
> {code}
> Calling consumer.position(topicPartition) leads to updateFetchPositions(Collections.singleton(partition)).
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> fetcher.resetOffsetsIfNeeded(partitions);      // resets the current partition to the latest offset
> if (!subscriptions.hasAllFetchPositions()) {   // seekToEnd was called for all partitions before, so this condition is true
>     coordinator.refreshCommittedOffsetsIfNeeded();
>     fetcher.updateFetchPositions(partitions);  // resets the current partition to the committed offset
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets the latest offset, while all the others get the committed offset.

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
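For anyone trying to reproduce this outside of Spark Streaming, the sketch below mirrors the latestOffsets() flow from the description against a plain KafkaConsumer. It is a minimal sketch, not taken from the Spark code: the broker address, topic name, and group id are placeholders, and it assumes a topic with at least two partitions, previously committed offsets for the group, and a client on one of the affected versions (0.10.1.0, 0.10.1.1, 0.10.2.0).

{code}
import java.util.Properties

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

import scala.collection.JavaConverters._

object SeekToEndRepro {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")  // placeholder broker address
    props.put("group.id", "seek-to-end-repro")        // placeholder group, assumed to have committed offsets
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // Placeholder topic, assumed to have at least two partitions.
      val partitions = Seq(new TopicPartition("test", 0), new TopicPartition("test", 1))
      consumer.assign(partitions.asJava)
      consumer.poll(0)

      // Mirror latestOffsets() above: establish positions, pause, then seek to the end.
      partitions.foreach(tp => consumer.position(tp))
      consumer.pause(partitions.asJava)
      consumer.seekToEnd(partitions.asJava)

      // Each position() call goes through updateFetchPositions() for a single partition;
      // per the description above, only the last partition ends up at the log-end offset.
      partitions.foreach { tp =>
        println(s"$tp -> ${consumer.position(tp)}")
      }
    } finally {
      consumer.close()
    }
  }
}
{code}

If the behaviour described in the report holds, the printout shows the committed offset for every partition except the last one, which reports the log-end offset.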