Badai Aqrandista created KAFKA-9459:
---------------------------------------

             Summary: MM2 sync topic config does not work
                 Key: KAFKA-9459
                 URL: https://issues.apache.org/jira/browse/KAFKA-9459
             Project: Kafka
          Issue Type: Bug
          Components: mirrormaker
    Affects Versions: 2.4.0
            Reporter: Badai Aqrandista


I have MM2 configured as follows:

{code:java}
{
  "name": "mm2-from-1-to-2",
  "config": {
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "topics": "foo",
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
    "sync.topic.configs.enabled": "true",
    "sync.topic.configs.interval.seconds": 60,
    "sync.topic.acls.enabled": "false",
    "replication.factor": 1,
    "offset-syncs.topic.replication.factor": 1,
    "heartbeats.topic.replication.factor": 1,
    "checkpoints.topic.replication.factor": 1,

    "target.cluster.alias": "dest",
    "target.cluster.bootstrap.servers": "dest.example.com:9092",

    "source.cluster.alias": "src",
    "source.cluster.bootstrap.servers": "src.example.com:9092",

    "tasks.max": 1
  }
}
{code}

Topic "foo" is configured with "cleanup.policy=compact". But after waiting for 
15 minutes, "src.foo" in the destination cluster still does not have 
"cleanup.policy=compact".

I ran the Connect node at TRACE level and could not find any calls to 
describeConfigs 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L327).
This implies it never actually gets a list of topics whose configs it needs 
to fetch.

I suspect this code 
(https://github.com/apache/kafka/blob/2.4.0/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorSourceConnector.java#L214-L220):


{code:java}
    private Set<String> topicsBeingReplicated() {
        return knownTopicPartitions.stream()
            .map(x -> x.topic())
            .distinct()
            .filter(x -> knownTargetTopics.contains(formatRemoteTopic(x)))
            .collect(Collectors.toSet());
    }
{code}

knownTopicPartitions contains topic-partitions from the source cluster.
knownTargetTopics contains topic names from the target cluster, which 
already include the source alias.

So why is topicsBeingReplicated (the list of topics from the source cluster) 
being filtered using knownTargetTopics (the list of topics from the target 
cluster)?
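
To make the question concrete, here is a minimal standalone sketch of the same filter over plain topic names. It is not the MirrorSourceConnector code: the class name, the three-argument signatures, and the "alias.topic" naming in formatRemoteTopic are assumptions for illustration (the naming is chosen to match the "src.foo" topic mentioned above). It only shows that the result is empty whenever the target-side set does not contain the prefixed name, in which case there would be no topics to pass to describeConfigs.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class TopicFilterSketch {

    // Hypothetical stand-in for formatRemoteTopic: assumes remote topics
    // are named "<source alias>.<topic>", e.g. "src.foo".
    static String formatRemoteTopic(String sourceAlias, String topic) {
        return sourceAlias + "." + topic;
    }

    // Same shape as the filter quoted above, but over plain topic names:
    // keep a source topic only if its remote name is in the target set.
    static Set<String> topicsBeingReplicated(List<String> sourceTopics,
                                             Set<String> targetTopics,
                                             String sourceAlias) {
        return sourceTopics.stream()
            .distinct()
            .filter(t -> targetTopics.contains(formatRemoteTopic(sourceAlias, t)))
            .collect(Collectors.toSet());
    }

    public static void main(String[] args) {
        // One entry per partition, so "foo" appears twice.
        List<String> source = Arrays.asList("foo", "foo", "bar");

        // Target view already lists "src.foo": "foo" passes the filter.
        System.out.println(
            topicsBeingReplicated(source, Collections.singleton("src.foo"), "src"));
        // prints [foo]

        // Target view is empty: nothing passes the filter.
        System.out.println(
            topicsBeingReplicated(source, Collections.<String>emptySet(), "src"));
        // prints []
    }
}
```

If knownTargetTopics were empty or stale when the config sync runs, the second case would apply. Whether that is what actually happens in 2.4.0 is exactly what I am unsure about.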



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
