Thanks Fabian, I ended up using something like below.
public class GenericSerializer implements KafkaSerializationSchema<GenericRecord> { private final SerializationSchema<GenericRecord> valueSerializer; private final String topic; public GenericSerializer(String topic, Schema schemaValue, String schemaRegistryUrl) { this.valueSerializer = ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schemaValue, schemaRegistryUrl); this.topic = topic; } @Override public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) { byte[] value = valueSerializer.serialize(element); return new ProducerRecord<>(topic, value); } } Then used a new object of GenericSerializer in the FlinkKafkaProducer FlinkKafkaProducer<GenericRecord> producer = new FlinkKafkaProducer<>(topic, new GenericSerializer(topic, schema, schemaRegistryUrl), kafkaConfig, Semantic.AT_LEAST_ONCE); Thanks , Anil. On Tue, Apr 21, 2020 at 3:34 AM Fabian Hueske <fhue...@gmail.com> wrote: > Hi Anil, > > Here's a pointer to Flink's end-2-end test that's checking the integration > with schema registry [1]. > It was recently updated so I hope it works the same way in Flink 1.9. > > Best, > Fabian > > [1] > https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java > > Am Sa., 18. Apr. 2020 um 19:17 Uhr schrieb Anil K <sendto.ani...@gmail.com > >: > >> Hi, >> >> What is the best way to use Confluent SchemaRegistry with >> FlinkKafkaProducer? >> >> What I have right now is as follows. >> >> SerializationSchema<GenericRecord> serializationSchema = >> ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, >> schemaRegistryUrl); >> >> FlinkKafkaProducer<GenericRecord> kafkaProducer = >> new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig); >> outputStream.addSink(producer); >> >> FlinkKafkaProducer with is SerializationSchema now depricated. >> >> I am using flink 1.9. >> >> How to use FlinkKafkaProducer with KafkaSerializationSchema with >> ConfluentSchemaRegsitry? >> >> Is there some reference/documentation i could use? >> >> Thanks , Anil. >> >>