Hi Jiabao,

Thanks for the reply.
I am currently using Flink 1.16.1, and I cannot find any HeaderProvider setter method in the KafkaRecordSerializationSchemaBuilder class.
I did find this support on GitHub:
https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java
However, it does not seem to be released yet. Can you please point me to the correct Flink version?

Also, any help on question 1 regarding Schema Registry?

Regards,
Kirti Dhar

-----Original Message-----
From: Jiabao Sun <jiabao....@xtransfer.cn>
Sent: 01 February 2024 13:29
To: user@flink.apache.org
Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers

Hi Kirti,

Kafka Sink supports sending messages with headers.
You should implement a HeaderProvider to extract headers from the input element.

    KafkaSink<String> sink = KafkaSink.<String>builder()
            .setBootstrapServers(brokers)
            .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                    .setTopic("topic-name")
                    .setValueSerializationSchema(new SimpleStringSchema())
                    .setHeaderProvider(new HeaderProvider<String>() {
                        @Override
                        public Headers getHeaders(String input) {
                            // TODO: implement it
                            return null;
                        }
                    })
                    .build()
            )
            .build();

Best,
Jiabao

On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> Hi Mates,
>
> I have the following queries regarding Flink Kafka Sink.
>
> 1. Does Kafka Sink support schema registry? If yes, is there any
>    documentation for configuring it?
> 2. Does Kafka Sink support sending messages (ProducerRecord) with headers?
>
> Regards,
> Kirti Dhar