Hi All,

I was able to set up MM2 successfully, but I see the same message being sent
again and again. I also see the following log message, which I don't
understand, since the topics on source and destination have the same number
of partitions. Please give me some advice.

[2021-03-10 09:41:02,072] INFO Found 0 new topic-partitions on target.
Found 0 deleted topic-partitions on target. Found 699 topic-partitions
missing on source.
(org.apache.kafka.connect.mirror.MirrorSourceConnector:241)

Here is my mm2.properties
clusters = source, target
source.bootstrap.servers = <<BROKERS>>
target.bootstrap.servers = <<BROKERS>>

# Source and target clusters configurations.
source.config.storage.replication.factor = 2
target.config.storage.replication.factor = 2

source.offset.storage.replication.factor = 2
target.offset.storage.replication.factor = 2

source.status.storage.replication.factor = 2
target.status.storage.replication.factor = 2

source->target.enabled = true
target->source.enabled = true

# Mirror maker configurations.
offset-syncs.topic.replication.factor = 2
heartbeats.topic.replication.factor = 2
checkpoints.topic.replication.factor = 2

topics = .*
groups = .*
replication.policy.class=com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy
source.cluster.producer.enable.idempotence = true
target.cluster.producer.enable.idempotence = true

tasks.max = 3
replication.factor = 2
refresh.topics.enabled = true
sync.topic.configs.enabled = true
refresh.topics.interval.seconds = 10

topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
groups.blacklist = console-consumer-.*, connect-.*, __.*

# Enable heartbeats and checkpoints.
source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true
target->source.emit.heartbeats.enabled = true
target->source.emit.checkpoints.enabled = true

# customize as needed
sync.topic.acls.enabled = false
emit.heartbeats.interval.seconds = 10
emit.checkpoints.interval.seconds = 10

Thanks
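
One possible explanation for the repeated messages, not confirmed anywhere in this thread: the config above enables both source->target and target->source, and it uses a custom replication policy. The sketch below assumes that policy keeps topic names unchanged ("identity" replication). With the default prefixing behaviour, a replicated topic carries the alias of the cluster it came from, so the reverse flow can skip it. With unchanged names there is nothing to recognise, and each flow keeps copying what the other one wrote. `Policy`, `PREFIXING`, `IDENTITY` and the topic name `orders` are hypothetical names for this toy model, not MM2 classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReplicationLoopSketch {

    /** Hypothetical, simplified stand-in for a replication policy. */
    interface Policy {
        String remoteTopic(String sourceAlias, String topic);
        String topicSource(String topic); // alias the topic came from, or null
    }

    /** Renames replicated topics to "<sourceAlias>.<topic>". */
    static final Policy PREFIXING = new Policy() {
        public String remoteTopic(String sourceAlias, String topic) {
            return sourceAlias + "." + topic;
        }
        public String topicSource(String topic) {
            int dot = topic.indexOf('.');
            return dot < 0 ? null : topic.substring(0, dot);
        }
    };

    /** Assumed behaviour of an identity policy: names are left unchanged. */
    static final Policy IDENTITY = new Policy() {
        public String remoteTopic(String sourceAlias, String topic) {
            return topic;
        }
        public String topicSource(String topic) {
            return null; // the name carries no origin information
        }
    };

    /** Copies records not yet copied by this flow, skipping detectable cycles. */
    static void replicate(Policy policy, Map<String, Map<String, List<String>>> clusters,
                          Map<String, Integer> copied, String from, String to) {
        for (String topic : new ArrayList<>(clusters.get(from).keySet())) {
            if (to.equals(policy.topicSource(topic))) {
                continue; // topic originated on the destination: would be a cycle
            }
            List<String> records = clusters.get(from).get(topic);
            String key = from + "->" + to + ":" + topic;
            int start = copied.getOrDefault(key, 0);
            List<String> fresh = new ArrayList<>(records.subList(start, records.size()));
            clusters.get(to)
                    .computeIfAbsent(policy.remoteTopic(from, topic), t -> new ArrayList<>())
                    .addAll(fresh);
            copied.put(key, records.size());
        }
    }

    /** Runs both flows for the given number of rounds; returns the record count on target. */
    static int targetRecordsAfter(Policy policy, int rounds) {
        Map<String, Map<String, List<String>>> clusters = new HashMap<>();
        clusters.put("source", new HashMap<>());
        clusters.put("target", new HashMap<>());
        clusters.get("source").put("orders", new ArrayList<>(Arrays.asList("m1")));
        Map<String, Integer> copied = new HashMap<>();
        for (int i = 0; i < rounds; i++) {
            replicate(policy, clusters, copied, "source", "target");
            replicate(policy, clusters, copied, "target", "source");
        }
        int total = 0;
        for (List<String> records : clusters.get("target").values()) {
            total += records.size();
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("prefixing, records on target: " + targetRecordsAfter(PREFIXING, 3));
        System.out.println("identity, records on target: " + targetRecordsAfter(IDENTITY, 3));
    }
}
```

In this toy model the single record stays a single copy on the target under the prefixing policy, while the identity policy adds one more copy per round. If that matches what is observed, disabling target->source (as in the earlier config quoted below) would be one thing to try.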

On Sun, Mar 7, 2021 at 10:17 PM Navneeth Krishnan <reachnavnee...@gmail.com>
wrote:

> Thanks. I checked a consumer group using the command below on the old and
> new clusters, but the new cluster reports that the consumer group doesn't
> exist. Am I missing something?
>
> As per the properties, all groups should be included except the blacklisted
> ones.
>
> kafka-consumer-groups.sh --bootstrap-server <<BROKER>> --describe --group
> test-group --offsets
>
> groups = .*
> groups.blacklist = console-consumer-.*, connect-.*, __.*
>
> Thanks
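
A quick way to check the claim about the group filter is to evaluate the two patterns by hand. The sketch below assumes a group is replicated when it fully matches at least one `groups` pattern and none of the `groups.blacklist` patterns. That is my understanding of the default filter, not something confirmed in this thread. `GroupFilterSketch` and its methods are hypothetical helpers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

public class GroupFilterSketch {

    /** Splits a comma-separated property value into compiled regex patterns. */
    static List<Pattern> compile(String commaSeparated) {
        List<Pattern> patterns = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                patterns.add(Pattern.compile(trimmed));
            }
        }
        return patterns;
    }

    /** True if the whole name matches at least one of the patterns. */
    static boolean matchesAny(List<Pattern> patterns, String name) {
        for (Pattern pattern : patterns) {
            if (pattern.matcher(name).matches()) {
                return true;
            }
        }
        return false;
    }

    /** Assumed rule: included by the allowlist and not excluded by the blacklist. */
    static boolean shouldReplicate(String group, String include, String exclude) {
        return matchesAny(compile(include), group) && !matchesAny(compile(exclude), group);
    }

    public static void main(String[] args) {
        String include = ".*";
        String exclude = "console-consumer-.*, connect-.*, __.*";
        for (String group : new String[] {"test-group", "console-consumer-12345", "connect-mm2"}) {
            System.out.println(group + " -> " + shouldReplicate(group, include, exclude));
        }
    }
}
```

Under that assumption `test-group` passes the filter, so the filter alone would not explain the missing group. As far as I know, MM2 2.7 only writes translated offsets into the target cluster's consumer groups when `sync.group.offsets.enabled` is set to true, and otherwise only emits checkpoints. That setting does not appear in the config above, so it may be worth checking.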
>
> On Sun, Mar 7, 2021 at 9:54 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>
>> You can see the replicated consumer group offsets using the
>> kafka-consumer-groups.sh tool. Make sure the consumer group is in the
>> groups allowlist.
>>
>> Ryanne
>>
>> On Sun, Mar 7, 2021, 11:09 PM Navneeth Krishnan <reachnavnee...@gmail.com>
>> wrote:
>>
>> > Hi Ryanne,
>> >
>> > I commented out the alias config and set target->source.enabled to
>> > false. Now I don't see the error anymore and everything looks good.
>> > Thanks a lot for the feedback. One more question: how can I check
>> > whether the consumer group offsets are replicated? If I switch an
>> > application over to this new cluster, the consumer should just read
>> > from where it left off, right?
>> >
>> > Also, I will try the custom replication policy and let you know.
>> >
>> > Here is the new config.
>> >
>> > # Kafka datacenters.
>> > clusters = source, target
>> > source.bootstrap.servers = loc-kafka01.eu-prod.dnaspaces.io:9092, loc-kafka02.eu-prod.dnaspaces.io:9092
>> > target.bootstrap.servers = loc-msk-kafka-1.eu-prod.dnaspaces.io:9092, loc-msk-kafka-2.eu-prod.dnaspaces.io:9092, loc-msk-kafka-3.eu-prod.dnaspaces.io:9092
>> >
>> > # Source and target clusters configurations.
>> > source.config.storage.replication.factor = 2
>> > target.config.storage.replication.factor = 2
>> >
>> > source.offset.storage.replication.factor = 2
>> > target.offset.storage.replication.factor = 2
>> >
>> > source.status.storage.replication.factor = 2
>> > target.status.storage.replication.factor = 2
>> >
>> > source->target.enabled = true
>> > target->source.enabled = false
>> >
>> > # Mirror maker configurations.
>> > offset-syncs.topic.replication.factor = 2
>> > heartbeats.topic.replication.factor = 2
>> > checkpoints.topic.replication.factor = 2
>> >
>> > topics = .*
>> > groups = .*
>> >
>> > tasks.max = 3
>> > replication.factor = 2
>> > refresh.topics.enabled = true
>> > sync.topic.configs.enabled = true
>> > refresh.topics.interval.seconds = 10
>> >
>> > topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
>> > groups.blacklist = console-consumer-.*, connect-.*, __.*
>> >
>> > # Enable heartbeats and checkpoints.
>> > source->target.emit.heartbeats.enabled = true
>> > source->target.emit.checkpoints.enabled = true
>> >
>> > # customize as needed
>> > # replication.policy.separator = ""
>> > # source.cluster.alias: ""
>> > # target.cluster.alias: ""
>> > # sync.topic.acls.enabled = false
>> > # emit.heartbeats.interval.seconds = 5
>> >
>> > Thanks
>> >
>> > On Sun, Mar 7, 2021 at 2:17 PM Ryanne Dolan <ryannedo...@gmail.com> wrote:
>> >
>> > > Navneeth, it looks like you are trying to override the cluster aliases
>> > > to the empty string. If you look closely, these properties are defined
>> > > using incorrect syntax. I'm not sure how the properties would be parsed,
>> > > but you should fix that to rule it out as a source of confusion.
>> > >
>> > > Then, be aware that overriding cluster aliases like that may not work as
>> > > you intend. Consider using a custom ReplicationPolicy if you are trying
>> > > to do "identity" replication. There are several implementations floating
>> > > around.
>> > >
>> > > Ryanne
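
On the question of how those alias lines would be parsed: the sketch below feeds the three lines from the original mm2.properties (quoted further down) to `java.util.Properties`, on the assumption that MM2 loads its config file with those semantics. `Properties` accepts `:` as well as `=` as a separator, and it does not strip quotes, so each value comes out as the two-character string `""` and not as an empty string. Building the checkpoint topic name as alias plus `.checkpoints.internal` is also an assumption, chosen because it reproduces the name in the exception. `AliasParsingSketch` is a hypothetical class name.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class AliasParsingSketch {

    /** Parses properties text using the standard java.util.Properties rules. */
    static Properties parse(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    public static void main(String[] args) {
        // The three lines as they appear in the original mm2.properties.
        String text = String.join("\n",
                "replication.policy.separator = \"\"",
                "source.cluster.alias: \"\"",
                "target.cluster.alias: \"\"");
        Properties props = parse(text);
        String alias = props.getProperty("source.cluster.alias");

        // The value keeps its quote characters, so it is not empty.
        System.out.println("alias length: " + alias.length());

        // Assumed naming scheme, matching the topic name in the exception.
        System.out.println(alias + ".checkpoints.internal");
    }
}
```

If an empty value is really intended, the properties-file way to write it is to leave the right-hand side blank, though whether MM2 then accepts an empty alias is a separate question.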
>> > >
>> > > On Sun, Mar 7, 2021, 1:44 AM Navneeth Krishnan <reachnavnee...@gmail.com>
>> > > wrote:
>> > >
>> > > > Hi All,
>> > > >
>> > > > I'm trying to use MirrorMaker 2 to replicate data to our new AWS MSK
>> > > > Kafka cluster. I have been running into many issues and couldn't find
>> > > > proper documentation. I need some help, and it's very urgent. Thanks.
>> > > >
>> > > > Also I don't see any of my topics created.
>> > > >
>> > > > Note: There are no consumers on the destination brokers
>> > > >
>> > > > *Run Command*
>> > > > bin/connect-mirror-maker.sh config/mm2.properties
>> > > >
>> > > > *Kafka Versions*
>> > > > Source: 2.3
>> > > > MSK: 2.6.1
>> > > > Mirror Maker Node: 2.7 (Using 2.7 to replicate group offsets)
>> > > >
>> > > > *Exception*
>> > > > [2021-03-07 07:32:15,145] ERROR Scheduler for MirrorCheckpointConnector caught exception in scheduled task: creating internal topics (org.apache.kafka.connect.mirror.Scheduler:102)
>> > > > org.apache.kafka.connect.errors.ConnectException: Error while attempting to create/find topic(s) '"".checkpoints.internal'
>> > > >         at org.apache.kafka.connect.util.TopicAdmin.createTopics(TopicAdmin.java:321)
>> > > >         at org.apache.kafka.connect.mirror.MirrorUtils.createCompactedTopic(MirrorUtils.java:109)
>> > > >         at org.apache.kafka.connect.mirror.MirrorUtils.createSinglePartitionCompactedTopic(MirrorUtils.java:114)
>> > > >         at org.apache.kafka.connect.mirror.MirrorCheckpointConnector.createInternalTopics(MirrorCheckpointConnector.java:163)
>> > > >         at org.apache.kafka.connect.mirror.Scheduler.run(Scheduler.java:93)
>> > > >         at org.apache.kafka.connect.mirror.Scheduler.executeThread(Scheduler.java:112)
>> > > >         at org.apache.kafka.connect.mirror.Scheduler.lambda$execute$2(Scheduler.java:63)
>> > > >         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>> > > >
>> > > > *mm2.properties*
>> > > > # Kafka brokers.
>> > > > clusters = source, target
>> > > > source.bootstrap.servers = <<BROKERS>>
>> > > > target.bootstrap.servers = <<MSK_BROKERS>>
>> > > >
>> > > > # Source and target clusters configurations.
>> > > > source.config.storage.replication.factor = 2
>> > > > target.config.storage.replication.factor = 2
>> > > >
>> > > > source.offset.storage.replication.factor = 2
>> > > > target.offset.storage.replication.factor = 2
>> > > >
>> > > > source.status.storage.replication.factor = 2
>> > > > target.status.storage.replication.factor = 2
>> > > >
>> > > > source->target.enabled = true
>> > > > target->source.enabled = true
>> > > >
>> > > > # Mirror maker configurations.
>> > > > offset-syncs.topic.replication.factor = 2
>> > > > heartbeats.topic.replication.factor = 2
>> > > > checkpoints.topic.replication.factor = 2
>> > > >
>> > > > topics = .*
>> > > > groups = .*
>> > > >
>> > > > tasks.max = 3
>> > > > replication.factor = 2
>> > > > refresh.topics.enabled = true
>> > > > sync.topic.configs.enabled = true
>> > > > refresh.topics.interval.seconds = 10
>> > > >
>> > > > topics.blacklist = .*[\-\.]internal, .*\.replica, __consumer_offsets
>> > > > groups.blacklist = console-consumer-.*, connect-.*, __.*
>> > > >
>> > > > # Enable heartbeats and checkpoints.
>> > > > source->target.emit.heartbeats.enabled = true
>> > > > source->target.emit.checkpoints.enabled = true
>> > > >
>> > > > # customize as needed
>> > > > replication.policy.separator = ""
>> > > > source.cluster.alias: ""
>> > > > target.cluster.alias: ""
>> > > > # sync.topic.acls.enabled = false
>> > > > # emit.heartbeats.interval.seconds = 5
>> > > >
>> > > > Thanks
>> > > >
>> > >
>> >
>>
>
