[ https://issues.apache.org/jira/browse/KAFKA-5164?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch updated KAFKA-5164:
---------------------------------
    Status: Patch Available  (was: Open)

> SetSchemaMetadata does not replace the schemas in structs correctly
> -------------------------------------------------------------------
>
>                 Key: KAFKA-5164
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5164
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 0.10.2.1
>            Reporter: Ewen Cheslack-Postava
>            Assignee: Randall Hauch
>
> In SetSchemaMetadataTest we verify that the name and version of the schema in the record have been replaced:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62
> However, in the case of Structs, the schema is attached to both the record and the Struct itself. So we correctly rebuild the record:
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
> https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119
> But if the key or value is a Struct, it will still contain the old schema embedded in the Struct.
> Ultimately this can cause validations in other code to fail (even for very simple changes like adjusting the name of a schema):
> {code}
> (org.apache.kafka.connect.runtime.WorkerTask:141)
> org.apache.kafka.connect.errors.DataException: Mismatching struct schema
> 	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
> 	at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
> 	at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
> 	at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
> 	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
> 	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
> 	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> 	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> 	at java.lang.Thread.run(Thread.java:745)
> {code}
> The solution is probably to check whether the key or value is a Struct when applying the new schema and, if so, copy/reallocate the Struct against that schema.
> This particular issue only appears when the data itself is not modified, so SetSchemaMetadata is currently the only transformation likely to be affected.
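A minimal sketch of that copy/reallocate approach, assuming a hypothetical helper (the name updateSchemaIn and its placement are illustrative, not necessarily what the patch does). It copies the fields of the original Struct into a new Struct built against the updated schema, which is only safe when the schema change is limited to metadata such as name and version:

{code}
// Illustrative only: copy a Struct's fields into a new Struct that carries the
// updated schema, so the schema embedded in the Struct matches the record's.
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;

public class SchemaUpdateSketch {

    // Hypothetical helper; name and placement are assumptions, not the patch.
    static Object updateSchemaIn(Object keyOrValue, Schema updatedSchema) {
        if (keyOrValue instanceof Struct) {
            Struct orig = (Struct) keyOrValue;
            Struct updated = new Struct(updatedSchema);
            // Assumes the new schema only changes metadata (name/version), so
            // its fields correspond one-to-one with the original schema's.
            for (Field field : updatedSchema.fields()) {
                updated.put(field, orig.get(field.name()));
            }
            return updated;
        }
        // Non-Struct keys/values don't embed a schema, so nothing to do here.
        return keyOrValue;
    }
}
{code}

The transform could then pass the Struct returned by such a helper as the updated key or value when it rebuilds the record, keeping the record's schema and the Struct's embedded schema in sync.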