Hello Kafka Users, I am trying to run below sample code mentioned in Kafka documentation under Automatic Offset Committing for a topic with 1 partition (tried with 3 and more partition as well). Create command as follows
bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3 --partitions 1 --topic test --config cleanup.policy=compact,delete but the sample code always returns 0 records unless I provide a custom ConsumerRebalanceListener (below) which sets consumer to beginning. I wonder if the sample code given at Kafka documentation is wrong or am I missing something? https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html *Automatic Offset Committing* This example demonstrates a simple usage of Kafka's consumer api that relying on automatic offset committing. Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("foo", "bar")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } ==== public class SeekToBeginingConsumerRebalancerListener implements org.apache.kafka.clients.consumer.ConsumerRebalanceListener { private Consumer<String, String> consumer; public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<String, String> consumer2) { this.consumer = consumer2; } public void onPartitionsRevoked(Collection<TopicPartition> partitions) { for (TopicPartition partition : partitions) { //offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),consumer.position(partition)); } } public void onPartitionsAssigned(Collection<TopicPartition> partitions) { /* for (TopicPartition partition : partitions) { consumer.seek(partition,seekTo)); }*/ consumer.seekToBeginning(partitions); } }