Thanks for sharing your solution, Anil!

Cheers,
Fabian

On Tue, 21 Apr 2020 at 09:35, Anil K <sendto.ani...@gmail.com> wrote:

> Thanks Fabian,
>
> I ended up using something like below.
>
> public class GenericSerializer implements
>     KafkaSerializationSchema<GenericRecord> {
>
>   private final SerializationSchema<GenericRecord> valueSerializer;
>   private final String topic;
>
>   public GenericSerializer(String topic, Schema schemaValue,
>       String schemaRegistryUrl) {
>     this.valueSerializer =
>         ConfluentRegistryAvroSerializationSchema.forGeneric(topic,
>             schemaValue, schemaRegistryUrl);
>     this.topic = topic;
>   }
>
>   @Override
>   public ProducerRecord<byte[], byte[]> serialize(GenericRecord element,
>       Long timestamp) {
>     byte[] value = valueSerializer.serialize(element);
>     return new ProducerRecord<>(topic, value);
>   }
> }
>
> Then I used a new GenericSerializer object in the FlinkKafkaProducer:
>
> FlinkKafkaProducer<GenericRecord> producer =
>     new FlinkKafkaProducer<>(topic,
>         new GenericSerializer(topic, schema, schemaRegistryUrl),
>         kafkaConfig, Semantic.AT_LEAST_ONCE);
>
> Thanks,
> Anil.
>
> On Tue, Apr 21, 2020 at 3:34 AM Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi Anil,
>>
>> Here's a pointer to Flink's end-to-end test that checks the
>> integration with the schema registry [1].
>> It was recently updated, so I hope it works the same way in Flink 1.9.
>>
>> Best,
>> Fabian
>>
>> [1]
>> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
>>
>> On Sat, 18 Apr 2020 at 19:17, Anil K <sendto.ani...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> What is the best way to use Confluent SchemaRegistry with
>>> FlinkKafkaProducer?
>>>
>>> What I have right now is as follows.
>>>
>>> SerializationSchema<GenericRecord> serializationSchema =
>>>     ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema,
>>>         schemaRegistryUrl);
>>>
>>> FlinkKafkaProducer<GenericRecord> kafkaProducer =
>>>     new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
>>> outputStream.addSink(kafkaProducer);
>>>
>>> The FlinkKafkaProducer constructor that takes a SerializationSchema
>>> is now deprecated.
>>>
>>> I am using Flink 1.9.
>>>
>>> How do I use FlinkKafkaProducer with KafkaSerializationSchema and
>>> the Confluent Schema Registry?
>>>
>>> Is there some reference/documentation I could use?
>>>
>>> Thanks,
>>> Anil.
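
A side note on the bytes that valueSerializer.serialize(element) hands to the ProducerRecord in the solution above: assuming the serializer follows Confluent's documented wire format, each value is one zero "magic" byte, then the schema ID as a 4-byte big-endian integer, then the Avro binary payload. The sketch below only illustrates that framing with the JDK. ConfluentFraming and its method names are hypothetical and are not part of Flink or the Confluent libraries, and the payload is treated as opaque bytes rather than real Avro.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical helper for illustration only; not a Flink or Confluent class.
final class ConfluentFraming {
    // Assumption: Confluent wire format = magic byte 0, then 4-byte schema ID.
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5;

    // Prepends the 5-byte header to an already Avro-encoded payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] framed) {
        if (framed.length < HEADER_LENGTH || framed[0] != MAGIC_BYTE) {
            throw new IllegalArgumentException("not a framed message");
        }
        return ByteBuffer.wrap(framed, 1, 4).getInt();
    }

    // Returns the bytes after the header, i.e. the Avro payload.
    static byte[] payloadOf(byte[] framed) {
        schemaIdOf(framed); // reuse the header validation
        return Arrays.copyOfRange(framed, HEADER_LENGTH, framed.length);
    }
}
```

Calling ConfluentFraming.schemaIdOf(record.value()) on a consumed record is a quick way to check which registered schema a message was written with, under the same wire-format assumption.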