Dan created KAFKA-4845:
--------------------------

             Summary: KafkaConsumer.seekToEnd cannot take effect when 
integrating with spark streaming
                 Key: KAFKA-4845
                 URL: https://issues.apache.org/jira/browse/KAFKA-4845
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.2.0, 0.10.1.1, 0.10.1.0
            Reporter: Dan


When integrating with Spark Streaming, the Kafka consumer cannot get the latest 
offsets for any partition except one. The code snippet is as follows: 
protected def latestOffsets(): Map[TopicPartition, Long] = {
    val c = consumer
    c.poll(0)
    val parts = c.assignment().asScala
    // partitions newly assigned since the last batch
    val newPartitions = parts.diff(currentOffsets.keySet)
    currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
    c.pause(newPartitions.asJava)
    // request a reset to the log-end offset for every partition
    c.seekToEnd(currentOffsets.keySet.asJava)
    // expected to return the log-end offset of each partition
    parts.map(tp => tp -> c.position(tp)).toMap
  }

When consumer.position(topicPartition) is called, it invokes 
updateFetchPositions(Collections.singleton(partition)):

The bug lies in updateFetchPositions(Set<TopicPartition> partitions):

        fetcher.resetOffsetsIfNeeded(partitions);      // resets the current partition
                                                       // to its latest offset
        if (!subscriptions.hasAllFetchPositions()) {   // seekToEnd was called for all
                                                       // partitions before, so this
                                                       // condition is true
            coordinator.refreshCommittedOffsetsIfNeeded();
            fetcher.updateFetchPositions(partitions);  // resets the current partition
                                                       // back to its committed offset
        }

So eventually only one partition (the last one in the assignment) gets the latest 
offset, while all the others get the committed offset.
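The sequence above can be modeled with a small, self-contained sketch. This is a toy 
model of the described control flow, not real Kafka code; the partition names and the 
offsets (committed = 100, log end = 500) are illustrative:

```java
import java.util.*;

// Toy model of position() after seekToEnd on all partitions.
public class SeekToEndBugModel {
    static final long COMMITTED = 100L, LOG_END = 500L;

    // Per-partition fetch position (null = no valid position yet).
    static Map<String, Long> position = new LinkedHashMap<>();
    // Partitions whose position still awaits a reset to the log end.
    static Set<String> awaitingReset = new LinkedHashSet<>();

    static void seekToEnd(List<String> partitions) {
        for (String tp : partitions) {
            position.put(tp, null);   // position is cleared...
            awaitingReset.add(tp);    // ...until the reset is performed
        }
    }

    // Mirrors updateFetchPositions(Collections.singleton(partition)).
    static long positionOf(String tp) {
        // fetcher.resetOffsetsIfNeeded(partitions):
        // the current partition is reset to the latest offset
        if (awaitingReset.remove(tp))
            position.put(tp, LOG_END);

        // subscriptions.hasAllFetchPositions() checks ALL partitions,
        // so it is false while any other partition still awaits its reset
        boolean hasAll = awaitingReset.isEmpty()
                && position.values().stream().allMatch(Objects::nonNull);

        // fetcher.updateFetchPositions(partitions): per the analysis above,
        // this seeks the current partition back to its committed offset
        if (!hasAll)
            position.put(tp, COMMITTED);

        return position.get(tp);
    }

    static Map<String, Long> run() {
        List<String> parts = Arrays.asList("t-0", "t-1", "t-2");
        seekToEnd(parts);
        Map<String, Long> result = new LinkedHashMap<>();
        for (String tp : parts)
            result.put(tp, positionOf(tp));
        return result;
    }

    public static void main(String[] args) {
        // prints {t-0=100, t-1=100, t-2=500}: only the last partition
        // in the assignment reports the log-end offset
        System.out.println(run());
    }
}
```

For t-0 and t-1, some other partition is still awaiting its reset when position() 
runs, so the freshly reset position is overwritten with the committed offset; only 
for the last partition is the all-partitions check already satisfied.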




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
