Dear all,

The code snippet below uses the Kafka admin client to retrieve the last 
committed offset and the latest (log-end) offset of every partition for a 
given consumer group, namely CONSUMER_GROUP:


// Last committed offset of every partition the group has offsets for.
Map<TopicPartition, OffsetAndMetadata> offsets =
    admin.listConsumerGroupOffsets(CONSUMER_GROUP).partitionsToOffsetAndMetadata().get();

// Ask for the latest offset of each of those partitions.
Map<TopicPartition, OffsetSpec> requestLatestOffsets = new HashMap<>();
for (TopicPartition tp : offsets.keySet()) {
    requestLatestOffsets.put(tp, OffsetSpec.latest());
}
Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> latestOffsets =
    admin.listOffsets(requestLatestOffsets).all().get();

// Report the lag per partition: latest offset minus committed offset.
for (Map.Entry<TopicPartition, OffsetAndMetadata> e : offsets.entrySet()) {
    String topic = e.getKey().topic();
    int partition = e.getKey().partition();
    long committedOffset = e.getValue().offset();
    long latestOffset = latestOffsets.get(e.getKey()).offset();
    System.out.println("Consumer group " + CONSUMER_GROUP + " has committed offset "
        + committedOffset + " to topic " + topic + " partition " + partition
        + ". The latest offset in the partition is " + latestOffset
        + " so consumer group is " + (latestOffset - committedOffset) + " records behind");
}
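
As a side note, the lag arithmetic in the loop can be looked at separately 
from the admin client. Below is a minimal sketch of it, under some assumptions 
of mine: partitions are keyed by plain strings such as "orders-0" instead of 
TopicPartition, offsets are plain Long values, and the class LagCalculator and 
its method computeLag are hypothetical names, not part of the Kafka API. It 
also guards against a missing committed offset, since as far as I understand 
the returned map may hold a null value for a partition the group has not 
committed to yet.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for illustration only; not part of the Kafka API.
final class LagCalculator {

    // Marker used when the lag cannot be computed (assumption of this sketch).
    static final long UNKNOWN = -1L;

    /**
     * Computes per-partition lag as (latest offset - committed offset).
     * Keys are plain strings standing in for TopicPartition.
     * A partition with no committed or no latest offset maps to UNKNOWN.
     */
    static Map<String, Long> computeLag(Map<String, Long> committed,
                                        Map<String, Long> latest) {
        Map<String, Long> lag = new LinkedHashMap<>();
        for (Map.Entry<String, Long> e : committed.entrySet()) {
            Long committedOffset = e.getValue();
            Long latestOffset = latest.get(e.getKey());
            if (committedOffset == null || latestOffset == null) {
                lag.put(e.getKey(), UNKNOWN);
                continue;
            }
            // Defensive clamp: the two values come from separate requests,
            // so this sketch never reports a negative lag.
            lag.put(e.getKey(), Math.max(0L, latestOffset - committedOffset));
        }
        return lag;
    }
}
```

For example, a committed offset of 40 against a latest offset of 100 gives a 
lag of 60, while a partition with no committed offset is reported as UNKNOWN 
rather than causing a NullPointerException.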



Could you please advise on the following points:


  1.  If the consumer group is in a rebalancing state (or another unstable 
state), would that affect the returned committed and latest offsets, and if 
so, how?

  2.  Given the code above, is there a specific way to know whether the group 
is undergoing a rebalance (or is otherwise in an unstable state) at the time 
the admin request is served by the Kafka brokers?
