Hi guys,

I’m working on a solution where I ingest Kafka records and need to sink them
to another topic using Avro and Schema Registry.
The problem I’m facing is that I can’t find a configuration that actually
works for me.

Let me explain.


  1.  I have a KafkaSource that consumes the initial stream of data.
  2.  I have an operator that maps the Kafka records to Avro objects (Java
POJOs generated with the Avro Maven plugin from .avsc files).
  3.  I register the schemas in Schema Registry using the Maven
schema-registry:register plugin/goal, registering the schema type as AVRO
(a programmatic equivalent is sketched right after this list).
  4.  I have a FlinkKafkaProducer<GeneratedAvroObject> to which I provide a
serialization schema of type ConfluentRegistryAvroSerializationSchema.
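
(For step 3, the registration can also be done programmatically through the
Schema Registry client. This is only a sketch for clarity, assuming a recent
Confluent client, the default TopicNameStrategy subject "sink_topic-value" and
my registry URL; in reality I use the Maven goal.)

// Sketch only: programmatic equivalent of mvn schema-registry:register.
SchemaRegistryClient registryClient =
    new CachedSchemaRegistryClient("http://schemaregistry:38081", 100);
// AvroObject is one of the classes generated by the Avro Maven plugin.
int schemaId =
    registryClient.register("sink_topic-value", new AvroSchema(AvroObject.getClassSchema()));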

My Kafka Properties for the Producer:

kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");
kafkaProps.put(
    KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schemaregistry:38081");
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
kafkaProps.put("auto.register.schemas", false);
kafkaProps.put("use.latest.version", true);

As I learned from other tutorials/articles, I basically need to set
KafkaAvroSerializer.class as the ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG.
This eventually lands me in KafkaAvroSerializer, which, based on what the
record looks like, derives its schema, goes to the Schema Registry to resolve
the schema for that record, and serializes it before it gets sent.
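
To make that concrete, this is roughly how KafkaAvroSerializer is meant to be
fed outside of Flink, with a plain KafkaProducer (only a sketch; the key
serializer and the freshly built AvroObject are assumptions, not part of my
job):

// Sketch: the serializer receives the SpecificRecord itself, derives its
// schema, resolves it against Schema Registry and writes the Confluent wire
// format.
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); // assumption
try (Producer<String, AvroObject> plainProducer = new KafkaProducer<>(kafkaProps)) {
  AvroObject avroObject = new AvroObject(); // assume its required fields are set
  plainProducer.send(new ProducerRecord<>("sink_topic", "some-key", avroObject));
}
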
The problem I’m having is that in the FlinkKafkaProducer class, in the
invoke() method, keyedSchema is null in my case but kafkaSchema is not, so it
does a ‘pre-serialization’ that turns my record into a byte[]. This has an
effect when it ends up in the KafkaAvroSerializer: since the record is already
a byte[], the serializer derives a schema of type “bytes” instead of the
schema I have for that SpecificRecord. When it then fetches the proper schema
from the Schema Registry, it fails because the two are not compatible: schema
{} is not compatible with schema of type “bytes”.
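
In other words, my reading of the effective flow is the sketch below. This is
a standalone illustration only; the subject name, the three-argument
forSpecific call, the Map.of config and the freshly built AvroObject are
assumptions for the sketch, not what my job actually does:

// Step 1: this mimics what FlinkKafkaProducer.invoke() does first: the Flink
// serialization schema already turns the SpecificRecord into Avro bytes.
SerializationSchema<AvroObject> flinkSchema =
    ConfluentRegistryAvroSerializationSchema.forSpecific(
        AvroObject.class, "sink_topic-value", "http://schemaregistry:38081");
byte[] preSerialized = flinkSchema.serialize(new AvroObject()); // assume fields are set

// Step 2: the Kafka producer then hands that byte[] to the configured
// KafkaAvroSerializer, which derives the schema from the runtime type of its
// input. For a byte[] that is the primitive "bytes" schema, not AvroObject's
// schema, so the check against the registered schema fails with the
// "Incompatible ..." error quoted below.
KafkaAvroSerializer valueSerializer = new KafkaAvroSerializer();
valueSerializer.configure(
    Map.of(
        KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://schemaregistry:38081",
        "auto.register.schemas", false,
        "use.latest.version", true),
    false);
byte[] doubleSerialized = valueSerializer.serialize("sink_topic", preSerialized);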

For more context, this is how my Processor looks at this moment.


DataStream<ObjectNode> kafkaRecords =
    env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka");

SingleOutputStreamOperator<AvroObject> producedRecords =
    kafkaRecords
        .map(
            value -> {
              String kafkaKey = value.get(KEY).asText();
              String kafkaRecordJson = MAPPER.writeValueAsString(value.get(VALUE));
              return Converter.convert(kafkaKey, kafkaRecordJson);
            })
        .returns(TypeInformation.of(AvroObject.class));

AvroSerializationSchema<AvroObject> schema =
        ConfluentRegistryAvroSerializationSchema.forSpecific(AvroObject.class);

FlinkKafkaProducer<AvroObject> kafkaProducer =
        new FlinkKafkaProducer<>("sink_topic", schema, kafkaProps);

producedRecords.addSink(kafkaProducer);

env.execute();

Exception:
Caused by: java.io.IOException: Incompatible schema {avro schema here} with
refs [] of type AVRO for schema "bytes".

PS: If I remove the KafkaAvroSerializer from the producer properties,
producing works fine, but when I consume the messages back, the first record
comes out with only default values, and the second one throws an EOFException
(I haven’t been able to debug the exact cause yet). It seems that without the
KafkaAvroSerializer the Schema Registry is not actually used to get the schema
back and serialize with it, so I definitely need it there; but I probably
still need some more configuration changes in other places, because it’s
clearly not working as expected.
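
For completeness, on the consuming side I would expect a registry-aware
deserializer to be needed to read the sink topic back. The snippet below is
only a sketch, not my actual consumer code; the group id and the source name
are placeholders:

// Sketch: reading the sink topic back with a registry-aware deserializer, so
// the writer schema is fetched from Schema Registry instead of being guessed.
KafkaSource<AvroObject> avroSource =
    KafkaSource.<AvroObject>builder()
        .setBootstrapServers("kafka:9092")
        .setTopics("sink_topic")
        .setGroupId("avro-readback") // placeholder
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setValueOnlyDeserializer(
            ConfluentRegistryAvroDeserializationSchema.forSpecific(
                AvroObject.class, "http://schemaregistry:38081"))
        .build();

DataStream<AvroObject> readBack =
    env.fromSource(avroSource, WatermarkStrategy.noWatermarks(), "sink_topic-readback");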

Thanks a lot!
I would appreciate at least some pointers on where to investigate further,
and if someone has a similar implementation, maybe some tips and tricks.

Regards,
Dan Serb

