Navneeth, it looks like you are trying to override the cluster aliases to
the empty string. If you look closely, these properties are defined using
incorrect syntax. I'm not sure how the properties would be parsed, but you
should fix that to rule it out as a source of confusion.

Then, be aware that overriding cluster aliases like that may not work as
you intend. Consider using a custom ReplicationPolicy if you are trying to
do "identity" replication. There are several implementations floating
around.

Ryanne

On Sun, Mar 7, 2021, 1:44 AM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Hi All,
>
> I'm trying to use mirror maker 2 to replicate data to our new AWS MSK kafka
> cluster and I have been running into so many issues and I couldn't find
> proper documentation. Need some help and it's very urgent. Thanks
>
> Also I don't see any of my topics created.
>
> Note: There are no consumers on the destination brokers
>
> *Run Command*
> bin/connect-mirror-maker.sh config/mm2.properties
>
> *Kafka Versions*
> Source: 2.3
> MSK: 2.6.1
> Mirror Maker Node: 2.7 (Using 2.7 to replicate group offsets)
>
> *Exception*
> [2021-03-07 07:32:15,145] ERROR Scheduler for MirrorCheckpointConnector
> caught exception in scheduled task: creating internal topics
> (org.apache.kafka.connect.mirror.Scheduler:102)
> org.apache.kafka.connect.errors.ConnectException: Error while attempting to
> create/find topic(s) '"".checkpoints.internal'
>         at
> org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:321)
>         at
>
> org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:109)
>         at
>
> org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:114)
>         at
>
> org.apache.kafka.connect.mirror.MirrorCheckpointConnector.createInternalTopics(MirrorCheckpointConnector.java:163)
>         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>         at
> org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>         at
>
> org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63)
>         at
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>
> *mm2.properties*
> # Kafka brokers.
> clusters = source, target
> source.bootstrap.servers = <<BROKERS>>
> target.bootstrap.servers = <<MSK_BROKERS>>
>
> # Source and target clusters configurations.
> source.config.storage.replication.factor = 2
> target.config.storage.replication.factor = 2
>
> source.offset.storage.replication.factor = 2
> target.offset.storage.replication.factor = 2
>
> source.status.storage.replication.factor = 2
> target.status.storage.replication.factor = 2
>
> source->target.enabled = true
> target->source.enabled = true
>
> # Mirror maker configurations.
> offset-syncs.topic.replication.factor = 2
> heartbeats.topic.replication.factor = 2
> checkpoints.topic.replication.factor = 2
>
> topics = .*
> groups = .*
>
> tasks.max = 3
> replication.factor = 2
> refresh.topics.enabled = true
> sync.topic.configs.enabled = true
> refresh.topics.interval.seconds = 10
>
> topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
> groups.blacklist = console-consumer-.*, connect-.*, __.*
>
> # Enable heartbeats and checkpoints.
> source->target.emit.heartbeats.enabled = true
> source->target.emit.checkpoints.enabled = true
>
> # customize as needed
> replication.policy.separator = ""
> source.cluster.alias: ""
> target.cluster.alias: ""
> # sync.topic.acls.enabled = false
> # emit.heartbeats.interval.seconds = 5
>
> Thanks
>

Reply via email to