Hi Ananya,

Were you able to resolve this issue? I'm facing the same issue. What parameters should be passed here if I'm doing a failover from cluster A ---> B?

    Map<TopicPartition, OffsetAndMetadata> newOffsets =
        RemoteClusterUtils.translateOffsets(properties, "A", "TestTopic-123", Duration.ofMillis(5500));

properties    = bootstrap properties of cluster B
TestTopic-123 = topic name at cluster A

Thanks

On 9/7/20, 8:43 AM, "Ananya Sen" <ananya281...@gmail.com> wrote:

Hello All,

I was using MirrorMaker 2.0 and testing the consumer checkpointing functionality. I found that RemoteClusterUtils.translateOffsets does not return checkpoints for consumers that run in assign mode.

I am using MirrorMaker 2.0 from Kafka version 2.5.0, Scala version 2.12.

My source Kafka setup is 1 broker and 1 ZooKeeper, Kafka version 1.0.0, Scala version 2.11.
My target Kafka setup is 1 broker and 1 ZooKeeper, Kafka version 1.0.0, Scala version 2.11.

I am only doing one-way replication, from my source cluster to the target cluster.

Mirror Maker Config:
====================

    clusters = A, B
    A.bootstrap.servers = localhost:9082
    B.bootstrap.servers = localhost:9092

    A->B.enabled = true
    A->B.topics = .*
    A->B.groups = .*

    B->A.enabled = false
    B->A.topics = .*

    replication.factor=1
    checkpoints.topic.replication.factor=1
    heartbeats.topic.replication.factor=1
    offset-syncs.topic.replication.factor=1
    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1

    emit.heartbeats.interval.seconds = 2
    refresh.topics.interval.seconds=1
    refresh.groups.interval.seconds=1
    emit.checkpoints.interval.seconds=1
    sync.topic.configs.enabled=true
    sync.topic.configs.interval.seconds=1

    replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy

============================================================================

In the replication policy, I have removed topic renaming, so each topic is replicated as it is (same name in the target cluster as in the source cluster).
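
To illustrate what removing topic renaming means, here is a minimal stand-alone sketch. It is not the real org.apache.kafka.connect.mirror.ReplicationPolicy interface and not the SimpleReplicationPolicy class from the config above; the class and method names are hypothetical. It assumes that the default MM2 policy prefixes the replicated topic with the source alias and a "." separator, and that checkpoints for source alias A are written to a topic named A.checkpoints.internal on the target cluster.

```java
// Simplified stand-in for MM2 topic naming; all names here are hypothetical.
final class ReplicationPolicySketch {

    // Assumed default behaviour: prefix the topic with the source cluster alias.
    static String prefixedRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Identity-style policy as described in the thread: keep the name unchanged.
    static String identityRemoteTopic(String sourceAlias, String topic) {
        return topic;
    }

    // Assumed name of the checkpoints topic on the target cluster.
    static String checkpointsTopic(String sourceAlias) {
        return sourceAlias + ".checkpoints.internal";
    }

    public static void main(String[] args) {
        System.out.println(prefixedRemoteTopic("A", "TestTopic-123"));
        System.out.println(identityRemoteTopic("A", "TestTopic-123"));
        System.out.println(checkpointsTopic("A"));
    }
}
```

With an identity-style policy the topic name alone no longer says which cluster a topic came from, so the cluster alias passed to translateOffsets is what selects the checkpoints topic to read.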

Steps to replicate:
===================

1) Create a topic on the source cluster.

2) Push some data into the topic using the console producer.

3) Start a consumer in assign mode to read from the above topic, but only from 1 partition.

    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.ByteArrayDeserializer;

    public class AssignModeConsumer {
        public static void main(String[] args) throws InterruptedException {
            // Consumer pointed at the source cluster (A).
            Properties properties = new Properties();
            properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
            properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
            properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestTopic-123");
            properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);

            // Assign one partition directly instead of subscribing to the topic.
            TopicPartition tp = new TopicPartition("TestTopic-123", 1);
            consumer.assign(Collections.singleton(tp));

            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<byte[], byte[]> record : records) {
                    System.out.println(new String(record.value()) + "__" + record.partition());
                    Thread.sleep(2000);
                }
            }
        }
    }

4) Stop the consumer midway. Describe the consumer group in the source cluster to get the lag information.

    bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group TestTopic-123

    GROUP          TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    TestTopic-123  TestTopic-123  0          5               28              23

5) Run the translate offsets method to print the downstream offset.

    Map<TopicPartition, OffsetAndMetadata> newOffsets =
        RemoteClusterUtils.translateOffsets(properties, "A", "TestTopic-123", Duration.ofMillis(5500));
    System.out.println(newOffsets.toString());

6) An empty map is returned.

Expected Outcome: The translated committed offset should have been returned.
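
The empty map in step 6 can be pictured with a simplified model. It assumes the translation works by scanning the checkpoint records on the target cluster and keeping only those whose group ID matches the requested group, so a group with no checkpoint records yields an empty result. This is a sketch of that assumption, not MM2's code: the Checkpoint class, the translate method and the group name "subscribe-group" are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of looking up translated offsets for one consumer group.
final class CheckpointLookupSketch {

    // Hypothetical, reduced checkpoint record (not MM2's Checkpoint class).
    static final class Checkpoint {
        final String group;
        final String topic;
        final int partition;
        final long downstreamOffset;

        Checkpoint(String group, String topic, int partition, long downstreamOffset) {
            this.group = group;
            this.topic = topic;
            this.partition = partition;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Keep the latest downstream offset per "topic-partition" for the given group.
    static Map<String, Long> translate(List<Checkpoint> checkpoints, String group) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Checkpoint c : checkpoints) {
            if (c.group.equals(group)) {
                // Later records overwrite earlier ones for the same partition.
                result.put(c.topic + "-" + c.partition, c.downstreamOffset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Checkpoint> log = new ArrayList<>();
        log.add(new Checkpoint("subscribe-group", "TestTopic-123", 0, 5L));
        log.add(new Checkpoint("subscribe-group", "TestTopic-123", 0, 9L));

        // A group that has checkpoint records gets its latest offset back.
        System.out.println(translate(log, "subscribe-group")); // {TestTopic-123-0=9}
        // A group with no checkpoint records gets an empty map.
        System.out.println(translate(log, "TestTopic-123"));   // {}
    }
}
```

Under this model an empty result says nothing about the committed offsets on the source cluster; it only means no checkpoint for that group was found on the target side.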

My Debugging
============

On debugging the issue, I found that the checkpoint topic in the target cluster did not contain this group's committed offset. I tried multiple times with different commit frequencies and topic/group names. It didn't work. Only a consumer running in subscribe mode, or the console consumer with the --group flag, produces a checkpoint.

Question
========

1) Is it intended behaviour that an assign-mode consumer can never be reset, or is it a bug?

Any help would be greatly appreciated.

This e-mail and any files transmitted with it are for the sole use of the intended recipient(s) and may contain confidential and privileged information. If you are not the intended recipient(s), please reply to the sender and destroy all copies of the original message. Any unauthorized review, use, disclosure, dissemination, forwarding, printing or copying of this email, and/or any action taken in reliance on the contents of this e-mail is strictly prohibited and may be unlawful. Where permitted by applicable law, this e-mail and other e-mail communications sent to and from Cognizant e-mail addresses may be monitored.