I’ve found that Kafka Connect never respects the “target.cluster.bootstrap.servers” setting in the MirrorMaker2 task config. It always uses the Kafka Connect worker’s broker information (its “bootstrap.servers”) instead. When Kafka Connect runs on the source cluster with an IdentityReplicationPolicy, messages are read from the source cluster and then written back to the same topic on the source cluster, which creates an infinite loop. When Kafka Connect runs on a third cluster, the messages are written to the Kafka Connect cluster, not the configured target cluster. Below are the scenarios I tested and an example of the Kafka Connect task settings I used. The only scenario that produced the correct result was running Kafka Connect on the target cluster.
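To make the loop concrete, here is a toy Python model. It is not Kafka code, and every name in it (`replicate`, `identity_policy`, `prefixed_policy`) is hypothetical. It assumes what the tests observed: records are always produced to the cluster the Connect worker is attached to. With identity-style naming and the worker on the source cluster, each replicated record lands back in the topic being mirrored and is read again on the next pass. In this model, a DefaultReplicationPolicy-style `msksource.` prefix writes to a different topic instead, so nothing is re-read.

```python
from typing import Callable, Dict, List

Cluster = Dict[str, List[str]]  # toy model: topic name -> list of records
Policy = Callable[[str, str], str]


def identity_policy(source_alias: str, topic: str) -> str:
    # Same naming as IdentityReplicationPolicy: the remote topic keeps its name.
    return topic


def prefixed_policy(source_alias: str, topic: str) -> str:
    # Same naming as DefaultReplicationPolicy with its default "." separator.
    return f"{source_alias}.{topic}"


def replicate(write_cluster: Cluster, source_cluster: Cluster, topic: str,
              policy: Policy, rounds: int, source_alias: str = "msksource") -> List[int]:
    """Run `rounds` replication passes; return the source topic's size after each.

    Assumption (from the reported tests): records are always produced to
    `write_cluster`, the Connect worker's cluster, whatever the target config says.
    """
    offset = 0  # consumer position in the source topic
    sizes = []
    for _ in range(rounds):
        batch = source_cluster[topic][offset:]  # everything not yet consumed
        offset += len(batch)
        remote_topic = policy(source_alias, topic)
        write_cluster.setdefault(remote_topic, []).extend(batch)
        sizes.append(len(source_cluster[topic]))
    return sizes


if __name__ == "__main__":
    source = {"example-topic": ["m1"]}
    # Worker on the source cluster + identity naming: the topic keeps growing.
    print(replicate(source, source, "example-topic", identity_policy, 4))  # prints [2, 3, 4, 5]
    source = {"example-topic": ["m1"]}
    # Same placement with prefixed naming: records go to "msksource.example-topic".
    print(replicate(source, source, "example-topic", prefixed_policy, 4))  # prints [1, 1, 1, 1]
```

Passing a separate `write_cluster` (the worker on the target) leaves the source topic unchanged and copies each record once, which matches scenario 9.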
Is running Kafka Connect against the target cluster a hard requirement, or am I misunderstanding how Kafka Connect uses the MM2 configs?

For MirrorMaker2 applications, we generally recommend that users run Kafka Connect against the “target” Kafka cluster to help minimize network latency for the producers. However, in some scenarios it makes sense to run Kafka Connect against the “source” Kafka cluster, or even a third, unrelated Kafka cluster. This is because we don’t always control topic creation in the source/target clusters, and we want MirrorMaker2 to replicate data and offsets only to/from existing topics.

connect-distributed.properties:

```properties
bootstrap.servers=source.broker.address:9092
group.id=demo-loop
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-demo-loop
offset.storage.replication.factor=3
config.storage.topic=connect-configs-demo-loop
config.storage.replication.factor=3
status.storage.topic=connect-status-demo-loop
status.storage.replication.factor=3
offset.flush.interval.ms=10000
connector.client.config.override.policy=All
```

Kafka Connect MM2 task config:

```json
{
  "name": "mm2-msc",
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
  "clusters": "msksource,mskdest",
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "target.cluster.bootstrap.servers": "target.broker.address:9092",
  "source.cluster.bootstrap.servers": "source.broker.address:9092",
  "topics": "example-topic",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "replication.factor": "3",
  "offset-syncs.topic.replication.factor": "3",
  "sync.topic.acls.interval.seconds": "600",
  "sync.topic.configs.interval.seconds": "600",
  "refresh.topics.interval.seconds": "300",
  "refresh.groups.interval.seconds": "20",
  "producer.enable.idempotence": "true",
  "consumer.group.id": "mm2-msc",
  "source.cluster.max.poll.records": "50000",
  "source.cluster.receive.buffer.bytes": "33554432",
  "source.cluster.send.buffer.bytes": "33554432",
  "source.cluster.max.partition.fetch.bytes": "33554432",
  "source.cluster.message.max.bytes": "37755000",
  "source.cluster.compression.type": "gzip",
  "source.cluster.max.request.size": "26214400",
  "source.cluster.buffer.memory": "524288000",
  "source.cluster.batch.size": "524288",
  "target.cluster.max.poll.records": "20000",
  "target.cluster.receive.buffer.bytes": "33554432",
  "target.cluster.send.buffer.bytes": "33554432",
  "target.cluster.max.partition.fetch.bytes": "33554432",
  "target.cluster.message.max.bytes": "37755000",
  "target.cluster.compression.type": "gzip",
  "target.cluster.max.request.size": "26214400",
  "target.cluster.buffer.memory": "524288000",
  "target.cluster.batch.size": "52428"
}
```

Test results:

| Test | Kafka Connect server | Kafka Connect / MM2 version | Offset sync location | Source cluster version | Target cluster version | Result |
|---|---|---|---|---|---|---|
| Control (BNSF config) | Source | 3.8.1 | Source | 3.5.1 | 2.7.0 | Infinite loop |
| 1 | Source | 3.8.1 | Target | 3.5.1 | 2.7.0 | Infinite loop |
| 2 | Source | 3.8.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 3 | Source | 3.9.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 4 | Source | 3.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 5 | Source | 3.6.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 6 | Source | 2.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 7 | Source | 3.8.1 | Source (also tested with various other config changes, e.g. varying how target.cluster.bootstrap.servers is provided) | 3.5.1 | 3.5.1 | Infinite loop |
| 8 | A third MSK cluster | 3.8.1 | Source | 3.5.1 | 3.5.1 | Data replicated to the third cluster, not the target or source |
| 9 | Target | 3.8.1 | Source | 3.5.1 | 3.5.1 | Correct behavior (source replicated to target) |
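The pattern in the results (data always lands on the cluster named in the worker’s `bootstrap.servers`) can be turned into a small pre-flight check. This is a hypothetical Python sketch: `parse_properties`, `servers` and `check_bootstrap` are invented for illustration and are not part of Kafka. It encodes only the behavior observed in these tests, not documented MM2 semantics, and its properties parser is deliberately minimal (no line continuations or `:` separators).

```python
import json
from typing import Dict, List, Set


def parse_properties(text: str) -> Dict[str, str]:
    """Minimal key=value parser (assumption: no continuations or ':' separators)."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line[0] in "#!":
            continue  # skip blank lines and comments
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def servers(value: str) -> Set[str]:
    # Compare bootstrap lists as sets so ordering and spacing do not matter.
    return {s.strip() for s in (value or "").split(",") if s.strip()}


def check_bootstrap(worker_properties: str, connector_json: str) -> List[str]:
    """Flag the worker/connector combinations that misbehaved in the reported tests."""
    worker = servers(parse_properties(worker_properties).get("bootstrap.servers", ""))
    config = json.loads(connector_json)
    target = servers(config.get("target.cluster.bootstrap.servers", ""))
    source = servers(config.get("source.cluster.bootstrap.servers", ""))
    warnings = []
    if worker != target:
        warnings.append("worker bootstrap.servers differs from target.cluster.bootstrap.servers; "
                        "in the reported tests, records were written to the worker's cluster")
    if worker == source:
        warnings.append("worker is attached to the source cluster; with an identity-style "
                        "replication policy this produced an infinite loop in the reported tests")
    return warnings
```

Run against the `connect-distributed.properties` and task config from this post, it returns both warnings; with the worker pointed at `target.broker.address:9092` (scenario 9) it returns none, and with a third cluster (scenario 8) only the first.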