Hi Mehrtens,

The MirrorSourceConnector/Task relies on the Connect framework to
instantiate the producer used for mirroring.
The "target.cluster.bootstrap.servers" is indeed ineffective for changing
the framework client configuration, that should only affect clients
instantiated within the task (admin clients, offset-syncs producers, etc)

I think the configuration that you need to provide is
"producer.override.bootstrap.servers", in addition to the configuration you
already have.
Some of your producer tuning may also need to be specified via
"producer.override.*" in order to affect the mirrored records.

I don't understand your test cases exactly, are you observing some version
dependence here? I don't recall any recent changes which would invalidate
the above information.

Hope this helps,
Greg

On Thu, Feb 13, 2025 at 9:35 AM Mehrtens, Mazrim
<mmehr...@amazon.com.invalid> wrote:

> I’ve found that Kafka Connect never respects the
> “target.cluster.bootstrap.servers” configuration in the MirrorMaker2 task
> config. It always uses the Kafka Connect broker information instead.
> Running Kafka Connect on the source cluster causes an infinite loop of
> messages read from the source cluster, then written back to the same topic
> on the source cluster when using an IdentityReplicationPolicy. Running
> Kafka Connect on a third cluster causes the messages to get written to the
> Kafka Connect cluster, not the configured target cluster. Below are the
> scenarios I tested, and an example of the Kafka Connect task settings used.
> The only scenario that produced the correct result is running Kafka Connect
> on the target server.
>
> Is this a hard requirement? Am I misunderstanding how the MM2 configs get
> used in Kafka Connect? We generally recommend that for MirrorMaker2
> applications, users run Kafka Connect against the “target” Kafka cluster to
> help minimize network latency for the producers. However, in some scenarios
> it makes sense to run Kafka Connect against the “source” Kafka cluster, or
> even a third, unrelated Kafka cluster. This is because we don’t always have
> control over topic creation in the source/target clusters, and want
> MirrorMaker2 to only replicate data/offsets to / from existing topics.
>
>
> connect-distributed.properties:
>
> bootstrap.servers=source.broker.address:9092
> group.id=demo-loop
> key.converter=org.apache.kafka.connect.json.JsonConverter
> value.converter=org.apache.kafka.connect.json.JsonConverter
> key.converter.schemas.enable=true
> value.converter.schemas.enable=true
> offset.storage.topic=connect-offsets-demo-loop
> offset.storage.replication.factor=3
> config.storage.topic=connect-configs-demo-loop
> config.storage.replication.factor=3
> status.storage.topic=connect-status-demo-loop
> status.storage.replication.factor=3
> offset.flush.interval.ms=10000
> connector.client.config.override.policy=All
>
> Kafka Connect MM2 task config:
>
> {
>   "name": "mm2-msc",
>   "connector.class":
> "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>
> "replication.policy.class":"com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
>   "clusters": "msksource,mskdest",
>   "source.cluster.alias": "msksource",
>   "target.cluster.alias": "mskdest",
>   "target.cluster.bootstrap.servers": "target.broker.address:9092",
>   "source.cluster.bootstrap.servers": "source.broker.address:9092",
>   "topics": "example-topic",
>   "tasks.max": "1",
>   "key.converter": "
> org.apache.kafka.connect.converters.ByteArrayConverter",
>   "value.converter":
> "org.apache.kafka.connect.converters.ByteArrayConverter",
>   "replication.factor": "3",
>   "offset-syncs.topic.replication.factor": "3",
>   "sync.topic.acls.interval.seconds": "600",
>   "sync.topic.configs.interval.seconds": "600",
>   "refresh.topics.interval.seconds": "300",
>   "refresh.groups.interval.seconds": "20",
>   "producer.enable.idempotence":"true",
>   "consumer.group.id": "mm2-msc",
>   "source.cluster.max.poll.records" : "50000",
>   "source.cluster.receive.buffer.bytes" : "33554432",
>   "source.cluster.send.buffer.bytes" : "33554432",
>   "source.cluster.max.partition.fetch.bytes" : "33554432",
>   "source.cluster.message.max.bytes" : "37755000",
>   "source.cluster.compression.type" : "gzip",
>   "source.cluster.max.request.size" : "26214400",
>   "source.cluster.buffer.memory" : "524288000",
>   "source.cluster.batch.size" : "524288",
>   "target.cluster.max.poll.records" : "20000",
>   "target.cluster.receive.buffer.bytes" : "33554432",
>   "target.cluster.send.buffer.bytes" : "33554432",
>   "target.cluster.max.partition.fetch.bytes" : "33554432",
>   "target.cluster.message.max.bytes" : "37755000",
>   "target.cluster.compression.type" : "gzip",
>   "target.cluster.max.request.size" : "26214400",
>   "target.cluster.buffer.memory" : "524288000",
>   "target.cluster.batch.size" : "52428"
> }
>
>
>
>
> Test
>
> Kafka Connect Server
>
> Kafka Connect/ MM2 Version
>
> Offset Sync Location
>
> Source Cluster Version
>
> Target Cluster Version
>
> Result
>
> Control (BNSF config)
>
> Source
>
> 3.8.1
>
> Source
>
> 3.5.1
>
> 2.7.0
>
> Infinite loop
>
> 1
>
> Source
>
> 3.8.1
>
> Target
>
> 3.5.1
>
> 2.7.0
>
> Infinite loop
>
> 2
>
> Source
>
> 3.8.1
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Infinite loop
>
> 3
>
> Source
>
> 3.9.0
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Infinite loop
>
> 4
>
> Source
>
> 3.7.1
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Infinite loop
>
> 5
>
> Source
>
> 3.6.0
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Infinite loop
>
> 6
>
> Source
>
> 2.7.1
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Infinite loop
>
> 7
>
> Source
>
> 3.8.1
>
> Source, and tested with various other config changes (e.g. varying how
> target.cluster.bootstrap.servers setting is provided)
>
> 3.5.1
>
> 3.5.1
>
> Infinite loop
>
> 8
>
> A Third MSK Server
>
> 3.8.1
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Data replicated to the third server, not the target OR source
>
> 9
>
> Target
>
> 3.8.1
>
> Source
>
> 3.5.1
>
> 3.5.1
>
> Correct behavior (source replicated to target)
>
>
>
>

Reply via email to