Hello Kafka Users,

I am trying to run below sample code mentioned in Kafka documentation under
Automatic Offset Committing for a topic with 1 partition  (tried with 3 and
more partition as well). Create command as follows

bin/kafka-topics.sh --create --zookeeper <ZK>:2181 --replication-factor 3
--partitions 1 --topic test --config cleanup.policy=compact,delete

but the sample code always returns 0 records unless I provide a custom
ConsumerRebalanceListener (below) which sets consumer to beginning.

I wonder if the sample code given at Kafka documentation is wrong or am I
missing something?

https://kafka.apache.org/0101/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html


*Automatic Offset Committing*

This example demonstrates a simple usage of Kafka's consumer api that
relying on automatic offset committing.

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         ConsumerRecords<String, String> records = consumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
             System.out.printf("offset = %d, key = %s, value = %s%n",
record.offset(), record.key(), record.value());
     }



====

public class SeekToBeginingConsumerRebalancerListener implements
org.apache.kafka.clients.consumer.ConsumerRebalanceListener {

     private Consumer<String, String> consumer;
     public SeekToBeginingConsumerRebalancerListener(KafkaConsumer<String,
String> consumer2) {
             this.consumer = consumer2;
     }
     public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
             for (TopicPartition partition : partitions) {

//offsetManager.saveOffsetInExternalStore(partition.topic(),
partition.partition(),consumer.position(partition));
             }
     }
     public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
            /* for (TopicPartition partition : partitions) {
                     consumer.seek(partition,seekTo));
             }*/
         consumer.seekToBeginning(partitions);
     }
}

Reply via email to