[ https://issues.apache.org/jira/browse/KAFKA-7273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782954#comment-16782954 ]
Yaroslav Tkachenko commented on KAFKA-7273: ------------------------------------------- I have a WIP implementation [here|https://github.com/apache/kafka/pull/6362] and I'd love to be assigned to this ticket. > Converters should have access to headers. > ----------------------------------------- > > Key: KAFKA-7273 > URL: https://issues.apache.org/jira/browse/KAFKA-7273 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect > Reporter: Jeremy Custenborder > Assignee: Jeremy Custenborder > Priority: Major > > I found myself wanting to build a converter that stored additional type > information within headers. The converter interface does not allow a > developer to access to the headers in a Converter. I'm not suggesting that we > change the method for serializing them, rather that > *org.apache.kafka.connect.header.Headers* be passed in for *fromConnectData* > and *toConnectData*. For example something like this. > {code:java} > import org.apache.kafka.connect.data.Schema; > import org.apache.kafka.connect.data.SchemaAndValue; > import org.apache.kafka.connect.header.Headers; > import org.apache.kafka.connect.storage.Converter; > public interface Converter { > default byte[] fromConnectData(String topic, Headers headers, Schema > schema, Object object) { > return fromConnectData(topic, schema, object); > } > default SchemaAndValue toConnectData(String topic, Headers headers, byte[] > payload) { > return toConnectData(topic, payload); > } > void configure(Map<String, ?> var1, boolean var2); > byte[] fromConnectData(String var1, Schema var2, Object var3); > SchemaAndValue toConnectData(String var1, byte[] var2); > } > {code} > This would be a similar approach to what was already done with > ExtendedDeserializer and ExtendedSerializer in the Kafka client. -- This message was sent by Atlassian JIRA (v7.6.3#76005)