Hi all,
I am relatively new to kafka and my initial attempts at consuming messages are 
failing. My topic has 3 partitions and I am setting "auto.offset.reset" to 
"earliest". The call to poll hangs. Here's my code: 
***********************************************************************************import
 org.apache.kafka.clients.consumer.*;import 
org.apache.kafka.common.TopicPartition;import java.util.*;
public class TestConsumer {
    public static void main(String[] args){
        Properties props = new Properties();        
props.put("bootstrap.servers", "localhost:9092");        props.put("group.id", 
"mytest5");        props.put("enable.auto.commit", "false");        
props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");        
props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");        
props.put("auto.offset.reset", "earliest");        
props.put("max.poll.records", "50");        KafkaConsumer<String, String> 
consumer = new KafkaConsumer<>(props);        
consumer.subscribe(Arrays.asList("mpsFleet4"), new RebalanceHandler(consumer)); 
       while (true) {            ConsumerRecords<String, String> records = 
consumer.poll(120000);            for (ConsumerRecord<String, String> record : 
records)                System.out.printf("offset = %d, key = %s, value = 
%s%n", record.offset(), record.key(), record.value());        }    }}
class RebalanceHandler implements ConsumerRebalanceListener {
    Consumer<String, String> consumer;
    RebalanceHandler(Consumer<String, String> consumer){        this.consumer = 
consumer;    }
    @Override    public void onPartitionsRevoked(Collection<TopicPartition> 
partitions) {        System.out.println("Partitions revoked");    }    
@Override    public void onPartitionsAssigned(Collection<TopicPartition> 
partitions) {        System.out.println("Partitions assigned");        
//consumer.seekToBeginning(partitions);    }}
***********************************************************************************

When I use ConsumerGroupCommand to check the offsets, I see the following:

***********************************************************************************
GROUP      TOPIC         PARTITION     CURRENT-OFFSET     LOG-END-OFFSET        
LAG                      OWNERmytest5     mpsFleet4             0               
     unknown                          397184                unknown         
consumer-1_/172.28.11.138mytest5     mpsFleet4             1                    
unknown                          650207                unknown         
consumer-1_/172.28.11.138mytest5     mpsFleet4             2                    
unknown                          451783                unknown         
consumer-1_/172.28.11.138***********************************************************************************

If I set "enable.auto.commit" to "true", the call to poll still hangs but all 
the values for CURRENT-OFFSET are same as LOG-END-OFFSET.
I am really not able to understand what's going on and would really appreciate 
any help. I have even tried playing with explicitly seeking the partition 
offsets in my rebalance listener.
ThanksSachin

Reply via email to