Thanks Jiabao and Yaroslav for your quick responses.

Regards,
Kirti Dhar

From: Yaroslav Tkachenko <yaros...@goldsky.com>
Sent: 01 February 2024 21:42
Cc: user@flink.apache.org
Subject: Re: RE: Flink Kafka Sink + Schema Registry + Message Headers

The schema registry support is provided in 
ConfluentRegistryAvroSerializationSchema class (flink-avro-confluent-registry 
package).

On Thu, Feb 1, 2024 at 8:04 AM Yaroslav Tkachenko 
<yaros...@goldsky.com<mailto:yaros...@goldsky.com>> wrote:
You can also implement a custom KafkaRecordSerializationSchema, which allows 
creating a ProducerRecord (see "serialize" method) - you can set message key, 
headers, etc. manually. It's supported in older versions.

On Thu, Feb 1, 2024 at 4:49 AM Jiabao Sun 
<jiabao....@xtransfer.cn<mailto:jiabao....@xtransfer.cn>> wrote:
Sorry, I didn't notice the version information.
This feature was completed in FLINK-31049[1] and will be released in version 
3.1.0 of Kafka.
The release process[2] is currently underway and will be completed soon.

However, version 3.1.0 does not promise support for Flink 1.16.
If you need to use this feature, you can consider cherry-picking this commit[3] 
to the v3.0 branch and package it for your own use.

Regarding Schema Registry, I am not familiar with this feature and I apologize 
for not being able to provide an answer.

Best,
Jiabao

[1] https://issues.apache.org/jira/browse/FLINK-31049
[2] 
https://lists.apache.org/list?d...@flink.apache.org:lte=1M:flink-connector-kafka%20v3.1.0
[3] 
https://github.com/apache/flink-connector-kafka/pull/18<https://protect2.fireeye.com/v1/url?k=31323334-501cfaf3-313273af-454445554331-1e24d52ba288559e&q=1&e=bfa69810-8bec-43fb-9f3e-34bf00ccc1c9&u=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fpull%2F18>


On 2024/02/01 11:58:29 Kirti Dhar Upadhyay K via user wrote:
> Hi Jiabao,
>
> Thanks for reply.
>
> Currently I am using Flink 1.16.1 and I am not able to find any 
> HeaderProvider setter method in class KafkaRecordSerializationSchemaBuilder.
> Although on github I found this support here: 
> https://github.com/apache/flink-connector-kafka/blob/v3.1/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaRecordSerializationSchemaBuilder.java<https://protect2.fireeye.com/v1/url?k=31323334-501cfaf3-313273af-454445554331-cabd0ad9c1eb5efc&q=1&e=bfa69810-8bec-43fb-9f3e-34bf00ccc1c9&u=https%3A%2F%2Fgithub.com%2Fapache%2Fflink-connector-kafka%2Fblob%2Fv3.1%2Fflink-connector-kafka%2Fsrc%2Fmain%2Fjava%2Forg%2Fapache%2Fflink%2Fconnector%2Fkafka%2Fsink%2FKafkaRecordSerializationSchemaBuilder.java>
> But this doesn't seem released yet. Can you please point me towards correct 
> Flink version?
>
> Also, any help on question 1 regarding Schema Registry?
>
> Regards,
> Kirti Dhar
>
> -----Original Message-----
> From: Jiabao Sun <ji...@xtransfer.cn<mailto:ji...@xtransfer.cn>>
> Sent: 01 February 2024 13:29
> To: user@flink.apache.org<mailto:user@flink.apache.org>
> Subject: RE: Flink Kafka Sink + Schema Registry + Message Headers
>
> Hi Kirti,
>
> Kafka Sink supports sending messages with headers.
> You should implement a HeaderProvider to extract headers from input element.
>
>
> KafkaSink<String> sink = KafkaSink.<String>builder()
>         .setBootstrapServers(brokers)
>         .setRecordSerializer(KafkaRecordSerializationSchema.builder()
>                 .setTopic("topic-name")
>                 .setValueSerializationSchema(new SimpleStringSchema())
>                 .setHeaderProvider(new HeaderProvider<String>() {
>                     @Override
>                     public Headers getHeaders(String input) {
>                         //TODO: implements it
>                         return null;
>                     }
>                 })
>                 .build()
>         )
>         .build();
>
> Best,
> Jiabao
>
>
> On 2024/02/01 07:46:38 Kirti Dhar Upadhyay K via user wrote:
> > Hi Mates,
> >
> > I have below queries regarding Flink Kafka Sink.
> >
> >
> >   1.  Does Kafka Sink support schema registry? If yes, is there any 
> > documentations to configure the same?
> >   2.  Does Kafka Sink support sending  messages (ProducerRecord) with 
> > headers?
> >
> >
> > Regards,
> > Kirti Dhar
> >
> >
>

Reply via email to