[ https://issues.apache.org/jira/browse/FLINK-31049?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser closed FLINK-31049. ---------------------------------- Resolution: Fixed > Add support for Kafka record headers to KafkaSink > ------------------------------------------------- > > Key: FLINK-31049 > URL: https://issues.apache.org/jira/browse/FLINK-31049 > Project: Flink > Issue Type: Improvement > Components: Connectors / Kafka > Reporter: Alex Gout > Assignee: Alex Gout > Priority: Minor > Labels: KafkaSink, pull-request-available, stale-assigned > Fix For: kafka-3.1.0 > > Original Estimate: 48h > Remaining Estimate: 48h > > The default org.apache.flink.connector.kafka.sink.KafkaSink does not support > adding Kafka record headers. In some implementations, downstream consumers > might rely on Kafka record headers being set. > > A way to add Headers would be to create a custom > KafkaRecordSerializationSchema and inject that into the KafkaSink. > However, I'm assuming the KafkaRecordSerializationSchemaBuilder was added for > convenience and allows a more usable approach of creating a KafkaSink without > having to deal with details like the RecordProducer directly. This builder > does not support adding record headers. > This is where I think it should be added. > The code responsible for creating the Kafka record involves > org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchemaWrapper > where the RecordProducer is created. > It is relatively simple to add support for record headers by adding a > "HeaderProducer" to the KafkaRecordSerializationSchemaBuilder next to the key > and value serializers and using the appropriate RecordProducer constructor. > > The issue was discussed > [here|https://lists.apache.org/thread/shlbbcqho0q9w5shjwdlscnsywjvbfro]. > -- This message was sent by Atlassian Jira (v8.20.10#820010)