Also forgot to attach the information regarding how did I convert the avro objects to bytes in the approach that I took with deprecated kafka producer.
protected byte[] getValueBytes(Value value) { DatumWriter<Value> valWriter = new SpecificDatumWriter<Value>( Value.getSchema()); ByteArrayOutputStream valOut = new ByteArrayOutputStream(); BinaryEncoder valEncoder = EncoderFactory.get().binaryEncoder(valOut, null); try { valWriter.write(value, valEncoder); // TODO Auto-generated catch block valEncoder.flush(); // TODO Auto-generated catch block valOut.close(); // TODO Auto-generated catch block } catch (Exception e) { } return valOut.toByteArray(); } protected byte[] getKeyBytes(Key key) { DatumWriter<Key> keyWriter = new SpecificDatumWriter<Key>( key.getSchema()); ByteArrayOutputStream keyOut = new ByteArrayOutputStream(); BinaryEncoder keyEncoder = EncoderFactory.get().binaryEncoder(keyOut, null); try { keyWriter.write(key, keyEncoder); // TODO Auto-generated catch block keyEncoder.flush(); // TODO Auto-generated catch block keyOut.close(); // TODO Auto-generated catch block } catch (Exception e) { } return keyOut.toByteArray(); } From: Ghiya, Jay (GE Healthcare) Sent: 18 May 2022 21:51 To: user@flink.apache.org Cc: d...@flink.apache.org; Pandiaraj, Satheesh kumar (GE Healthcare) <satheeshkumar.pandia...@ge.com>; Kumar, Vipin (GE Healthcare) <vipin.s.ku...@ge.com> Subject: Kafka Sink Key and Value Avro Schema Usage Issues Hi Team, This is regarding Flink Kafka Sink. We have a usecase where we have headers and both key and value as Avro Schema. Below is the expectation in terms of intuitiveness for avro kafka key and value: KafkaSink.<ProducerRecord<Key,Value>>builder() .setBootstrapServers(cloudkafkaBrokerAPI) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setKeySerializationSchema( ConfluentRegistryAvroSerializationSchema .forSpecific( key.class, "Key", cloudSchemaRegistryURL)) .setValueSerializationSchema( ConfluentRegistryAvroSerializationSchema .forSpecific( Value.class,"val", cloudSchemaRegistryURL)) .setTopic(outputTopic) .build()) .build(); What I understood currently it does not accept key and value both as avro schemas as part of kafka sink. It only accepts sink. First I tried to use the deprecated Flink Kafka Producer by implementing KafkaSerializationSchema and supplying properties of avro ser and der via : cloudKafkaProperties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName()); cloudKafkaProperties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class.getName()); The problem here is I am able to run this example but the schema that gets stored in confluent schema registry is: { "subject": "ddp_out-key", "version": 1, "id": 1, "schema": "\"bytes\"" } Instead of full schema it has just recognized the whole as bytes. So I am looking for a solution without kafka sink to make it work as of now and is there feature request part of roadmap for adding support To kafka sink itself for producer record as that would be ideal. The previous operator can send the producer record with key,val and headers and then it can be forwarded ahead. -Jay GEHC