You can also implement a custom KafkaRecordSerializationSchema, which allows creating a ProducerRecord (see "serialize" method) - you can set message key, headers, etc. manually. It's supported in older versions.
On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun <jiabao....@xtransfer.cn> wrote: > Sorry, I didn't notice the version information. > This feature was completed in FLINK-31049[1] and will be released in > version 3.1.0 of Kafka. > The release process[2] is currently underway and will be completed soon. > > However, version 3.1.0 does not promise support for Flink 1.16. > If you need to use this feature, you can consider cherry-picking this > commit[3] to the v3.0 branch and package it for your own use. > > Regarding Schema Registry, I am not familiar with this feature and I > apologize for not being able to provide an answer. > > Best, > Jiabao > > [1] https://issues.apache.org/jira/browse/FLINK-31049 > [2] > https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0 > [3] https://github.com/apache/flink-connector-kafka/pull/18 > > > On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote: > > Hi Jiabao, > > > > Thanks for reply. > > > > Currently I am using Flink 1.16.1 and I am not able to find any > HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder. > > Although on github I found this support here: > https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java > > But this doesn't seem released yet. Can you please point me towards > correct Flink version? > > > > Also, any help on question 1 regarding Schema Registry? > > > > Regards, > > Kirti Dhar > > > > -----Original Message----- > > From: Jiabao Sun <ji...@xtransfer.cn> > > Sent: 01 February 2024 13:29 > > To: user@flink.apache.org > > Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers > > > > Hi Kirti, > > > > Kafka Sink supports sending messages with headers. > > You should implement a HeaderProvider to extract headers from input > element. > > > > > > KafkaSink<String> sink = KafkaSink.<String>builder() > > .setBootstrapServers(brokers) > > .setRecordSerializer(KafkaRecordSerializationSchema.builder() > > .setTopic("topic-name") > > .setValueSerializationSchema(new SimpleStringSchema()) > > .setHeaderProvider(new HeaderProvider<String>() { > > @Override > > public Headers getHeaders(String input) { > > //TODO: implements it > > return null; > > } > > }) > > .build() > > ) > > .build(); > > > > Best, > > Jiabao > > > > > > On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote: > > > Hi Mates, > > > > > > I have below queries regarding Flink Kafka Sink. > > > > > > > > > 1. Does Kafka Sink support schema registry? If yes, is there any > documentations to configure the same? > > > 2. Does Kafka Sink support sending messages (ProducerRecord) with > headers? > > > > > > > > > Regards, > > > Kirti Dhar > > > > > > > > >