Hey Anders, take a look at Connect's serdes and SMTs. MirrorMaker can be
configured to use them.

Ryanne

On Tue, Sep 14, 2021, 3:13 AM Anders Engström <epirea...@gmail.com> wrote:

> Hi!
> I'm trying to replicate a few topics using MirrorMaker 2 (Kafka 2.8).
>
> Both the source and the target cluster use Schema Registry (Karapace, on
> Aiven) and the Confluent Avro serialization format when publishing
> messages.
>
> This means that messages replicated from source->target are not readable
> with the Avro deserializer in the target cluster, since the schema-id
> either does not exist, or points to the wrong definition, in the target
> schema registry.
>
> I've tried to work around this issue by forcing the source consumer to read
> messages as Schema Registry Avro (using the source schema-registry), and
> configuring the target producer to write messages using the target
> schema-registry:
>
> {
>   "name": "mirror-maker",
>   "config": {
>     "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>     "replication.factor": "1",
>     "heartbeats.topic.replication.factor": "1",
>     "checkpoints.topic.replication.factor": "1",
>     "offset-syncs.topic.replication.factor": "1",
>     "source.cluster.alias": "SOURCE",
>     "target.cluster.alias": "DEST",
>     "source.cluster.bootstrap.servers": "localhost:9092",
>     "target.cluster.bootstrap.servers": "localhost:9093",
>     "topics": "dev\\.pub\\..*",
>     "emit.checkpoints.interval.seconds": "1",
>     "emit.heartbeats.interval.seconds": "1",
>     "tasks.max": "3",
>     "sync.topic.configs.enabled": "false",
>     "sync.topic.acls.enabled": "false",
>     "sync.group.offsets.enabled": "false",
>     "refresh.topics.interval.seconds": "10",
>     "source.consumer.key.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
>     "source.consumer.value.deserializer": "io.confluent.kafka.serializers.KafkaAvroDeserializer",
>     "source.consumer.schema.registry.url": "http://localhost:8081",
>     "target.producer.key.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
>     "target.producer.value.serializer": "io.confluent.kafka.serializers.KafkaAvroSerializer",
>     "target.producer.schema.registry.url": "http://localhost:8082"
>   }
> }
>
> However, this does not seem to have any effect. It seems that the
> de/serializers for consumers/producers are not overridable in the
> mirror-maker connector. It always uses `ByteArray(De)Serializer`. I guess
> this is by design?
>
> So - I would really appreciate advice on how to handle this replication
> scenario. I'm guessing it's a pretty common setup.
>
> Best regards /Anders
>
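
For illustration, here is a rough sketch of why the replicated bytes fail to
deserialize on the target side. The Confluent Avro wire format prefixes each
payload with a zero "magic" byte and a 4-byte big-endian schema ID, and
MirrorMaker copies those bytes unchanged. The helper names and the `id_map`
lookup table below are hypothetical; in a real setup the mapping would have
to come from registering the source schemas in the target registry, and the
rewrite would more likely live in a Java SMT than in Python.

```python
import struct

MAGIC_BYTE = 0  # first byte of a Confluent-framed message


def split_confluent_frame(payload: bytes):
    """Return (schema_id, avro_body) for a Confluent-framed payload."""
    if len(payload) < 5 or payload[0] != MAGIC_BYTE:
        raise ValueError("payload is not in the Confluent wire format")
    # Bytes 1..4 hold the schema ID as an unsigned big-endian integer.
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id, payload[5:]


def rewrite_schema_id(payload: bytes, id_map: dict) -> bytes:
    """Swap the source schema ID for the target one; the Avro body is untouched.

    id_map is a hypothetical {source_id: target_id} table (an assumption,
    not something MirrorMaker provides).
    """
    source_id, body = split_confluent_frame(payload)
    target_id = id_map[source_id]  # KeyError if the schema was never mapped
    return bytes([MAGIC_BYTE]) + struct.pack(">I", target_id) + body
```

Only the 5-byte header changes; the Avro-encoded body stays byte-for-byte
identical, which is why a transform of this kind can work on raw byte arrays
without running the Avro (de)serializers at all.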
