Thanks Fabian,

I ended up using something like the code below.

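import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

// Adapts the registry-aware Avro SerializationSchema to KafkaSerializationSchema.
public class GenericSerializer implements KafkaSerializationSchema<GenericRecord> {

  private final SerializationSchema<GenericRecord> valueSerializer;
  private final String topic;

  public GenericSerializer(String topic, Schema schemaValue, String schemaRegistryUrl) {
    // The first argument is used as the Schema Registry subject;
    // the topic name is reused for it here.
    this.valueSerializer =
        ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schemaValue, schemaRegistryUrl);
    this.topic = topic;
  }

  @Override
  public ProducerRecord<byte[], byte[]> serialize(GenericRecord element, Long timestamp) {
    // Only the value is serialized; the record is written without a key.
    byte[] value = valueSerializer.serialize(element);
    return new ProducerRecord<>(topic, value);
  }
}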

I then passed a new GenericSerializer instance to the FlinkKafkaProducer:

FlinkKafkaProducer<GenericRecord> producer =
    new FlinkKafkaProducer<>(
        topic,
        new GenericSerializer(topic, schema, schemaRegistryUrl),
        kafkaConfig,
        Semantic.AT_LEAST_ONCE);
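
For context, the value bytes that a registry-aware serializer produces are expected to follow the Confluent wire format: one magic byte (0), a 4-byte big-endian schema ID, then the Avro-encoded payload. The sketch below only illustrates that framing with the JDK; it is not Flink's or Confluent's code, and the class name WireFormatSketch and its method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustrative only: frames an already-encoded Avro payload the way the
// Confluent wire format does. Class and method names are hypothetical.
final class WireFormatSketch {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

    // Prepends the magic byte and the big-endian schema ID to the payload.
    static byte[] frame(int schemaId, byte[] avroPayload) {
        return ByteBuffer.allocate(HEADER_LENGTH + avroPayload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer is big-endian by default
                .put(avroPayload)
                .array();
    }

    // Reads the schema ID back out of a framed message.
    static int schemaIdOf(byte[] message) {
        ByteBuffer buffer = ByteBuffer.wrap(message);
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }

    // Returns the Avro payload that follows the 5-byte header.
    static byte[] payloadOf(byte[] message) {
        return Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
    }
}
```

Checking the first five bytes of a consumed record this way can help confirm that the producer really registered the schema and wrote its ID, assuming the consumer side also expects this framing.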

Thanks, Anil.


On Tue, Apr 21, 2020 at 3:34 AM Fabian Hueske <fhue...@gmail.com> wrote:

> Hi Anil,
>
> Here's a pointer to Flink's end-2-end test that's checking the integration
> with schema registry [1].
> It was recently updated so I hope it works the same way in Flink 1.9.
>
> Best,
> Fabian
>
> [1]
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-confluent-schema-registry/src/main/java/org/apache/flink/schema/registry/test/TestAvroConsumerConfluent.java
>
> On Sat, Apr 18, 2020 at 7:17 PM Anil K <sendto.ani...@gmail.com> wrote:
>
>> Hi,
>>
>> What is the best way to use Confluent SchemaRegistry with
>> FlinkKafkaProducer?
>>
>> What I have right now is as follows.
>>
>> SerializationSchema<GenericRecord> serializationSchema =
>>     ConfluentRegistryAvroSerializationSchema.forGeneric(topic, schema, schemaRegistryUrl);
>>
>> FlinkKafkaProducer<GenericRecord> kafkaProducer =
>>     new FlinkKafkaProducer<>(topic, serializationSchema, kafkaConfig);
>> outputStream.addSink(kafkaProducer);
>>
>> The FlinkKafkaProducer constructor that takes a SerializationSchema is
>> now deprecated.
>>
>> I am using Flink 1.9.
>>
>> How can I use FlinkKafkaProducer with KafkaSerializationSchema and the
>> Confluent Schema Registry?
>>
>> Is there some reference/documentation I could use?
>>
>> Thanks, Anil.
>>
>>
