The schema registry support is provided in ConfluentRegistryAvroSerializationSchema class (flink-avro-confluent-registry package).
On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko <yaros...@goldsky.com> wrote: > You can also implement a custom KafkaRecordSerializationSchema, which > allows creating a ProducerRecord (see "serialize" method) - you can set > message key, headers, etc. manually. It's supported in older versions. > > On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun <jiabao....@xtransfer.cn> wrote: > >> Sorry, I didn't notice the version information. >> This feature was completed in FLINK-31049[1] and will be released in >> version 3.1.0 of Kafka. >> The release process[2] is currently underway and will be completed soon. >> >> However, version 3.1.0 does not promise support for Flink 1.16. >> If you need to use this feature, you can consider cherry-picking this >> commit[3] to the v3.0 branch and package it for your own use. >> >> Regarding Schema Registry, I am not familiar with this feature and I >> apologize for not being able to provide an answer. >> >> Best, >> Jiabao >> >> [1] https://issues.apache.org/jira/browse/FLINK-31049 >> [2] >> https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0 >> [3] https://github.com/apache/flink-connector-kafka/pull/18 >> >> >> On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote: >> > Hi Jiabao, >> > >> > Thanks for reply. >> > >> > Currently I am using Flink 1.16.1 and I am not able to find any >> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder. >> > Although on github I found this support here: >> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java >> > But this doesn't seem released yet. Can you please point me towards >> correct Flink version? >> > >> > Also, any help on question 1 regarding Schema Registry? >> > >> > Regards, >> > Kirti Dhar >> > >> > -----Original Message----- >> > From: Jiabao Sun <ji...@xtransfer.cn> >> > Sent: 01 February 2024 13:29 >> > To: user@flink.apache.org >> > Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers >> > >> > Hi Kirti, >> > >> > Kafka Sink supports sending messages with headers. >> > You should implement a HeaderProvider to extract headers from input >> element. >> > >> > >> > KafkaSink<String> sink = KafkaSink.<String>builder() >> > .setBootstrapServers(brokers) >> > .setRecordSerializer(KafkaRecordSerializationSchema.builder() >> > .setTopic("topic-name") >> > .setValueSerializationSchema(new SimpleStringSchema()) >> > .setHeaderProvider(new HeaderProvider<String>() { >> > @Override >> > public Headers getHeaders(String input) { >> > //TODO: implements it >> > return null; >> > } >> > }) >> > .build() >> > ) >> > .build(); >> > >> > Best, >> > Jiabao >> > >> > >> > On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote: >> > > Hi Mates, >> > > >> > > I have below queries regarding Flink Kafka Sink. >> > > >> > > >> > > 1. Does Kafka Sink support schema registry? If yes, is there any >> documentations to configure the same? >> > > 2. Does Kafka Sink support sending messages (ProducerRecord) >> with headers? >> > > >> > > >> > > Regards, >> > > Kirti Dhar >> > > >> > > >> > >> >