Ewen Cheslack-Postava created KAFKA-5164:
--------------------------------------------

             Summary: SetSchemaMetadata does not replace the schemas in structs correctly
                 Key: KAFKA-5164
                 URL: https://issues.apache.org/jira/browse/KAFKA-5164
             Project: Kafka
          Issue Type: Bug
          Components: KafkaConnect
    Affects Versions: 0.10.2.1
            Reporter: Ewen Cheslack-Postava


In SetSchemaMetadataTest we verify that the name and version of the schema in 
the record have been replaced:

https://github.com/apache/kafka/blob/trunk/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java#L62

However, in the case of Structs, the schema will be attached to both the record 
and the Struct itself. So we correctly rebuild the Record:

https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L77
https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L104
https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java#L119

But if the key or value is a Struct, it will still contain the old schema 
embedded in the Struct.

Ultimately this can lead to validations in other code failing (even for very 
simple changes like adjusting the name of a schema):

{code}
(org.apache.kafka.connect.runtime.WorkerTask:141)
org.apache.kafka.connect.errors.DataException: Mismatching struct schema
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:471)
    at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:295)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:73)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:196)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:167)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:139)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:182)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
{code}

The solution to this is probably to check whether the key or value is a Struct 
when we apply a new schema and, if so, copy/reallocate the Struct against the 
new schema.
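
To make the copy/reallocate idea concrete, here is a minimal, self-contained 
sketch. It does not use the Kafka Connect API: MiniSchema and MiniStruct are 
hypothetical stand-ins for Connect's Schema and Struct, validate() is an 
assumed approximation of the downstream check that raises "Mismatching struct 
schema", and updateSchemaIn() is an illustrative helper name, not the actual 
fix.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical model only: these types stand in for Connect's Schema/Struct.
final class StructSchemaSketch {

    /** Stand-in for a struct schema: name, version, and ordered field names. */
    static final class MiniSchema {
        final String name;
        final Integer version;
        final List<String> fields;

        MiniSchema(String name, Integer version, List<String> fields) {
            this.name = name;
            this.version = version;
            this.fields = fields;
        }
    }

    /** Stand-in for a Struct: it keeps a reference to the schema it was built with. */
    static final class MiniStruct {
        final MiniSchema schema;
        private final Map<String, Object> values = new LinkedHashMap<>();

        MiniStruct(MiniSchema schema) {
            this.schema = schema;
        }

        MiniStruct put(String field, Object value) {
            if (!schema.fields.contains(field)) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            values.put(field, value);
            return this;
        }

        Object get(String field) {
            return values.get(field);
        }
    }

    static boolean sameSchema(MiniSchema a, MiniSchema b) {
        return Objects.equals(a.name, b.name)
                && Objects.equals(a.version, b.version)
                && a.fields.equals(b.fields);
    }

    /** Assumed downstream check: the struct's own schema must match the record's schema. */
    static void validate(MiniSchema recordSchema, Object value) {
        if (value instanceof MiniStruct
                && !sameSchema(((MiniStruct) value).schema, recordSchema)) {
            throw new IllegalStateException("Mismatching struct schema");
        }
    }

    /** Sketch of the proposed fix: rebuild a struct against the updated schema. */
    static Object updateSchemaIn(Object keyOrValue, MiniSchema updatedSchema) {
        if (!(keyOrValue instanceof MiniStruct)) {
            return keyOrValue; // non-struct data carries no embedded schema
        }
        MiniStruct original = (MiniStruct) keyOrValue;
        MiniStruct copy = new MiniStruct(updatedSchema);
        for (String field : updatedSchema.fields) {
            copy.put(field, original.get(field));
        }
        return copy;
    }

    public static void main(String[] args) {
        MiniSchema oldSchema = new MiniSchema("com.example.Old", 1, Arrays.asList("id", "name"));
        Object value = new MiniStruct(oldSchema).put("id", 42).put("name", "widget");

        // Only the metadata changes, as in SetSchemaMetadata; the fields are untouched.
        MiniSchema updated = new MiniSchema("com.example.New", 2, oldSchema.fields);

        try {
            validate(updated, value); // the struct still embeds oldSchema
        } catch (IllegalStateException e) {
            System.out.println("Without the copy: " + e.getMessage());
        }

        validate(updated, updateSchemaIn(value, updated)); // passes after the copy
        System.out.println("With the copy: validation passes");
    }
}
```

Non-struct keys and values pass through unchanged in this sketch, because only 
a struct carries its own schema reference. Nested structs are not handled 
here; whether they need the same treatment is left open.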

This particular issue would only appear if we don't modify the data, so I think 
SetSchemaMetadata is currently the only transformation that would have the 
issue.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)