[ https://issues.apache.org/jira/browse/KAFKA-10428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17987948#comment-17987948 ]
Ed Berezitsky commented on KAFKA-10428:
---------------------------------------

The suggestion above, to explicitly convert the headers with the ByteArray converter, works only as long as no InsertHeader SMT is used along with MirrorMaker. We use InsertHeader to achieve bidirectional replication without topic prefixes. Currently there is no way to make the InsertHeader SMT insert a literal value with a ByteArray schema, so we had to create a new transformation for that. If the header converter is changed to ByteArray and InsertHeader is used, a different exception is thrown and the tasks fail, because the converter cannot convert the headers inserted by the SMT.

I will contribute a change to the InsertHeader transformation to make the header schema configurable.

> Mirror Maker connect applies base64 encoding to string headers
> --------------------------------------------------------------
>
> Key: KAFKA-10428
> URL: https://issues.apache.org/jira/browse/KAFKA-10428
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Affects Versions: 2.4.0, 2.5.0, 2.6.0
> Reporter: Jennifer Thompson
> Priority: Major
>
> MirrorSourceTask takes the header value as bytes from the ConsumerRecord, which does not have a header schema, and adds it to the SourceRecord headers using "addBytes". This uses Schema.BYTES as the schema for the header, and somehow base64 encoding gets applied when the record is committed.
> This means that my original header value "with_headers" (created with a Python producer and stored as a 12-character byte array) becomes the string value "d2l0aF9oZWFkZXJz", a 16-character byte array, which is the base64-encoded version of the original. If I try to preempt this by starting with "d2l0aF9oZWFkZXJz" and base64-encoding the headers everywhere, it just gets double-encoded to "ZDJsMGFGOW9aV0ZrWlhKeg==" after passing through the MirrorSourceTask.
> I think the base64 encoding may be coming from Values#append (https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java#L674), but I'm not sure how. That method is invoked by SimpleHeaderConverter#fromConnectHeader via Values#convertToString.
> SimpleHeaderConverter#toConnectHeader produces the correct schema in this case, and using it solves the problem for me, but it seems to guess at the schema, so I'm not sure it is the right solution. Since SourceRecord headers seem to require schemas, but ConsumerRecord headers do not carry them, I'm not sure what other option we have. I will open a PR with this solution.
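The encoding behaviour in the report can be reproduced with plain java.util.Base64. The sketch below is a simplified, hypothetical model, not Kafka Connect code: `bytesSchemaToWire` assumes, as the reporter suspects, that a BYTES-schema header is rendered as a Base64 string on the way out, and `inferredSchemaToWire` is a crude stand-in for the schema guessing of SimpleHeaderConverter#toConnectHeader that only checks whether the bytes are valid UTF-8. The class and method names are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical model of the header round trip described in KAFKA-10428.
final class HeaderEncodingSketch {

    // Assumed behaviour: a header value carried with a BYTES schema is turned
    // into a Base64 string, and that string's UTF-8 bytes are written out.
    static byte[] bytesSchemaToWire(byte[] value) {
        String rendered = Base64.getEncoder().encodeToString(value);
        return rendered.getBytes(StandardCharsets.UTF_8);
    }

    // Hypothetical schema guess: if the bytes decode as strict UTF-8, treat the
    // value as a STRING and write it back unchanged; otherwise fall back to
    // the BYTES path above.
    static byte[] inferredSchemaToWire(byte[] value) {
        try {
            String s = StandardCharsets.UTF_8.newDecoder()
                    .onMalformedInput(CodingErrorAction.REPORT)
                    .onUnmappableCharacter(CodingErrorAction.REPORT)
                    .decode(ByteBuffer.wrap(value))
                    .toString();
            return s.getBytes(StandardCharsets.UTF_8);
        } catch (CharacterCodingException e) {
            return bytesSchemaToWire(value);
        }
    }

    public static void main(String[] args) {
        byte[] original = "with_headers".getBytes(StandardCharsets.UTF_8); // 12 bytes
        byte[] once = bytesSchemaToWire(original);                          // 16 bytes
        byte[] twice = bytesSchemaToWire(once);                             // 24 bytes

        System.out.println(new String(once, StandardCharsets.UTF_8));  // d2l0aF9oZWFkZXJz
        System.out.println(new String(twice, StandardCharsets.UTF_8)); // ZDJsMGFGOW9aV0ZrWlhKeg==
        System.out.println(new String(inferredSchemaToWire(original), StandardCharsets.UTF_8)); // with_headers
    }
}
```

The single- and double-encoded values match the ones quoted in the report. Keeping the header as raw bytes end to end (the ByteArray header converter suggestion) skips the Base64 step entirely; the catch described in the comment is that a header added by InsertHeader carries a non-BYTES schema, and as far as I can tell ByteArrayConverter throws a DataException for any schema other than BYTES.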