Hi Ananya,
Were you able to resolve this issue? I'm facing the same issue.

What parameters should be passed here if I'm doing a failover from cluster A ---> B?

Map<TopicPartition, OffsetAndMetadata> newOffsets =
        RemoteClusterUtils.translateOffsets(properties, "A",
                "TestTopic-123", Duration.ofMillis(5500));



properties = bootstrap properties of cluster B
TestTopic-123 = topic name at cluster A
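
To make the question concrete, here is my current understanding as a sketch only. As far as I can tell from the method signature, the first argument is a Map<String, Object> rather than a Properties object, the second is the alias of the cluster being failed over from, and the third is the consumer group ID (in your example the group ID and the topic name are both TestTopic-123). FailoverArgs and its members are hypothetical names of mine, and the block uses only the JDK, so the real call appears only as a comment.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Hypothetical helper; only the JDK is used so the sketch compiles on its own. */
final class FailoverArgs {
    final Map<String, Object> targetConfig;  // client settings for cluster B
    final String remoteClusterAlias;         // alias of the cluster being left ("A")
    final String consumerGroupId;            // consumer group whose offsets are translated
    final Duration timeout;

    FailoverArgs(Properties targetProps, String remoteClusterAlias,
                 String consumerGroupId, Duration timeout) {
        this.targetConfig = toConfigMap(targetProps);
        this.remoteClusterAlias = remoteClusterAlias;
        this.consumerGroupId = consumerGroupId;
        this.timeout = timeout;
    }

    /** Copies the string-valued Properties entries into a Map<String, Object>. */
    static Map<String, Object> toConfigMap(Properties props) {
        Map<String, Object> config = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            config.put(name, props.getProperty(name));
        }
        return config;
    }

    /** Values taken from the quoted message: cluster B listens on localhost:9092. */
    static FailoverArgs fromThread() {
        Properties clusterB = new Properties();
        clusterB.setProperty("bootstrap.servers", "localhost:9092");
        return new FailoverArgs(clusterB, "A", "TestTopic-123", Duration.ofMillis(5500));
    }
}

// With connect-mirror-client on the classpath, the call would then be:
//   RemoteClusterUtils.translateOffsets(fa.targetConfig, fa.remoteClusterAlias,
//           fa.consumerGroupId, fa.timeout);
```

Please correct me if the alias or the group ID should be something else for an A ---> B failover.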

Thanks


On 9/7/20, 8:43 AM, "Ananya Sen" <ananya281...@gmail.com> wrote:

    Hello All,

    I am using MirrorMaker 2.0 and testing the consumer checkpointing
    functionality. I found that RemoteClusterUtils.translateOffsets does not
    return checkpoints for consumers that run in assign mode.

    I am using MirrorMaker 2.0 from Kafka 2.5.0 (Scala 2.12).
    My source Kafka setup is 1 broker and 1 ZooKeeper, running Kafka 1.0.0
    (Scala 2.11).
    My target Kafka setup is 1 broker and 1 ZooKeeper, running Kafka 1.0.0
    (Scala 2.11).

    I am only doing 1-way replication from my source cluster to the target
    cluster.

    Mirror Maker Config:
    ================
    clusters = A, B
    A.bootstrap.servers = localhost:9082
    B.bootstrap.servers = localhost:9092

    A->B.enabled = true
    A->B.topics = .*
    A->B.groups = .*

    B->A.enabled = false
    B->A.topics = .*

    replication.factor=1

    checkpoints.topic.replication.factor=1
    heartbeats.topic.replication.factor=1
    offset-syncs.topic.replication.factor=1

    offset.storage.replication.factor=1
    status.storage.replication.factor=1
    config.storage.replication.factor=1

    emit.heartbeats.interval.seconds = 2
    refresh.topics.interval.seconds=1
    refresh.groups.interval.seconds=1
    emit.checkpoints.interval.seconds=1
    sync.topic.configs.enabled=true
    sync.topic.configs.interval.seconds=1
    replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy

    ============================================================================
    In the replication policy, I have removed topic renaming, so each topic
    is replicated as is (same name in the target cluster as in the source
    cluster).
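
    To illustrate what removing the renaming means, here is a simplified
    sketch. TopicNaming is a hypothetical stand-in for the topic-naming part
    of a replication policy, not Kafka's ReplicationPolicy interface, and
    neither class below is the actual SimpleReplicationPolicy. With the
    default policy and its "." separator, the topic would appear on B as
    A.TestTopic-123.

```java
/** Hypothetical stand-in for the topic-naming part of a replication policy. */
interface TopicNaming {
    String formatRemoteTopic(String sourceClusterAlias, String topic);
}

/** Default-style naming: prefix the topic with the source alias and ".". */
final class PrefixedNaming implements TopicNaming {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return sourceClusterAlias + "." + topic;
    }
}

/** Identity naming: the topic keeps its name on the target cluster. */
final class IdentityNaming implements TopicNaming {
    @Override
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic;
    }
}
```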


    Steps to replicate:
    =============
    1) Create a topic on the source cluster
    2) Push some data in the topic using console producer
    3) Start a consumer in assign mode to read from the above topic, but only
       from 1 partition.

        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestTopic-123");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");
        // enable.auto.commit is left at its default (true), so offsets are
        // committed under the group ID above.

        KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);

        // Assign mode: a fixed partition, no group subscription or rebalancing.
        TopicPartition tp = new TopicPartition("TestTopic-123", 1);
        consumer.assign(Collections.singleton(tp));

        while (true) {
          ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
          for (ConsumerRecord<byte[], byte[]> record : records) {
            System.out.println(new String(record.value()) + "__" + record.partition());
            Thread.sleep(2000);
          }
        }

    4) Stop the consumer mid-way. Describe the consumer group in the source
       cluster to get the lag information.

    bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group TestTopic-123
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
    TestTopic-123   TestTopic-123   0          5               28              23

    5) Run the translateOffsets method to print the downstream offsets.

        Map<TopicPartition, OffsetAndMetadata> newOffsets =
            RemoteClusterUtils.translateOffsets(properties, "A",
                "TestTopic-123", Duration.ofMillis(5500));
        System.out.println(newOffsets.toString());

    6) An empty map is returned.

    Expected outcome: the translated committed offsets should have been
    returned.

    My Debugging
    ===========
    On debugging the issue, I found that the checkpoint topic in the target
    cluster did not contain this group's committed offset.

    I tried multiple times with different commit frequencies and topic/group
    names. It didn't work. Only consumers running in subscribe mode and the
    console consumer with the --group flag produce checkpoints.

    Question
    ========
    1) Is it intended functionality that an assign-mode consumer can never
       be reset? Or is it a bug?


    Any help would be greatly appreciated.




This e-mail and any files transmitted with it are for the sole use of the 
intended recipient(s) and may contain confidential and privileged information. 
If you are not the intended recipient(s), please reply to the sender and 
destroy all copies of the original message. Any unauthorized review, use, 
disclosure, dissemination, forwarding, printing or copying of this email, 
and/or any action taken in reliance on the contents of this e-mail is strictly 
prohibited and may be unlawful. Where permitted by applicable law, this e-mail 
and other e-mail communications sent to and from Cognizant e-mail addresses may 
be monitored.
