Hi All,

I was able to set up MM2 successfully, but I see the same message being sent again and again. I also see the following log message, and I don't understand why it appears. The topics on source and destination have the same number of partitions. Please give me some advice.

[2021-03-10 09:41:02,072] INFO Found 0 new topic-partitions on target. Found 0 deleted topic-partitions on target. Found 699 topic-partitions missing on source. (org.apache.kafka.connect.mirror.MirrorSourceConnector:241)

Here is my mm2.properties

clusters = source, target
source.bootstrap.servers = <<BROKERS>>
target.bootstrap.servers = <<BROKERS>>

# Source and target clusters configurations.
source.config.storage.replication.factor = 2
target.config.storage.replication.factor = 2

source.offset.storage.replication.factor = 2
target.offset.storage.replication.factor = 2

source.status.storage.replication.factor = 2
target.status.storage.replication.factor = 2

source->target.enabled = true
target->source.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 2
heartbeats.topic.replication.factor = 2
checkpoints.topic.replication.factor = 2

topics = .*
groups = .*

replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true

tasks.max = 3
replication.factor = 2
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10

topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true
target->source.emit.heartbeats.enabled = true
target->source.emit.checkpoints.enabled = true

# customize as needed
sync.topic.acls.enabled = false
emit.heartbeats.interval.seconds = 10
emit.checkpoints.interval.seconds = 10

Thanks

On Sun, Mar 7, 2021 at 10:17 PM Navneeth Krishnan <reachnavnee...@gmail.com> wrote:

> Thanks, I checked a consumer group using the below command on old and new
> clusters but the new cluster shows error consumer group doesn't exist. Am I
> missing something?
>
> As per the properties all groups should be included except the blacklisted
> ones.
>
> kafka-consumer-groups.sh --bootstrap-server <<BROKER>> --describe --group test-group --offsets
>
> groups = .*
> groups.blacklist = console-consumer-.*, connect-.*, __.*
>
> Thanks
>
> On Sun, Mar 7, 2021 at 9:54 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> You can see the replicated consumer group offsets using the
>> kafka-consumer-groups.sh tool. Make sure the consumer group is in the
>> groups allowlist.
>>
>> Ryanne
>>
>> On Sun, Mar 7, 2021, 11:09 PM Navneeth Krishnan <reachnavnee...@gmail.com>
>> wrote:
>>
>> > Hi Ryanne,
>> >
>> > I commented out the alias config and set target to source as false. Now I
>> > don't see the error anymore and everything looks good. Thanks a lot for the
>> > feedback. One more question: how can I check if the consumer group offsets
>> > are replicated? If I switch over an application to this new cluster, the
>> > consumer should just read from where it left off, right?
>> >
>> > Also I will try the custom replication policy and let you know.
>> >
>> > Here is the new config.
>> >
>> > # Kafka datacenters.
>> > clusters = source, target
>> > source.bootstrap.servers = loc-kafka01.eu-prod.dnaspaces.io:9092, loc-kafka02.eu-prod.dnaspaces.io:9092
>> > target.bootstrap.servers = loc-msk-kafka-1.eu-prod.dnaspaces.io:9092, loc-msk-kafka-2.eu-prod.dnaspaces.io:9092, loc-msk-kafka-3.eu-prod.dnaspaces.io:9092
>> >
>> > # Source and target clusters configurations.
>> > source.config.storage.replication.factor = 2
>> > target.config.storage.replication.factor = 2
>> >
>> > source.offset.storage.replication.factor = 2
>> > target.offset.storage.replication.factor = 2
>> >
>> > source.status.storage.replication.factor = 2
>> > target.status.storage.replication.factor = 2
>> >
>> > source->target.enabled = true
>> > target->source.enabled = false
>> >
>> > # Mirror maker configurations.
>> > offset-syncs.topic.replication.factor = 2
>> > heartbeats.topic.replication.factor = 2
>> > checkpoints.topic.replication.factor = 2
>> >
>> > topics = .*
>> > groups = .*
>> >
>> > tasks.max = 3
>> > replication.factor = 2
>> > refresh.topics.enabled = true
>> > sync.topic.configs.enabled = true
>> > refresh.topics.interval.seconds = 10
>> >
>> > topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
>> > groups.blacklist = console-consumer-.*, connect-.*, __.*
>> >
>> > # Enable heartbeats and checkpoints.
>> > source->target.emit.heartbeats.enabled = true
>> > source->target.emit.checkpoints.enabled = true
>> >
>> > # customize as needed
>> > # replication.policy.separator = ""
>> > # source.cluster.alias: ""
>> > # target.cluster.alias: ""
>> > # sync.topic.acls.enabled = false
>> > # emit.heartbeats.interval.seconds = 5
>> >
>> > Thanks
>> >
>> > On Sun, Mar 7, 2021 at 2:17 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>> >
>> > > Navneeth, it looks like you are trying to override the cluster aliases to
>> > > the empty string. If you look closely, these properties are defined using
>> > > incorrect syntax. I'm not sure how the properties would be parsed, but you
>> > > should fix that to rule it out as a source of confusion.
>> > >
>> > > Then, be aware that overriding cluster aliases like that may not work as
>> > > you intend. Consider using a custom ReplicationPolicy if you are trying to
>> > > do "identity" replication. There are several implementations floating
>> > > around.
>> > >
>> > > Ryanne
>> > >
>> > > On Sun, Mar 7, 2021, 1:44 AM Navneeth Krishnan <reachnavnee...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > I'm trying to use mirror maker 2 to replicate data to our new AWS MSK kafka
>> > > > cluster and I have been running into so many issues and I couldn't find
>> > > > proper documentation. Need some help and it's very urgent.
>> > > > Thanks
>> > > >
>> > > > Also I don't see any of my topics created.
>> > > >
>> > > > Note: There are no consumers on the destination brokers
>> > > >
>> > > > *Run Command*
>> > > > bin/connect-mirror-maker.sh config/mm2.properties
>> > > >
>> > > > *Kafka Versions*
>> > > > Source: 2.3
>> > > > MSK: 2.6.1
>> > > > Mirror Maker Node: 2.7 (Using 2.7 to replicate group offsets)
>> > > >
>> > > > *Exception*
>> > > > [2021-03-07 07:32:15,145] ERROR Scheduler for MirrorCheckpointConnector
>> > > > caught exception in scheduled task: creating internal topics
>> > > > (org.apache.kafka.connect.mirror.Scheduler:102)
>> > > > org.apache.kafka.connect.errors.ConnectException: Error while attempting to
>> > > > create/find topic(s) '"".checkpoints.internal'
>> > > >     at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:321)
>> > > >     at org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:109)
>> > > >     at org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:114)
>> > > >     at org.apache.kafka.connect.mirror.MirrorCheckpointConnector.createInternalTopics(MirrorCheckpointConnector.java:163)
>> > > >     at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>> > > >     at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>> > > >     at org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63)
>> > > >     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> > > >
>> > > > *mm2.properties*
>> > > > # Kafka brokers.
>> > > > clusters = source, target
>> > > > source.bootstrap.servers = <<BROKERS>>
>> > > > target.bootstrap.servers = <<MSK_BROKERS>>
>> > > >
>> > > > # Source and target clusters configurations.
>> > > > source.config.storage.replication.factor = 2
>> > > > target.config.storage.replication.factor = 2
>> > > >
>> > > > source.offset.storage.replication.factor = 2
>> > > > target.offset.storage.replication.factor = 2
>> > > >
>> > > > source.status.storage.replication.factor = 2
>> > > > target.status.storage.replication.factor = 2
>> > > >
>> > > > source->target.enabled = true
>> > > > target->source.enabled = true
>> > > >
>> > > > # Mirror maker configurations.
>> > > > offset-syncs.topic.replication.factor = 2
>> > > > heartbeats.topic.replication.factor = 2
>> > > > checkpoints.topic.replication.factor = 2
>> > > >
>> > > > topics = .*
>> > > > groups = .*
>> > > >
>> > > > tasks.max = 3
>> > > > replication.factor = 2
>> > > > refresh.topics.enabled = true
>> > > > sync.topic.configs.enabled = true
>> > > > refresh.topics.interval.seconds = 10
>> > > >
>> > > > topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
>> > > > groups.blacklist = console-consumer-.*, connect-.*, __.*
>> > > >
>> > > > # Enable heartbeats and checkpoints.
>> > > > source->target.emit.heartbeats.enabled = true
>> > > > source->target.emit.checkpoints.enabled = true
>> > > >
>> > > > # customize as needed
>> > > > replication.policy.separator = ""
>> > > > source.cluster.alias: ""
>> > > > target.cluster.alias: ""
>> > > > # sync.topic.acls.enabled = false
>> > > > # emit.heartbeats.interval.seconds = 5
>> > > >
>> > > > Thanks
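
A note on the alias settings in the first mm2.properties of this thread: the topic name in the exception, '"".checkpoints.internal', is what one would expect if the file is read with java.util.Properties rules, where a colon is accepted as a key/value separator and quotes are kept as literal characters. The sketch below only illustrates that parsing behaviour. It assumes the config file is loaded with those rules; the class AliasQuotesDemo and the helper aliasFrom are made up for this example and are not part of Kafka, and appending ".checkpoints.internal" simply mirrors the name seen in the log rather than reproducing MM2's own code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical demo class, not part of Kafka or MirrorMaker 2.
public class AliasQuotesDemo {

    // Parses a properties snippet with java.util.Properties and returns
    // the raw value stored under source.cluster.alias.
    static String aliasFrom(String propertiesText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            // Not expected for an in-memory reader.
            throw new IllegalStateException(e);
        }
        return props.getProperty("source.cluster.alias");
    }

    public static void main(String[] args) {
        // Same line as in the original mm2.properties from the thread.
        String alias = aliasFrom("source.cluster.alias: \"\"\n");

        // The value is two literal quote characters, not the empty string.
        System.out.println(alias.length()); // prints 2

        // Assumption for illustration: the internal topic name is the alias
        // followed by ".checkpoints.internal", as the error message suggests.
        System.out.println(alias + ".checkpoints.internal"); // prints "".checkpoints.internal
    }
}
```

If that assumption holds, `source.cluster.alias: ""` sets the alias to two quote characters rather than to an empty string, which would account for the odd topic name in the exception and fits with the error going away once those lines were commented out.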