Hello All,

I am using MirrorMaker 2.0 and testing its consumer checkpointing 
functionality. I found that RemoteClusterUtils.translateOffsets does not return 
checkpoints for consumers that run in assign mode.

I am using MirrorMaker 2.0 from Kafka version 2.5.0 (Scala version 2.12).
My source Kafka setup is 1 broker and 1 ZooKeeper, running Kafka version 1.0.0 
(Scala version 2.11).
My target Kafka setup is 1 broker and 1 ZooKeeper, running Kafka version 1.0.0 
(Scala version 2.11).

I am only doing 1-way replication from my source cluster to the target cluster.

Mirror Maker Config: 
================
clusters = A, B
A.bootstrap.servers = localhost:9082
B.bootstrap.servers = localhost:9092

A->B.enabled = true
A->B.topics = .*
A->B.groups = .*

B->A.enabled = false
B->A.topics = .*

replication.factor=1

checkpoints.topic.replication.factor=1
heartbeats.topic.replication.factor=1
offset-syncs.topic.replication.factor=1

offset.storage.replication.factor=1
status.storage.replication.factor=1
config.storage.replication.factor=1

emit.heartbeats.interval.seconds = 2
refresh.topics.interval.seconds=1
refresh.groups.interval.seconds=1
emit.checkpoints.interval.seconds=1
sync.topic.configs.enabled=true
sync.topic.configs.interval.seconds=1
replication.policy.class=com.ie.naukri.replicator.SimpleReplicationPolicy

============================================================================
In the replication policy, I have removed topic renaming, so each topic is 
replicated as is (same name in the target cluster as in the source cluster).
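To make the naming difference concrete, here is a minimal sketch in plain Java. It is not the actual ReplicationPolicy interface and not my SimpleReplicationPolicy class; the class and method names below are hypothetical and only illustrate the default "alias.topic" naming versus the identity naming described above.

```java
public class TopicNaming {

    // Default MirrorMaker 2 naming: the remote topic is the source
    // cluster alias, a "." separator, and the original topic name.
    static String defaultRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Identity naming (hypothetical stand-in for what my custom policy
    // is meant to do): the topic keeps its name on the target cluster.
    static String identityRemoteTopic(String sourceAlias, String topic) {
        return topic;
    }

    public static void main(String[] args) {
        System.out.println(defaultRemoteTopic("A", "TestTopic-123"));  // prints "A.TestTopic-123"
        System.out.println(identityRemoteTopic("A", "TestTopic-123")); // prints "TestTopic-123"
    }
}
```

With identity naming, the target cluster cannot tell from the topic name alone which cluster a topic came from.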


Steps to replicate: 
=============
1) Create a topic on the source cluster 
2) Push some data in the topic using console producer
3) Start a consumer in assign mode to read from the above topic but only from 1 
partition.

    Properties properties = new Properties();
    // Consumer connects to the source cluster (A).
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9082");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        ByteArrayDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "TestTopic-123");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "2");

    KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(properties);

    // Assign mode: the partition is assigned manually instead of calling subscribe().
    // Offsets are still committed under group.id (enable.auto.commit defaults to true).
    TopicPartition tp = new TopicPartition("TestTopic-123", 1);
    consumer.assign(Collections.singleton(tp));

    while (true) {
      ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(500));
      for (ConsumerRecord<byte[], byte[]> record : records) {
        System.out.println(new String(record.value()) + "__" + record.partition());
        Thread.sleep(2000);
      }
    }

4) Stop the consumer midway. Describe the consumer group on the source cluster 
to get the lag information.

bin/kafka-consumer-groups.sh --describe --bootstrap-server localhost:9082 --group TestTopic-123
GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
TestTopic-123   TestTopic-123   0          5               28              23

5) Run the translateOffsets method to print the downstream offsets.

    Map<TopicPartition, OffsetAndMetadata> newOffsets =
        RemoteClusterUtils.translateOffsets(properties, "A", "TestTopic-123",
            Duration.ofMillis(5500));
    System.out.println(newOffsets.toString());

6) An empty map is returned.

Expected outcome: the translated committed offsets should have been returned.
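For reference on step 5: translateOffsets takes its client properties as a Map<String, Object>, and as far as I understand it reads the checkpoints from the cluster named by bootstrap.servers in that map. Below is a minimal sketch, using only the JDK, of building such a map for the target cluster B (localhost:9092 in my config). The helper name forCluster is hypothetical, and whether pointing at B is the right choice is my assumption rather than something I have confirmed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class TranslateOffsetsProps {

    // Hypothetical helper: copies the string entries of a Properties object
    // into a Map<String, Object> and overrides bootstrap.servers.
    static Map<String, Object> forCluster(Properties base, String bootstrapServers) {
        Map<String, Object> map = new HashMap<>();
        for (String name : base.stringPropertyNames()) {
            map.put(name, base.getProperty(name));
        }
        map.put("bootstrap.servers", bootstrapServers);
        return map;
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9082"); // source cluster A
        properties.setProperty("group.id", "TestTopic-123");

        // Assumption: the checkpoints are written on the target cluster B.
        Map<String, Object> targetProps = forCluster(properties, "localhost:9092");
        System.out.println(targetProps.get("bootstrap.servers")); // prints "localhost:9092"
    }
}
```

The resulting map could then be passed as the first argument of RemoteClusterUtils.translateOffsets in place of the Properties object.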

My Debugging
===========
While debugging the issue, I found that the checkpoints topic in the target 
cluster did not contain this group's committed offsets.

I tried multiple times with different commit frequencies and topic/group names, 
but it did not work. Only consumers running in subscribe mode, and the console 
consumer started with the --group flag, produce checkpoints.

Question
========
1) Is it intended behavior that offsets for an assign-mode consumer are never 
checkpointed, and so can never be translated? Or is it a bug?


Any help would be greatly appreciated. 

