Hi all,

I'm currently working on a few pipelines that sink to Kafka. The downstream
consumers of the sink topics expect certain Kafka headers to be set. However,
the default org.apache.flink.connector.kafka.sink.KafkaSink does
not support adding Kafka record headers.

I tracked the code path down to
org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper,
where the ProducerRecord is created.
It would be relatively simple to add support for record headers by adding a
"HeaderProducer" next to the key and value serializers and using the
ProducerRecord constructor that accepts headers.
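
To make the idea concrete, here is a minimal, dependency-free sketch of the
shape I have in mind. It is not the actual Flink code and not my
implementation: SketchHeader, SketchRecord, SchemaWrapper and the
HeaderProducer interface are hypothetical stand-ins so the example compiles
on its own. In the connector, the record type would be Kafka's
ProducerRecord, which has a constructor that takes an Iterable of headers.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.function.Function;

// Stand-in types only: none of these are Flink or Kafka classes.
final class HeaderSupportSketch {

    /** Stand-in for a Kafka record header: a string key plus raw bytes. */
    static final class SketchHeader {
        final String key;
        final byte[] value;

        SketchHeader(String key, byte[] value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the outgoing record (Kafka's ProducerRecord in practice). */
    static final class SketchRecord {
        final String topic;
        final byte[] key;
        final byte[] value;
        final List<SketchHeader> headers;

        SketchRecord(String topic, byte[] key, byte[] value, List<SketchHeader> headers) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.headers = headers;
        }
    }

    /** Hypothetical hook: derives the headers for a single element. */
    interface HeaderProducer<T> {
        List<SketchHeader> produceHeaders(T element);
    }

    /** Stand-in for the serialization wrapper, with the header hook added. */
    static final class SchemaWrapper<T> {
        private final String topic;
        private final Function<T, byte[]> keySerializer;   // optional, may be null
        private final Function<T, byte[]> valueSerializer;
        private final HeaderProducer<T> headerProducer;    // optional, may be null

        SchemaWrapper(String topic,
                      Function<T, byte[]> keySerializer,
                      Function<T, byte[]> valueSerializer,
                      HeaderProducer<T> headerProducer) {
            this.topic = topic;
            this.keySerializer = keySerializer;
            this.valueSerializer = valueSerializer;
            this.headerProducer = headerProducer;
        }

        SketchRecord serialize(T element) {
            byte[] key = keySerializer == null ? null : keySerializer.apply(element);
            byte[] value = valueSerializer.apply(element);
            // Without a header producer the record simply carries no headers.
            List<SketchHeader> headers = headerProducer == null
                    ? Collections.<SketchHeader>emptyList()
                    : headerProducer.produceHeaders(element);
            return new SketchRecord(topic, key, value, headers);
        }
    }

    public static void main(String[] args) {
        SchemaWrapper<String> wrapper = new SchemaWrapper<String>(
                "orders",
                null,
                s -> s.getBytes(StandardCharsets.UTF_8),
                s -> Collections.singletonList(
                        new SketchHeader("source", "flink".getBytes(StandardCharsets.UTF_8))));
        SketchRecord record = wrapper.serialize("hello");
        System.out.println(record.topic + " headers=" + record.headers.size());
    }
}
```

Keeping the header hook optional means existing users who configure only key
and value serializers would see no change in behaviour.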

For the benefit of my own projects, I have implemented this header support
and would be eager to share my implementation as a proposal if there's a
consensus this would indeed be a valuable addition.

Please let me know what you think.
Thanks,
- Alex
