Pranav Nakhe created KAFKA-4547:
-----------------------------------

             Summary: Consumer.position returns incorrect results for Kafka 0.10.1.0 client
                 Key: KAFKA-4547
                 URL: https://issues.apache.org/jira/browse/KAFKA-4547
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 0.10.1.0
         Environment: Windows Kafka 0.10.1.0
            Reporter: Pranav Nakhe


Consider the following code:

                KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
                // Build one TopicPartition per partition of IssueTopic
                List<TopicPartition> listOfPartitions = new ArrayList<TopicPartition>();
                for (int i = 0; i < consumer.partitionsFor("IssueTopic").size(); i++) {
                        listOfPartitions.add(new TopicPartition("IssueTopic", i));
                }
                consumer.assign(listOfPartitions);
                consumer.pause(listOfPartitions);
                consumer.seekToEnd(listOfPartitions);
//              consumer.resume(listOfPartitions); -- commented out
                // Print the position of each (still paused) partition
                for (int i = 0; i < listOfPartitions.size(); i++) {
                        System.out.println(consumer.position(listOfPartitions.get(i)));
                }
                
I have created a topic IssueTopic with 3 partitions and a single replica on my 
single-node Kafka installation (0.10.1.0).

The behavior observed with Kafka client 0.10.1.0, compared with Kafka client 
0.10.0.1, is as follows.

A) Initially, when there are no messages on IssueTopic, running the above 
program returns:

0.10.1.0        0.10.0.1
0               0
0               0
0               0

B) Next, I send 6 messages and confirm that they have been evenly distributed 
across the three partitions. Running the above program now returns:

0.10.1.0        0.10.0.1
0               2
0               2
2               2

Clearly there is a difference in behavior between the two clients.

Now, if I call resume after the seekToEnd call (uncomment the resume call in 
the code above), the behavior is:

0.10.1.0        0.10.0.1
2               2
2               2
2               2

This is an issue I came across when using the Spark Kafka integration for 0.10. 
I started seeing it when I used Kafka 0.10.1.0. I had raised a pull request to 
resolve that issue [SPARK-18779], but after looking at the Kafka client 
implementation and documentation, it now seems the issue is with Kafka and not 
with Spark. There does not seem to be any documentation that specifies or 
implies that resume must be called after seekToEnd for position to return the 
correct value. There is also a clear difference in behavior between the two 
Kafka client implementations.
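
As a cross-check, the positions reported after seekToEnd can be compared with 
the log end offsets returned by consumer.endOffsets, which is available in the 
0.10.1.0 client. The following is only a minimal sketch: the class name 
PositionCheck, the broker address localhost:9092 and the group id 
position-check are assumptions for illustration, not part of my original setup. 
It needs the kafka-clients jar on the classpath and a running broker.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;

public class PositionCheck {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Assumed values: adjust to the local installation.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "position-check");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Same sequence as in the report: assign, pause, seekToEnd, no resume.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("IssueTopic")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            consumer.assign(partitions);
            consumer.pause(partitions);
            consumer.seekToEnd(partitions);

            // Ask the broker for the end offset of every partition.
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);

            // Print position next to the end offset and flag any difference.
            for (TopicPartition tp : partitions) {
                long position = consumer.position(tp);
                long end = endOffsets.get(tp);
                System.out.printf("%s position=%d endOffset=%d%s%n",
                        tp, position, end, position == end ? "" : "  <-- mismatch");
            }
        }
    }
}
```

If position behaved as it does in 0.10.0.1, I would expect no line to be 
flagged as a mismatch while no producer is writing to the topic.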




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
