[ https://issues.apache.org/jira/browse/KAFKA-10609?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ananya updated KAFKA-10609:
---------------------------
    Description: 
I was testing the consumer checkpointing functionality of MirrorMaker 2.0. I found that RemoteClusterUtils.translateOffsets does not return checkpoints for a consumer that runs in assign mode.

+*Setup Details*+

I am using MirrorMaker 2.0 from Kafka version 2.5.0 (Scala version 2.12).
My source Kafka setup is 1 broker and 1 ZooKeeper node running Kafka version 2.5.0 (Scala version 2.12).
My target Kafka setup is 1 broker and 1 ZooKeeper node running Kafka version 2.5.0 (Scala version 2.12).
I am only doing one-way replication from my source cluster to the target cluster.

+*Mirror Maker Config:*+
{code:java}
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092

A->B.enabled = true
A->B.topics = .*
A->B.groups = .*

B->A.enabled = false
B->A.topics = .*

replication.factor=1

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1{code}

+*Steps to replicate:*+
 * Create a topic on the source cluster.
 * Push some data to the topic using the console producer.
 * Start a consumer in assign mode to read from the above topic, but only from 1 partition.
{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// The wrapper class name is illustrative only.
public class AssignModeConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "testGroup");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "7");
        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
        // Assign mode: read one fixed partition instead of calling subscribe().
        TopicPartition tp = new TopicPartition("testTopic", 1);
        consumer.assign(Collections.singleton(tp));
        while (true) {
            ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
            for (ConsumerRecord<byte[], byte[]> record : records) {
                System.out.println(new String(record.value()) + "__" + record.partition());
            }
        }
    }
}{code}
 * Stop the consumer midway. Describe the consumer group in the source cluster to get the lag information.
{code:java}
bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group testGroup

GROUP      TOPIC      PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
testGroup  testTopic  0          5               28              23
{code}
 * Run the translate offsets method to print the downstream offsets.
{code:java}
Map<TopicPartition, OffsetAndMetadata> newOffsets =
    RemoteClusterUtils.translateOffsets(properties, "A", "testGroup", Duration.ofMillis(5500));
System.out.println(newOffsets.toString());{code}
 * *{color:#ff0000}An empty map is returned{color}*

*+Expected Outcome:+ {color:#00875a}Translated committed offsets should have been returned.{color}*

+*My Debugging*+

On debugging the issue, I found that the checkpoint topic in the target cluster did not have this group's committed offset.
I tried multiple times with different commit frequencies and topic/group names. It didn't work.
Only consumers running in subscribe mode, and the console consumer with the --group flag, produce checkpoints.

> Mirror Maker 2.0 RemoteClusterUtils do not return offset map for Assign Mode Consumers
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10609
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10609
>             Project: Kafka
>          Issue Type: Bug
>          Components: mirrormaker
>    Affects Versions: 2.5.0
>         Environment: Ubuntu 19 8 core 16GB machine
>            Reporter: Ananya
>            Priority: Major
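
For anyone re-running the translate offsets step from the description, here is a minimal sketch of how the client properties and topic names could be prepared. It rests on assumptions not stated in the report: that the default replication policy is in use, so checkpoints for source alias A live in a topic named A.checkpoints.internal on the target cluster and replicated topics are prefixed with the alias, and that the properties handed to translateOffsets therefore point at the target cluster B (localhost:9092). The class and helper names are hypothetical, and the Kafka call itself appears only in a comment so the block runs without Kafka on the classpath.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class; none of these names come from Kafka itself.
final class TranslateOffsetsSketch {

    // Assumption: default replication policy naming for the checkpoints topic.
    static String checkpointsTopic(String sourceAlias) {
        return sourceAlias + ".checkpoints.internal";
    }

    // Assumption: replicated topics are renamed "<sourceAlias>.<topic>" downstream.
    static String remoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Client properties as a Map<String, Object>, pointing at the target cluster.
    static Map<String, Object> targetClientProps(String targetBootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", targetBootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        Map<String, Object> props = targetClientProps("localhost:9092");
        System.out.println("client properties: " + props);
        System.out.println("checkpoints topic: " + checkpointsTopic("A"));
        System.out.println("translated topic:  " + remoteTopic("A", "testTopic"));

        // With kafka-clients and connect-mirror-client available, the call
        // from the description would then look like:
        //
        //   Map<TopicPartition, OffsetAndMetadata> newOffsets =
        //       RemoteClusterUtils.translateOffsets(
        //           props, "A", "testGroup", Duration.ofMillis(5500));
    }
}
```

If the map is still empty with these properties, consuming the checkpoints topic on the target cluster directly is one way to confirm whether any checkpoint records exist for testGroup, which is what the debugging notes in the description report as missing.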