I am still facing the same issue.

On Fri, Sep 11, 2020, 4:46 PM <manoj.agraw...@cognizant.com> wrote:

> Hi Ananya,
>
> Were you able to resolve this issue? I'm facing the same issue.
>
> What parameters should be passed here if I'm doing a failover from
> cluster A ---> B?
>
>     Map<TopicPartition, OffsetAndMetadata> newOffsets =
>             RemoteClusterUtils.translateOffsets(properties, "A",
>                     "TestTopic-123", Duration.ofMillis(5500));
>
> properties    = bootstrap properties of cluster B
> TestTopic-123 = topic name at cluster A
>
> Thanks
>
>
> On 9/7/20, 8:43 AM, "Ananya Sen" <ananya281...@gmail.com> wrote:
>
> Hello All,
>
> I am using MirrorMaker 2.0 and testing the consumer checkpointing
> functionality. I found that RemoteClusterUtils.translateOffsets does
> not return checkpoints for a consumer that runs in assign mode.
>
> I am using MirrorMaker 2.0 from Kafka version 2.5.0 (Scala version
> 2.12).
> My source Kafka setup is 1 broker and 1 ZooKeeper, running Kafka
> version 1.0.0 (Scala version 2.11).
> My target Kafka setup is 1 broker and 1 ZooKeeper, running Kafka
> version 1.0.0 (Scala version 2.11).
>
> I am only doing 1-way replication from my source cluster to the target
> cluster.
>
> Mirror Maker Config:
> ================
> clusters = A, B
> A.bootstrap.servers = localhost:9082
> B.bootstrap.servers = localhost:9092
>
> A->B.enabled = true
> A->B.topics = .*
> A->B.groups = .*
>
> B->A.enabled = false
> B->A.topics = .*
>
> replication.factor=1
>
> checkpoints.topic.replication.factor=1
> heartbeats.topic.replication.factor=1
> offset-syncs.topic.replication.factor=1
>
> offset.storage.replication.factor=1
> status.storage.replication.factor=1
> config.storage.replication.factor=1
>
> emit.heartbeats.interval.seconds = 2
> refresh.topics.interval.seconds=1
> refresh.groups.interval.seconds=1
> emit.checkpoints.interval.seconds=1
> sync.topic.configs.enabled=true
> sync.topic.configs.interval.seconds=1
>
> replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy
>
> ============================================================================
> In the replication policy, I have removed topic renaming, so each topic
> is replicated as is (same name in the target cluster as in the source
> cluster).
>
>
> Steps to replicate:
> =============
> 1) Create a topic on the source cluster.
> 2) Push some data into the topic using the console producer.
> 3) Start a consumer in assign mode to read from the above topic, but
>    only from 1 partition.
>
>     Properties properties = new Properties();
>     properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>             "localhost:9082");
>     properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>             ByteArrayDeserializer.class.getName());
>     properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>             ByteArrayDeserializer.class.getName());
>     properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
>             "TestTopic-123");
>     properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
>             "earliest");
>     properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
>
>     KafkaConsumer<byte[], byte[]> consumer =
>             new KafkaConsumer<>(properties);
>
>     // Assign mode: bind directly to partition 1 instead of subscribing.
>     TopicPartition tp = new TopicPartition("TestTopic-123", 1);
>     consumer.assign(Collections.singleton(tp));
>
>     while (true) {
>         ConsumerRecords<byte[], byte[]> records =
>                 consumer.poll(Duration.ofMillis(500));
>         for (ConsumerRecord<byte[], byte[]> record : records) {
>             System.out.println(new String(record.value()) + "__"
>                     + record.partition());
>             Thread.sleep(2000);
>         }
>     }
>
> 4) Stop the consumer midway. Describe the consumer group on the source
>    cluster to get the lag information.
>
>     bin/kafka-consumer-groups.sh --describe --bootstrap-server
>     localhost:9082 --group TestTopic-123
>
>     GROUP          TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>     TestTopic-123  TestTopic-123  0          5               28              23
>
> 5) Run the translate offsets method to print the downstream offset.
>
>     Map<TopicPartition, OffsetAndMetadata> newOffsets =
>             RemoteClusterUtils.translateOffsets(properties, "A",
>                     "TestTopic-123", Duration.ofMillis(5500));
>     System.out.println(newOffsets.toString());
>
> 6) An empty map is returned.
>
> Expected Outcome: The translated committed offset should have been
> returned.
>
> My Debugging
> ===========
> On debugging the issue, I found that the checkpoint topic in the
> target cluster did not have this group's committed offset.
>
> I tried multiple times with different commit frequencies and
> topic/group names. It didn't work. Only a consumer running in subscribe
> mode and the console consumer with the --group flag produce checkpoints.
>
> Question
> ========
> 1) Is it intended that the offsets of an assign mode consumer can never
>    be translated and reset? Or is it a bug?
>
>
> Any help would be greatly appreciated.
>
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. If you are not the intended recipient(s), please reply to the
> sender and destroy all copies of the original message. Any unauthorized
> review, use, disclosure, dissemination, forwarding, printing or copying of
> this email, and/or any action taken in reliance on the contents of this
> e-mail is strictly prohibited and may be unlawful. Where permitted by
> applicable law, this e-mail and other e-mail communications sent to and
> from Cognizant e-mail addresses may be monitored.
>
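
For anyone following the thread, here is a toy model of the lookup that
step 5 relies on. It is a sketch under one assumption: that offset
translation amounts to filtering checkpoint records by consumer group ID
and returning the downstream offset per partition. The class, its fields,
and the "subscribe-group" name are hypothetical and are not MirrorMaker
APIs; the real checkpoint record and topic layout may differ. It only
illustrates why a group with no checkpoint records would yield an empty
map, which is consistent with the debugging observation quoted above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these names are MirrorMaker classes.
final class CheckpointLookupSketch {

    // Hypothetical stand-in for one record in the checkpoints topic.
    static final class Checkpoint {
        final String group;
        final String topicPartition;  // e.g. "TestTopic-123-0"
        final long upstreamOffset;    // committed offset on the source cluster
        final long downstreamOffset;  // equivalent offset on the target cluster

        Checkpoint(String group, String topicPartition,
                   long upstreamOffset, long downstreamOffset) {
            this.group = group;
            this.topicPartition = topicPartition;
            this.upstreamOffset = upstreamOffset;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Returns the most recent downstream offset per partition for one group.
    // A group that never appears in the checkpoint list gets an empty map.
    static Map<String, Long> translate(List<Checkpoint> checkpoints, String group) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Checkpoint c : checkpoints) {
            if (c.group.equals(group)) {
                // Later records overwrite earlier ones for the same partition.
                result.put(c.topicPartition, c.downstreamOffset);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Checkpoint> log = new ArrayList<>();
        // Only the (hypothetical) subscribe-mode group has a checkpoint.
        log.add(new Checkpoint("subscribe-group", "TestTopic-123-0", 5L, 5L));

        System.out.println(translate(log, "subscribe-group")); // {TestTopic-123-0=5}
        System.out.println(translate(log, "TestTopic-123"));   // {}
    }
}
```

If this model is roughly right, the question in the thread reduces to why
no checkpoint is ever emitted for the assign mode group, rather than to
how translateOffsets is called.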