I also forgot to include how I converted the Avro objects to bytes in the 
approach I took with the deprecated Kafka producer.

    protected byte[] getValueBytes(Value value)
    {
        DatumWriter<Value> valWriter = new SpecificDatumWriter<Value>(
            value.getSchema());
        ByteArrayOutputStream valOut = new ByteArrayOutputStream();
        BinaryEncoder valEncoder = EncoderFactory.get().binaryEncoder(valOut,
            null);

        try {
            // Encode the record as raw Avro binary.
            valWriter.write(value, valEncoder);
            valEncoder.flush();
            valOut.close();
        } catch (IOException e) {
            // Fail loudly instead of returning a partially written payload.
            throw new UncheckedIOException(e);
        }

        return valOut.toByteArray();
    }

    protected byte[] getKeyBytes(Key key)
    {
        DatumWriter<Key> keyWriter = new SpecificDatumWriter<Key>(
            key.getSchema());
        ByteArrayOutputStream keyOut = new ByteArrayOutputStream();
        BinaryEncoder keyEncoder = EncoderFactory.get().binaryEncoder(keyOut,
            null);

        try {
            // Encode the record as raw Avro binary.
            keyWriter.write(key, keyEncoder);
            keyEncoder.flush();
            keyOut.close();
        } catch (IOException e) {
            // Fail loudly instead of returning a partially written payload.
            throw new UncheckedIOException(e);
        }

        return keyOut.toByteArray();
    }



From: Ghiya, Jay (GE Healthcare)
Sent: 18 May 2022 21:51
To: user@flink.apache.org
Cc: d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare) 
<satheeshkumar.pandia...@ge.com>; Kumar, Vipin (GE Healthcare) 
<vipin.s.ku...@ge.com>
Subject: Kafka Sink Key and Value Avro Schema Usage Issues

Hi Team,

This is regarding the Flink Kafka sink. We have a use case where records carry 
headers, and both the key and the value use Avro schemas.

Below is what we would intuitively expect to write for an Avro key and value:

KafkaSink.<ProducerRecord<Key, Value>>builder()
        .setBootstrapServers(cloudkafkaBrokerAPI)
        .setRecordSerializer(
                KafkaRecordSerializationSchema.builder()
                        .setKeySerializationSchema(
                                ConfluentRegistryAvroSerializationSchema
                                        .forSpecific(
                                                Key.class,
                                                "Key",
                                                cloudSchemaRegistryURL))
                        .setValueSerializationSchema(
                                ConfluentRegistryAvroSerializationSchema
                                        .forSpecific(
                                                Value.class,
                                                "val",
                                                cloudSchemaRegistryURL))
                        .setTopic(outputTopic)
                        .build())
        .build();

As I understand it, the Kafka sink currently does not accept both the key and 
the value as Avro schemas. It only accepts sink.

First I tried the deprecated Flink Kafka Producer by implementing 
KafkaSerializationSchema and supplying the Avro serializer/deserializer 
properties via:
cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());
cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName());


The problem is that although this example runs, the schema that gets stored in 
the Confluent Schema Registry is:
{
    "subject": "ddp_out-key",
    "version": 1,
    "id": 1,
    "schema": "\"bytes\""
}

Instead of the full schema, the registry has recorded the whole thing as bytes. 
So for now I am looking for a solution that works without the Kafka sink. Also, 
is there a feature request on the roadmap to add ProducerRecord support to the 
Kafka sink itself? That would be ideal: the previous operator could emit the 
producer record with key, value, and headers, and the sink could then forward 
it as is.
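
For context on what a registry-aware consumer would expect from hand-encoded 
bytes like the ones produced by getKeyBytes and getValueBytes: Confluent's Avro 
serializers prefix each Avro binary payload with a magic byte (0) and the 
4-byte big-endian schema ID. The sketch below shows only that framing, using 
nothing but the JDK. The class name ConfluentWireFormat, its method names, and 
the schema ID 1 are placeholders for illustration. It assumes the schema has 
already been registered under the right subject and that the producer sends the 
resulting byte[] unchanged; it is not a replacement for KafkaAvroSerializer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper: frames raw Avro bytes in the Confluent wire format.
final class ConfluentWireFormat {
    private static final byte MAGIC_BYTE = 0x0;
    // 1 magic byte + 4-byte schema ID.
    private static final int HEADER_SIZE = 5;

    private ConfluentWireFormat() {
    }

    // Prepends the magic byte and the schema ID (big-endian) to the payload.
    public static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_SIZE + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema ID back from a framed message.
    public static int schemaIdOf(byte[] framed) {
        if (framed.length < HEADER_SIZE || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a framed message");
        }
        return ByteBuffer.wrap(framed, 1, 4).getInt();
    }

    // Returns the Avro payload without the 5-byte header.
    public static byte[] payloadOf(byte[] framed) {
        if (framed.length < HEADER_SIZE || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("Not a framed message");
        }
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    public static void main(String[] args) {
        // Placeholder payload; in practice this would be the Avro-encoded bytes.
        byte[] payload = {10, 20};
        byte[] framed = frame(1, payload); // schema ID 1 is a placeholder
        System.out.println("framed length: " + framed.length);
        System.out.println("schema id: " + schemaIdOf(framed));
    }
}
```

With something like this, the key bytes would be framed as 
ConfluentWireFormat.frame(keySchemaId, getKeyBytes(key)), where keySchemaId is 
whatever ID the registry assigned to the key schema.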

-Jay
GEHC

