[ https://issues.apache.org/jira/browse/KAFKA-4845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898258#comment-15898258 ]
Vahid Hashemian edited comment on KAFKA-4845 at 3/6/17 10:20 PM:
-----------------------------------------------------------------

[~DanC], The issue you raised sounds very similar to the one reported in [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547], which was fixed in 0.10.2.0. Also, the second code snippet and the comments you added there apply to the code as it was before [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547] was [fixed|https://github.com/apache/kafka/commit/813897a00653351710d37acbbb598235e86db824#diff-267b7c1e68156c1301c56be63ae41dd0].

The function {{updateFetchPositions(Set<TopicPartition> partitions)}} currently looks like this:
{code}
fetcher.resetOffsetsIfNeeded(partitions);
if (!subscriptions.hasAllFetchPositions(partitions)) {
    coordinator.refreshCommittedOffsetsIfNeeded();
    fetcher.updateFetchPositions(partitions);
}
{code}

So it sounds like you are not running the latest KafkaConsumer code. The issue you raised applies to 0.10.1.0 and 0.10.1.1 (a duplicate of [KAFKA-4547|https://issues.apache.org/jira/browse/KAFKA-4547]), but not to 0.10.2.0.

Please advise if I misunderstood the defect or am missing something. Thanks.

> KafkaConsumer.seekToEnd cannot take effect when integrating with spark streaming
> --------------------------------------------------------------------------------
>
>                 Key: KAFKA-4845
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients
>    Affects Versions: 0.10.1.0, 0.10.1.1, 0.10.2.0
>            Reporter: Dan
>            Assignee: Vahid Hashemian
>
> When integrating with spark streaming, the kafka consumer cannot get the latest
> offsets except for one partition. The code snippet is as follows:
> {code}
> protected def latestOffsets(): Map[TopicPartition, Long] = {
>   val c = consumer
>   c.poll(0)
>   val parts = c.assignment().asScala
>   val newPartitions = parts.diff(currentOffsets.keySet)
>   currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
>   c.pause(newPartitions.asJava)
>   c.seekToEnd(currentOffsets.keySet.asJava)
>   parts.map(tp => tp -> c.position(tp)).toMap
> }
> {code}
> Calling consumer.position(topicPartition) in turn calls
> updateFetchPositions(Collections.singleton(partition)).
> The bug lies in updateFetchPositions(Set<TopicPartition> partitions):
> {code}
> // reset to latest offset for current partition
> fetcher.resetOffsetsIfNeeded(partitions);
> // seekToEnd was called for all partitions before, so this condition will be true
> if (!subscriptions.hasAllFetchPositions()) {
>     coordinator.refreshCommittedOffsetsIfNeeded();
>     // reset to committed offsets for current partition
>     fetcher.updateFetchPositions(partitions);
> }
> {code}
> So eventually only one partition (the last partition in the assignment) gets
> the latest offset, while all the others get the committed offset.
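The difference between the two versions of {{updateFetchPositions}} discussed in this thread can be illustrated with a small toy model. This is only a sketch under stated assumptions, not the actual KafkaConsumer internals: the class {{FetchPositionSketch}}, the partition names {{p0}}..{{p2}}, the offsets 10 and 100, and the simplification that the inner update always falls back to the committed offset (as the reporter's code comments describe) are all hypothetical. It uses no Kafka classes.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Toy model of the position bookkeeping described in the issue.
// All names and offsets here are hypothetical, chosen for illustration only.
public class FetchPositionSketch {
    private final Map<String, Long> committed = new LinkedHashMap<>();
    private final Map<String, Long> endOffsets = new LinkedHashMap<>();
    private final Map<String, Long> positions = new HashMap<>();
    private final Set<String> resetPending = new HashSet<>();
    // true  -> check only the requested partitions (the check quoted in the comment)
    // false -> check every assigned partition (the check quoted in the issue description)
    private final boolean checkOnlyRequested;

    public FetchPositionSketch(boolean checkOnlyRequested) {
        this.checkOnlyRequested = checkOnlyRequested;
        for (String tp : Arrays.asList("p0", "p1", "p2")) {
            committed.put(tp, 10L);    // assumed committed offset
            endOffsets.put(tp, 100L);  // assumed log end offset
        }
    }

    // Models seekToEnd: drop the position and mark the partition for a lazy reset.
    public void seekToEnd(Collection<String> partitions) {
        for (String tp : partitions) {
            positions.remove(tp);
            resetPending.add(tp);
        }
    }

    // Models position(tp): resolve the position for this single partition.
    public long position(String tp) {
        updateFetchPositions(Collections.singleton(tp));
        return positions.get(tp);
    }

    private void updateFetchPositions(Set<String> partitions) {
        // Stand-in for resetOffsetsIfNeeded: apply pending resets to the end offset.
        for (String tp : partitions) {
            if (resetPending.remove(tp)) {
                positions.put(tp, endOffsets.get(tp));
            }
        }
        Set<String> toCheck = checkOnlyRequested ? partitions : committed.keySet();
        if (!positions.keySet().containsAll(toCheck)) {
            // Simplified stand-in for the inner update: fall back to committed offsets.
            for (String tp : partitions) {
                positions.put(tp, committed.get(tp));
            }
        }
    }

    // Seeks all partitions to the end, then reads each position in assignment order.
    public static Map<String, Long> run(boolean checkOnlyRequested) {
        FetchPositionSketch sketch = new FetchPositionSketch(checkOnlyRequested);
        sketch.seekToEnd(sketch.committed.keySet());
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : sketch.committed.keySet()) {
            result.put(tp, sketch.position(tp));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println("check all assigned:   " + run(false));
        System.out.println("check requested only: " + run(true));
    }
}
```

In this model, checking all assigned partitions leaves only the last partition at the end offset, which matches the symptom in the issue description. Checking only the requested partitions leaves every partition at the end offset.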