Hi all,

I am relatively new to Kafka and my initial attempts at consuming messages are failing. My topic has 3 partitions and I am setting "auto.offset.reset" to "earliest". The call to poll hangs. Here's my code:

***********************************************************************************
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import java.util.*;

public class TestConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "mytest5");
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "50");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("mpsFleet4"), new RebalanceHandler(consumer));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(120000);
            for (ConsumerRecord<String, String> record : records)
                System.out.printf("offset = %d, key = %s, value = %s%n",
                        record.offset(), record.key(), record.value());
        }
    }
}

class RebalanceHandler implements ConsumerRebalanceListener {
    Consumer<String, String> consumer;

    RebalanceHandler(Consumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        System.out.println("Partitions revoked");
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("Partitions assigned");
        //consumer.seekToBeginning(partitions);
    }
}
***********************************************************************************

When I use ConsumerGroupCommand to check the offsets, I see the following:

***********************************************************************************
GROUP    TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG      OWNER
mytest5  mpsFleet4  0          unknown         397184          unknown  consumer-1_/172.28.11.138
mytest5  mpsFleet4  1          unknown         650207          unknown  consumer-1_/172.28.11.138
mytest5  mpsFleet4  2          unknown         451783          unknown  consumer-1_/172.28.11.138
***********************************************************************************

If I set "enable.auto.commit" to "true", the call to poll still hangs, but all the values for CURRENT-OFFSET are the same as LOG-END-OFFSET.

I am really not able to understand what's going on and would really appreciate any help. I have even tried explicitly seeking the partition offsets in my rebalance listener.

Thanks,
Sachin
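
A note on reading the ConsumerGroupCommand output above: LAG is LOG-END-OFFSET minus the group's committed offset (CURRENT-OFFSET), and it shows as "unknown" when the group has not committed anything for that partition. Below is a minimal sketch of that arithmetic using the log-end offsets from the table. The class and method names (LagExample, lag, render) are hypothetical and purely illustrative; this is not Kafka's own code.

```java
import java.util.OptionalLong;

public class LagExample {
    // Lag for one partition: log-end offset minus the group's committed offset.
    // An empty committed offset models the "unknown" rows in the table.
    public static OptionalLong lag(long logEndOffset, OptionalLong committedOffset) {
        if (!committedOffset.isPresent()) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(logEndOffset - committedOffset.getAsLong());
    }

    // Render a value the way the table does: a number, or "unknown" if absent.
    public static String render(OptionalLong value) {
        return value.isPresent() ? Long.toString(value.getAsLong()) : "unknown";
    }

    public static void main(String[] args) {
        // Log-end offsets for partitions 0..2, copied from the table.
        long[] logEndOffsets = {397184L, 650207L, 451783L};

        for (int partition = 0; partition < logEndOffsets.length; partition++) {
            long logEnd = logEndOffsets[partition];

            // Case 1: nothing committed yet (enable.auto.commit=false, no manual commit).
            OptionalLong noCommit = lag(logEnd, OptionalLong.empty());

            // Case 2: committed offset equals the log-end offset, as reported
            // when enable.auto.commit is set to true.
            OptionalLong atEnd = lag(logEnd, OptionalLong.of(logEnd));

            System.out.printf("partition %d: lag without commit = %s, lag with commit at log end = %s%n",
                    partition, render(noCommit), render(atEnd));
        }
    }
}
```

Under this reading, a committed offset equal to LOG-END-OFFSET corresponds to a lag of zero for that partition.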