I’ve found that Kafka Connect never respects the “target.cluster.bootstrap.servers” setting in the MirrorMaker2 connector config. It always writes to the cluster that Kafka Connect itself is connected to (the worker’s bootstrap.servers) instead. Running Kafka Connect on the source cluster with an IdentityReplicationPolicy causes an infinite loop: messages are read from the source cluster and then written back to the same topic on the source cluster. Running Kafka Connect on a third cluster causes the messages to be written to the Kafka Connect cluster, not the configured target cluster. Below are the scenarios I tested, along with the Kafka Connect worker and connector settings used. The only scenario that produced the correct result was running Kafka Connect on the target cluster.

Is this a hard requirement, or am I misunderstanding how the MM2 configs are used in Kafka Connect? We generally recommend that, for MirrorMaker2 applications, users run Kafka Connect against the “target” Kafka cluster to minimize network latency for the producers. However, in some scenarios it makes sense to run Kafka Connect against the “source” Kafka cluster, or even a third, unrelated Kafka cluster. This is because we don’t always have control over topic creation in the source/target clusters, and we want MirrorMaker2 to replicate data/offsets only to/from existing topics.


connect-distributed.properties:

bootstrap.servers=source.broker.address:9092
group.id=demo-loop
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.topic=connect-offsets-demo-loop
offset.storage.replication.factor=3
config.storage.topic=connect-configs-demo-loop
config.storage.replication.factor=3
status.storage.topic=connect-status-demo-loop
status.storage.replication.factor=3
offset.flush.interval.ms=10000
connector.client.config.override.policy=All
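Since every result in this post hinges on which cluster the worker itself is attached to, it can help to read that value straight out of connect-distributed.properties. This is a minimal sketch, assuming plain `key=value` lines: `parse_properties` is a hypothetical helper, not part of any Kafka tooling, and it does not handle the full Java .properties syntax (line continuations, escapes, `:` separators). The sample input is an excerpt of the worker config above.

```python
def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blanks and #/! comments.

    Hypothetical helper: ignores Java .properties features such as
    line continuations, escapes, and ':' or whitespace separators.
    """
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:  # only keep lines that actually contain '='
            props[key.strip()] = value.strip()
    return props


# Excerpt of the worker config shown above.
WORKER_PROPERTIES = """\
bootstrap.servers=source.broker.address:9092
group.id=demo-loop
offset.storage.topic=connect-offsets-demo-loop
connector.client.config.override.policy=All
"""

worker = parse_properties(WORKER_PROPERTIES)
# The cluster the Connect worker itself is attached to.
print(worker["bootstrap.servers"])
```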

Kafka Connect MM2 connector config:

{
  "name": "mm2-msc",
  "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
  "replication.policy.class": "com.amazonaws.kafka.samples.CustomMM2ReplicationPolicy",
  "clusters": "msksource,mskdest",
  "source.cluster.alias": "msksource",
  "target.cluster.alias": "mskdest",
  "target.cluster.bootstrap.servers": "target.broker.address:9092",
  "source.cluster.bootstrap.servers": "source.broker.address:9092",
  "topics": "example-topic",
  "tasks.max": "1",
  "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
  "replication.factor": "3",
  "offset-syncs.topic.replication.factor": "3",
  "sync.topic.acls.interval.seconds": "600",
  "sync.topic.configs.interval.seconds": "600",
  "refresh.topics.interval.seconds": "300",
  "refresh.groups.interval.seconds": "20",
  "producer.enable.idempotence":"true",
  "consumer.group.id": "mm2-msc",
  "source.cluster.max.poll.records" : "50000",
  "source.cluster.receive.buffer.bytes" : "33554432",
  "source.cluster.send.buffer.bytes" : "33554432",
  "source.cluster.max.partition.fetch.bytes" : "33554432",
  "source.cluster.message.max.bytes" : "37755000",
  "source.cluster.compression.type" : "gzip",
  "source.cluster.max.request.size" : "26214400",
  "source.cluster.buffer.memory" : "524288000",
  "source.cluster.batch.size" : "524288",
  "target.cluster.max.poll.records" : "20000",
  "target.cluster.receive.buffer.bytes" : "33554432",
  "target.cluster.send.buffer.bytes" : "33554432",
  "target.cluster.max.partition.fetch.bytes" : "33554432",
  "target.cluster.message.max.bytes" : "37755000",
  "target.cluster.compression.type" : "gzip",
  "target.cluster.max.request.size" : "26214400",
  "target.cluster.buffer.memory" : "524288000",
  "target.cluster.batch.size" : "52428"
}
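To reproduce the setup, the connector config above can be submitted to the worker through the Kafka Connect REST API with `PUT /connectors/{name}/config`, which creates the connector or updates an existing one. The sketch below only builds the request, using the Python standard library; the worker URL (`http://localhost:8083`, the default REST port) and the trimmed-down config subset are assumptions for illustration.

```python
import json
import urllib.request

# Assumption: the worker's REST API listens here; adjust for your setup.
CONNECT_URL = "http://localhost:8083"


def build_config_request(base_url: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) a PUT /connectors/{name}/config request.

    The "name" inside the config must match the name in the URL.
    """
    name = config["name"]
    body = json.dumps(config).encode("utf-8")
    return urllib.request.Request(
        f"{base_url}/connectors/{name}/config",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


# Subset of the MM2 connector config above, for illustration only.
mm2_config = {
    "name": "mm2-msc",
    "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "source.cluster.alias": "msksource",
    "target.cluster.alias": "mskdest",
    "source.cluster.bootstrap.servers": "source.broker.address:9092",
    "target.cluster.bootstrap.servers": "target.broker.address:9092",
    "topics": "example-topic",
    "tasks.max": "1",
}

req = build_config_request(CONNECT_URL, mm2_config)
print(req.get_method(), req.full_url)
# To actually submit it: urllib.request.urlopen(req)
```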




| Test | Kafka Connect server | Kafka Connect/MM2 version | Offset sync location | Source cluster version | Target cluster version | Result |
|---|---|---|---|---|---|---|
| Control (BNSF config) | Source | 3.8.1 | Source | 3.5.1 | 2.7.0 | Infinite loop |
| 1 | Source | 3.8.1 | Target | 3.5.1 | 2.7.0 | Infinite loop |
| 2 | Source | 3.8.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 3 | Source | 3.9.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 4 | Source | 3.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 5 | Source | 3.6.0 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 6 | Source | 2.7.1 | Source | 3.5.1 | 3.5.1 | Infinite loop |
| 7 | Source | 3.8.1 | Source* | 3.5.1 | 3.5.1 | Infinite loop |
| 8 | A third MSK server | 3.8.1 | Source | 3.5.1 | 3.5.1 | Data replicated to the third server, not the target or source |
| 9 | Target | 3.8.1 | Source | 3.5.1 | 3.5.1 | Correct behavior (source replicated to target) |

* Test 7 was also run with various other config changes (e.g. varying how the target.cluster.bootstrap.servers setting is provided).
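The pattern in the table can be checked mechanically. This sketch transcribes the runs, recording for each one which cluster the Connect worker was attached to and which cluster the replicated records ended up on (an infinite loop counts as records written back to the source cluster), then tests the hypothesis stated at the top of this post. It summarizes the observed results only; it is not a statement of documented MirrorMaker2 behavior, and `Run` is a hypothetical record type introduced for illustration.

```python
from typing import NamedTuple


class Run(NamedTuple):
    test: str
    connect_on: str   # cluster the Connect worker's bootstrap.servers points at
    mm2_version: str  # Kafka Connect/MM2 version used
    result: str       # cluster where replicated records were observed


# Transcribed from the results table; "source" results are the infinite loops.
RUNS = [
    Run("Control", "source", "3.8.1", "source"),
    Run("1", "source", "3.8.1", "source"),
    Run("2", "source", "3.8.1", "source"),
    Run("3", "source", "3.9.0", "source"),
    Run("4", "source", "3.7.1", "source"),
    Run("5", "source", "3.6.0", "source"),
    Run("6", "source", "2.7.1", "source"),
    Run("7", "source", "3.8.1", "source"),
    Run("8", "third", "3.8.1", "third"),
    Run("9", "target", "3.8.1", "target"),
]

CONFIGURED_TARGET = "target"

# Hypothesis from the post: records land on the worker's own cluster,
# regardless of target.cluster.bootstrap.servers.
follows_worker = all(r.result == r.connect_on for r in RUNS)
matches_config = [r.test for r in RUNS if r.result == CONFIGURED_TARGET]
versions_tested = sorted({r.mm2_version for r in RUNS})

print("records always land on the worker's cluster:", follows_worker)
print("runs that wrote to the configured target:", matches_config)
print("Connect/MM2 versions tested:", versions_tested)
```

With these rows, the check reports that the records always followed the worker’s cluster, and only test 9, where the worker was attached to the target, wrote to the configured target. That holds across every Kafka Connect/MM2 version tested, from 2.7.1 to 3.9.0.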
