I am still facing the same issue.

On Fri, Sep 11, 2020, 4:46 PM <manoj.agraw...@cognizant.com> wrote:

> Hi Ananya
> Were you able to resolve this issue? I'm also facing the same issue.
>
> What parameters should be passed here if I'm doing a failover from cluster A
> ---> B?
>
> Map<TopicPartition, OffsetAndMetadata> newOffsets =
>     RemoteClusterUtils.translateOffsets(properties, "A", "TestTopic-123",
>         Duration.ofMillis(5500));
>
>
>
> properties = bootstrap properties of cluster B
> TestTopic-123 = topic name at cluster A
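
For reference, a minimal sketch of how the arguments line up, assuming the Kafka 2.5 signature translateOffsets(Map<String, Object> properties, String remoteClusterAlias, String consumerGroupId, Duration timeout). The properties describe the cluster the consumer is failing over to (B here), the alias names the upstream cluster ("A"), and the third argument is the consumer group ID rather than a topic name; in the original report the group happens to be called TestTopic-123 as well. Because the method takes a Map<String, Object>, a java.util.Properties object has to be converted first. The class and helper names below are illustrative only, and the Kafka call is left as a comment so the snippet runs with the JDK alone.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

class TranslateOffsetsArgs {

    // Copies the String-keyed entries of a Properties object into a Map<String, Object>.
    static Map<String, Object> toConfigMap(Properties props) {
        Map<String, Object> config = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            config.put(name, props.getProperty(name));
        }
        return config;
    }

    public static void main(String[] args) {
        Properties targetProps = new Properties();
        // Assumption: cluster B listens on localhost:9092, as in the MirrorMaker config in this thread.
        targetProps.setProperty("bootstrap.servers", "localhost:9092");

        Map<String, Object> config = toConfigMap(targetProps);
        System.out.println(config);

        // With the Kafka mirror client on the classpath, the call would then look like:
        // RemoteClusterUtils.translateOffsets(config, "A", "TestTopic-123", Duration.ofMillis(5500));
    }
}
```
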
>
> Thanks
>
>
> On 9/7/20, 8:43 AM, "Ananya Sen" <ananya281...@gmail.com> wrote:
>
>     Hello All,
>
>     I am using MirrorMaker 2.0 and testing the consumer checkpointing
> functionality. I found that RemoteClusterUtils.translateOffsets does not
> return checkpoints for consumers that run in assign mode.
>
>     I am using MirrorMaker 2.0 from Kafka version 2.5.0 (Scala version 2.12).
>     My source Kafka setup is 1 broker and 1 ZooKeeper node running Kafka
> version 1.0.0 (Scala version 2.11).
>     My target Kafka setup is 1 broker and 1 ZooKeeper node running Kafka
> version 1.0.0 (Scala version 2.11).
>
>     I am only doing 1-way replication from my source cluster to the target
> cluster.
>
>     Mirror Maker Config:
>     ================
>     clusters = A, B
>     A.bootstrap.servers = localhost:9082
>     B.bootstrap.servers = localhost:9092
>
>     A->B.enabled = true
>     A->B.topics = .*
>     A->B.groups = .*
>
>     B->A.enabled = false
>     B->A.topics = .*
>
>     replication.factor=1
>
>     checkpoints.topic.replication.factor=1
>     heartbeats.topic.replication.factor=1
>     offset-syncs.topic.replication.factor=1
>
>     offset.storage.replication.factor=1
>     status.storage.replication.factor=1
>     config.storage.replication.factor=1
>
>     emit.heartbeats.interval.seconds = 2
>     refresh.topics.interval.seconds=1
>     refresh.groups.interval.seconds=1
>     emit.checkpoints.interval.seconds=1
>     sync.topic.configs.enabled=true
>     sync.topic.configs.interval.seconds=1
>
>     replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy
>
>
> ============================================================================
>     In the replication policy, I have removed topic renaming, so each topic
> is replicated as is (same name in the target cluster as in the source
> cluster).
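
The source of SimpleReplicationPolicy is not shown in this thread, so the following is only a sketch of what a "no renaming" policy might look like. It mirrors the three methods declared by org.apache.kafka.connect.mirror.ReplicationPolicy (formatRemoteTopic, topicSource, upstreamTopic) but does not implement the interface, so that it compiles without Kafka on the classpath. The class name, the constructor argument, and the choice of return values for topicSource and upstreamTopic are assumptions.

```java
// Stand-in for a replication policy that keeps topic names unchanged.
// A real policy would implement org.apache.kafka.connect.mirror.ReplicationPolicy.
class NoRenamePolicySketch {

    // With no prefix in the topic name, the source cluster has to be supplied from outside.
    private final String sourceClusterAlias;

    NoRenamePolicySketch(String sourceClusterAlias) {
        this.sourceClusterAlias = sourceClusterAlias;
    }

    // The default policy returns sourceClusterAlias + "." + topic; here the name is kept as is.
    String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }

    // Assumption: every topic is treated as coming from the single configured source cluster.
    String topicSource(String topic) {
        return sourceClusterAlias;
    }

    // Assumption: the upstream name equals the local name because nothing was renamed.
    String upstreamTopic(String topic) {
        return topic;
    }

    public static void main(String[] args) {
        NoRenamePolicySketch policy = new NoRenamePolicySketch("A");
        System.out.println(policy.formatRemoteTopic("A", "TestTopic-123")); // prints TestTopic-123
    }
}
```
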
>
>
>     Steps to reproduce:
>     =============
>     1) Create a topic on the source cluster.
>     2) Push some data to the topic using the console producer.
>     3) Start a consumer in assign mode to read from the above topic, but
> only from 1 partition.
>
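>         import java.time.Duration;
>         import java.util.Collections;
>         import java.util.Properties;
>         import org.apache.kafka.clients.consumer.ConsumerConfig;
>         import org.apache.kafka.clients.consumer.ConsumerRecord;
>         import org.apache.kafka.clients.consumer.ConsumerRecords;
>         import org.apache.kafka.clients.consumer.KafkaConsumer;
>         import org.apache.kafka.common.TopicPartition;
>         import org.apache.kafka.common.serialization.ByteArrayDeserializer;
>
>         public class AssignModeConsumer {
>           public static void main(String[] args) throws InterruptedException {
>             Properties properties = new Properties();
>             // Source cluster A
>             properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
>             properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>                 ByteArrayDeserializer.class.getName());
>             properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>                 ByteArrayDeserializer.class.getName());
>             // Offsets are auto-committed under this group (enable.auto.commit defaults to true).
>             properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestTopic-123");
>             properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>             properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
>
>             KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);
>
>             // Assign mode: read a single partition directly, without subscribe().
>             TopicPartition tp = new TopicPartition("TestTopic-123", 1);
>             consumer.assign(Collections.singleton(tp));
>
>             while (true) {
>               ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
>               for (ConsumerRecord<byte[], byte[]> record : records) {
>                 System.out.println(new String(record.value()) + "__" + record.partition());
>                 Thread.sleep(2000);
>               }
>             }
>           }
>         }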
>
>     4) Stop the consumer midway. Describe the consumer group on the source
> cluster to get the lag information.
>
>     bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group TestTopic-123
>     GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
>     TestTopic-123   TestTopic-123   0          5               28              23
>
>     5) Run the translateOffsets method to print the downstream offsets.
>
>         Map<TopicPartition, OffsetAndMetadata> newOffsets =
>             RemoteClusterUtils.translateOffsets(properties, "A", "TestTopic-123",
>                 Duration.ofMillis(5500));
>         System.out.println(newOffsets.toString());
>
>     6) An empty map is returned.
>
>     Expected outcome: the translated committed offsets should have been
> returned.
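
On the failover side, an empty map means there is no translated position to seek to. A minimal sketch of one way to handle that, using plain JDK types (partition number to offset) in place of TopicPartition and OffsetAndMetadata; the class name, the helper name, and the fallback offset of 0 are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class FailoverStartOffsets {

    // For each assigned partition, use the translated offset when one exists,
    // otherwise fall back to the given default position.
    static Map<Integer, Long> startOffsets(Map<Integer, Long> translated,
                                           List<Integer> assigned,
                                           long fallback) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Integer partition : assigned) {
            result.put(partition, translated.getOrDefault(partition, fallback));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> assigned = Arrays.asList(0, 1);

        // No checkpoint was found for the group: every partition starts at the fallback.
        Map<Integer, Long> none = Collections.emptyMap();
        System.out.println(startOffsets(none, assigned, 0L)); // prints {0=0, 1=0}

        // A checkpoint exists for partition 0 only.
        Map<Integer, Long> partial = Collections.singletonMap(0, 5L);
        System.out.println(startOffsets(partial, assigned, 0L)); // prints {0=5, 1=0}
    }
}
```

In a real consumer, each resulting entry would correspond to a consumer.seek(topicPartition, offset) call after assign().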
>
>     My Debugging
>     ===========
>     On debugging the issue, I found that the checkpoint topic in the
> target cluster did not have this group's committed offset.
>
>     I tried multiple times with different commit frequencies and topic/group
> names, but it didn't work. Only consumers running in subscribe mode and the
> console consumer with the --group flag produce checkpoints.
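
To illustrate why a missing checkpoint leads to an empty result: as far as I understand the MirrorMaker 2.0 client, translateOffsets reads the checkpoints topic on the target cluster (A.checkpoints.internal by default for source alias A) and keeps the latest checkpoint per partition whose group ID matches the requested one. The sketch below models that filtering with plain JDK types. The Checkpoint class is a simplified stand-in rather than Kafka's own class, and the group name and offsets are made up.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CheckpointFilterSketch {

    // Simplified stand-in for one record of the checkpoints topic.
    static final class Checkpoint {
        final String groupId;
        final String topic;
        final int partition;
        final long downstreamOffset;

        Checkpoint(String groupId, String topic, int partition, long downstreamOffset) {
            this.groupId = groupId;
            this.topic = topic;
            this.partition = partition;
            this.downstreamOffset = downstreamOffset;
        }
    }

    // Scans the log in order and keeps the last checkpoint per topic-partition for the group.
    static Map<String, Long> translatedOffsets(List<Checkpoint> log, String groupId) {
        Map<String, Long> offsets = new HashMap<>();
        for (Checkpoint c : log) {
            if (c.groupId.equals(groupId)) {
                offsets.put(c.topic + "-" + c.partition, c.downstreamOffset);
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        List<Checkpoint> log = new ArrayList<>();
        log.add(new Checkpoint("subscribe-group", "TestTopic-123", 0, 3L));
        log.add(new Checkpoint("subscribe-group", "TestTopic-123", 0, 5L));

        // A group with checkpoints gets its latest downstream offset.
        System.out.println(translatedOffsets(log, "subscribe-group")); // prints {TestTopic-123-0=5}

        // A group that was never checkpointed yields an empty map.
        System.out.println(translatedOffsets(log, "TestTopic-123")); // prints {}
    }
}
```
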
>
>     Question
>     ========
>     1) Is it intended behavior that an assign-mode consumer can never be
> reset, or is it a bug?
>
>
>     Any help would be greatly appreciated.
>
>
>
>
> This e-mail and any files transmitted with it are for the sole use of the
> intended recipient(s) and may contain confidential and privileged
> information. If you are not the intended recipient(s), please reply to the
> sender and destroy all copies of the original message. Any unauthorized
> review, use, disclosure, dissemination, forwarding, printing or copying of
> this email, and/or any action taken in reliance on the contents of this
> e-mail is strictly prohibited and may be unlawful. Where permitted by
> applicable law, this e-mail and other e-mail communications sent to and
> from Cognizant e-mail addresses may be monitored.
>
