Hello all,

I am using MirrorMaker 2.0 and testing its consumer checkpointing functionality. I found that RemoteClusterUtils.translateOffsets does not return checkpoints for a consumer that runs in assign mode.
I am using MirrorMaker 2.0 from Kafka version 2.5.0 (Scala version 2.12).
My source Kafka setup is 1 broker and 1 ZooKeeper, running Kafka version 1.0.0 (Scala version 2.11).
My target Kafka setup is 1 broker and 1 ZooKeeper, running Kafka version 1.0.0 (Scala version 2.11).
I am only doing 1-way replication from my source cluster to the target cluster.

Mirror Maker Config:
================
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092

A->B.enabled = true
A->B.topics = .*
A->B.groups = .*

B->A.enabled = false
B->A.topics = .*

replication.factor=1
checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1

replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy
============================================================================

In the replication policy, I have removed topic renaming, so the topic is replicated as is (same name in the target cluster as in the source cluster).

Steps to replicate:
=============
1) Create a topic on the source cluster
2) Push some data in the topic using console producer
3) Start a consumer in assign mode to read from the above topic but only from 1 partition.

    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestTopic-123");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);

    // Assign mode: read a single partition directly, no group subscription.
    TopicPartition tp = new TopicPartition("TestTopic-123", 1);
    consumer.assign(Collections.singleton(tp));

    while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<byte[], byte[]> record : records) {
            System.out.println(new String(record.value()) + "__" + record.partition());
            Thread.sleep(2000);
        }
    }

4) Stop the consumer mid-way. Describe the consumer group in the source cluster to get the lag information.

    bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group TestTopic-123

    GROUP          TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    TestTopic-123  TestTopic-123  0          5               28              23

5) Run the translate offsets method to print the downstream offset.

    Map<TopicPartition, OffsetAndMetadata> newOffsets =
        RemoteClusterUtils.translateOffsets(properties, "A", "TestTopic-123", Duration.ofMillis(5500));
    System.out.println(newOffsets.toString());

6) An empty map is returned.

Expected Outcome: The translated committed offset should have been returned.

My Debugging
===========
On debugging the issue, I found that the checkpoint topic in the target cluster did not have this group's committed offset. I tried multiple times with different commit frequencies and topic/group names, but it didn't work. Only a consumer running in subscribe mode and the console consumer with the --group flag produce checkpoints.
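
For reference, one thing that can be ruled out without a running cluster is the group filter. The following is only a minimal sketch, not MirrorMaker's actual code: it assumes the group filter does a full regex match of the group id against the A->B.groups patterns and then drops ids matching an exclusion list. The class name GroupFilterCheck and the exclusion patterns are assumptions (the patterns are recalled from the 2.5.0 defaults and should be verified against the version in use).

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class GroupFilterCheck {
    // Value of A->B.groups from the config in this post.
    static final List<String> INCLUDE = Arrays.asList(".*");

    // ASSUMPTION: default exclusion patterns; check them against your MM2 version.
    static final List<String> EXCLUDE =
        Arrays.asList("console-consumer-.*", "connect-.*", "__.*");

    // True if the whole group id matches at least one of the patterns.
    static boolean matchesAny(List<String> patterns, String group) {
        for (String p : patterns) {
            if (Pattern.matches(p, group)) {
                return true;
            }
        }
        return false;
    }

    // A group passes the filter if it is included and not excluded.
    static boolean shouldReplicateGroup(String group) {
        return matchesAny(INCLUDE, group) && !matchesAny(EXCLUDE, group);
    }

    public static void main(String[] args) {
        System.out.println(shouldReplicateGroup("TestTopic-123"));         // true
        System.out.println(shouldReplicateGroup("console-consumer-4711")); // false
    }
}
```

Under these assumptions the group TestTopic-123 passes the filter, so the filter configuration alone would not explain why the group is missing from the checkpoint topic. It would also be consistent with the console consumer only producing checkpoints when an explicit --group is given.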

Question
========
1) Is it intended functionality that an assign-mode consumer can never be reset, or is it a bug?

Any help would be greatly appreciated.