Navneeth, it looks like you are trying to override the cluster aliases to the empty string. If you look closely, these properties are defined using incorrect syntax. I'm not sure how the properties would be parsed, but you should fix that to rule it out as a source of confusion.
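As a rough check on how those lines might be read, here is a minimal sketch that assumes the file is loaded with plain java.util.Properties. The class name AliasParsingDemo and the parse helper are made up for illustration and are not part of Kafka. It feeds in the three kinds of entries from the config: one with `=` and quotes, one with `:` and quotes, and one left empty.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical demo class; not part of Kafka or MirrorMaker.
class AliasParsingDemo {

    // Parse properties text with the standard java.util.Properties loader.
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            // StringReader does not normally fail; rethrow unchecked for brevity.
            throw new IllegalStateException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "replication.policy.separator = \"\"\n"   // '=' separator, quoted value
              + "source.cluster.alias: \"\"\n"            // ':' separator, quoted value
              + "target.cluster.alias =\n");              // nothing after the separator

        // Print each value in brackets together with its length.
        for (String key : props.stringPropertyNames()) {
            String value = props.getProperty(key);
            System.out.println(key + " -> [" + value + "] length=" + value.length());
        }
    }
}
```

Under that assumption, both quoted entries come back as a two-character value (the quote characters themselves) and only the last entry is truly empty. That would be consistent with the `""` showing up inside the topic name in the exception.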
Then, be aware that overriding cluster aliases like that may not work as you intend. Consider using a custom ReplicationPolicy if you are trying to do "identity" replication. There are several implementations floating around.

Ryanne

On Sun, Mar 7, 2021, 1:44 AM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Hi All,
>
> I'm trying to use mirror maker 2 to replicate data to our new AWS MSK kafka
> cluster and I have been running into so many issues and I couldn't find
> proper documentation. Need some help and it's very urgent. Thanks
>
> Also I don't see any of my topics created.
>
> Note: There are no consumers on the destination brokers
>
> *Run Command*
> bin/connect-mirror-maker.sh config/mm2.properties
>
> *Kafka Versions*
> Source: 2.3
> MSK: 2.6.1
> Mirror Maker Node: 2.7 (Using 2.7 to replicate group offsets)
>
> *Exception*
> [2021-03-07 07:32:15,145] ERROR Scheduler for MirrorCheckpointConnector
> caught exception in scheduled task: creating internal topics
> (org.apache.kafka.connect.mirror.Scheduler:102)
> org.apache.kafka.connect.errors.ConnectException: Error while attempting to
> create/find topic(s) '"".checkpoints.internal'
> at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:321)
> at org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:109)
> at org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:114)
> at org.apache.kafka.connect.mirror.MirrorCheckpointConnector.createInternalTopics(MirrorCheckpointConnector.java:163)
> at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
> at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
> at org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> *mm2.properties*
> # Kafka brokers.
> clusters = source, target
> source.bootstrap.servers = <<BROKERS>>
> target.bootstrap.servers = <<MSK_BROKERS>>
>
> # Source and target clusters configurations.
> source.config.storage.replication.factor = 2
> target.config.storage.replication.factor = 2
>
> source.offset.storage.replication.factor = 2
> target.offset.storage.replication.factor = 2
>
> source.status.storage.replication.factor = 2
> target.status.storage.replication.factor = 2
>
> source->target.enabled = true
> target->source.enabled = true
>
> # Mirror maker configurations.
> offset-syncs.topic.replication.factor = 2
> heartbeats.topic.replication.factor = 2
> checkpoints.topic.replication.factor = 2
>
> topics = .*
> groups = .*
>
> tasks.max = 3
> replication.factor = 2
> refresh.topics.enabled = true
> sync.topic.configs.enabled = true
> refresh.topics.interval.seconds = 10
>
> topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
> groups.blacklist = console-consumer-.*, connect-.*, __.*
>
> # Enable heartbeats and checkpoints.
> source->target.emit.heartbeats.enabled = true
> source->target.emit.checkpoints.enabled = true
>
> # customize as needed
> replication.policy.separator = ""
> source.cluster.alias: ""
> target.cluster.alias: ""
> # sync.topic.acls.enabled = false
> # emit.heartbeats.interval.seconds = 5
>
> Thanks
>