[ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-7273. ---------------------------------- Fix Version/s: 2.4.0 Reviewer: Randall Hauch Resolution: Fixed [KIP-440|https://cwiki.apache.org/confluence/display/KAFKA/KIP-440%3A+Extend+Connect+Converter+to+support+headers] has been approved, and the aforementioned PR has been merged and will first appear in AK 2.4.0. > Converters should have access to headers. > ----------------------------------------- > > Key: KAFKA-7273 > URL: https://issues.apache.org/jira/browse/KAFKA-7273 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Reporter: Jeremy Custenborder > Assignee: Jeremy Custenborder > Priority: Major > Fix For: 2.4.0 > > > I found myself wanting to build a converter that stored additional type > information within headers. The converter interface does not allow a > developer to access to the headers in a Converter. I'm not suggesting that we > change the method for serializing them, rather that > *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* > and *toConnectData*. For example something like this. > {code:java} > import org.apache.kafka.connect.data.Schema; > import org.apache.kafka.connect.data.SchemaAndValue; > import org.apache.kafka.connect.header.Headers; > import org.apache.kafka.connect.storage.Converter; > public interface Converter { > default byte[] fromConnectData(String topic, Headers headers, Schema > schema, Object object) { > return fromConnectData(topic, schema, object); > } > default SchemaAndValue toConnectData(String topic, Headers headers, byte[] > payload) { > return toConnectData(topic, payload); > } > void configure(Map<String, ?> var1, boolean var2); > byte[] fromConnectData(String var1, Schema var2, Object var3); > SchemaAndValue toConnectData(String var1, byte[] var2); > } > {code} > This would be a similar approach to what was already done with > ExtendedDeserializer and ExtendedSerializer in the Kafka client. -- This message was sent by Atlassian Jira (v8.3.4#803005)