Allen Wang created FLINK-11303:
----------------------------------
Summary: Utilizing Kafka headers for serialization and
deserialization
Key: FLINK-11303
URL: https://issues.apache.org/jira/browse/FLINK-11303
Project: Flink
Issue Type: Improvement
Components: Kafka Connector
Reporter: Allen Wang
Kafka introduces headers in producer and consumer record since version 0.11.
This is the high level description:
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-82+-+Add+Record+Headers]
However, current Flink Kafka connector simply ignores the headers. This will
make it hard to integrate with the Kafka ecosystem where other Kafka clients
make use of the headers.
I propose to support headers in Flink by modifying the following API:
* In KeyedSerializationSchema, add
{code:java}
List<Tuple2<String, byte[]>> getHeaders(T element)
{code}
* In KeyedDeserializationSchema, add
{code:java}
T deserailize(byte[] messageKey, byte[] message, List<Tuple2<String, byte[]>>
headers, String topic, int partition, long offset) throws IOException{code}
These new methods will be invoked by FlinkKafkaProducer and KafkaFetcher in the
serialization and deserialization process. If backward compatibility is a
concern, we can add default implementation to these methods where headers are
ignored.
If backward compatiblity
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)