Hey Anders, take a look at Connect's serdes and SMTs. MirrorMaker can be configured to use them.
Ryanne On Tue, Sep 14, 2021, 3:13 AM Anders Engström <epirea...@gmail.com> wrote: > Hi! > I'm trying to replicate a few topics using Mirror Maker 2 (2.8). > > Both the source and the target cluster use Schema Registry (Karapace, on > Aiven) > and the Confluent Avro serialization format when publishing messages. > > This means that messages replicated from source->target are not readable > with > the Avro deserializer in the target cluster - since the schema-id either > does > not exist, or points to the wrong definition, in the target schema > registry. > > I've tried to work around this issue by forcing the source consumer to read > messages as Schema Registry Avro (using the source schema-registry), and > configure the target producer to write messages using the target > schema-registry: > > { > "name": "mirror-maker", > "config": { > "connector.class": > "org.apache.kafka.connect.mirror.MirrorSourceConnector", > "replication.factor": "1", > "heartbeats.topic.replication.factor": "1", > "checkpoints.topic.replication.factor": "1", > "offset-syncs.topic.replication.factor": "1", > "source.cluster.alias": "SOURCE", > "target.cluster.alias": "DEST", > "source.cluster.bootstrap.servers": "localhost:9092", > "target.cluster.bootstrap.servers": "localhost:9093", > "topics": "dev\\.pub\\..*", > "emit.checkpoints.interval.seconds": "1", > "emit.heartbeats.interval.seconds": "1", > "tasks.max": "3", > "sync.topic.configs.enabled": "false", > "sync.topic.acls.enabled": "false", > "sync.group.offsets.enabled": "false", > "refresh.topics.interval.seconds": "10", > "source.consumer.key.deserializer": > "io.confluent.kafka.serializers.KafkaAvroDeserializer", > "source.consumer.value.deserializer": > "io.confluent.kafka.serializers.KafkaAvroDeserializer", > "source.consumer.schema.registry.url": "http://localhost:8081", > "target.producer.key.serializer": > "io.confluent.kafka.serializers.KafkaAvroSerializer", > "target.consumer.value.serializer": > "io.confluent.kafka.serializers.KafkaAvroSerializer", > "target.producer.schema.registry.url": "http://localhost:8082" } > } > > However, this does not seem to have any effect. It seems that the > de/serializer > for consumers/producers are not overridable in the mirror-maker connector. > It > always uses `ByteArray(De)Serializer`. I guess this is by design? > > So - I would really appreciate advice on how to handle this replication > scenario. I'm guessing it's a pretty common setup. > > Best regards /Anders >