[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Randall Hauch resolved KAFKA-6605.
----------------------------------
       Resolution: Fixed
         Reviewer: Randall Hauch
    Fix Version/s: 2.3.1
                   2.4.0
                   2.2.2
                   2.1.2
                   2.0.2
                   1.1.2
                   1.0.3

> Flatten SMT does not properly handle fields that are null
> ---------------------------------------------------------
>
>                 Key: KAFKA-6605
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6605
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0, 2.0.0
>            Reporter: Randall Hauch
>            Assignee: Michal Borowiecki
>            Priority: Major
>             Fix For: 1.0.3, 1.1.2, 2.0.2, 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
>
> When a message has a null field, the `Flatten` SMT does not properly handle
> this and throws an NPE. Consider this message from Debezium:
> {code}
> {
>   "before": null,
>   "after": {
>     "dbserver1.mydb.team.Value": {
>       "id": 1,
>       "name": "kafka",
>       "email": "ka...@apache.org",
>       "last_modified": 1519939449000
>     }
>   },
>   "source": {
>     "version": {
>       "string": "0.7.3"
>     },
>     "name": "dbserver1",
>     "server_id": 0,
>     "ts_sec": 0,
>     "gtid": null,
>     "file": "mysql-bin.000003",
>     "pos": 154,
>     "row": 0,
>     "snapshot": {
>       "boolean": true
>     },
>     "thread": null,
>     "db": {
>       "string": "mydb"
>     },
>     "table": {
>       "string": "team"
>     }
>   },
>   "op": "c",
>   "ts_ms": {
>     "long": 1519939520285
>   }
> }
> {code}
> Note how `before` is null; this event represents a row that was INSERTED, and
> thus there is no `before` state of the row.
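To make the null case concrete, here is a minimal sketch of null-tolerant flattening over plain `java.util.Map` values. It is not the Kafka Connect `Flatten` source and not the patch for this issue: the class `NullSafeFlattenSketch` and its `flatten` method are hypothetical names chosen for illustration, schemas are ignored entirely, and the sample record is a trimmed-down version of the Debezium message above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; not part of Kafka Connect.
public class NullSafeFlattenSketch {

    // Flattens nested maps into one level, joining nested keys with the delimiter.
    public static Map<String, Object> flatten(Map<String, Object> record, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto(record, "", delimiter, out);
        return out;
    }

    private static void flattenInto(Map<String, Object> record, String prefix,
                                    String delimiter, Map<String, Object> out) {
        if (record == null) {
            // A null record has nothing to flatten.
            return;
        }
        for (Map.Entry<String, Object> entry : record.entrySet()) {
            String name = prefix.isEmpty()
                    ? entry.getKey()
                    : prefix + delimiter + entry.getKey();
            Object value = entry.getValue();
            if (value instanceof Map) {
                // Non-null nested structure: descend into it.
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flattenInto(nested, name, delimiter, out);
            } else {
                // Scalars and nulls are copied as-is; a null field stays null.
                out.put(name, value);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("name", "dbserver1");
        source.put("gtid", null);

        Map<String, Object> record = new LinkedHashMap<>();
        record.put("before", null); // INSERT event: no prior row state
        record.put("source", source);
        record.put("op", "c");

        System.out.println(flatten(record, "_"));
    }
}
```

In this map-based sketch the `instanceof` check rejects null on its own. A schema-driven flattener that decides to recurse from a field's declared type rather than its runtime value needs an explicit null check before descending into a field such as `before`; without one it dereferences null.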
> This results in an NPE:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
>     at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234)
>     at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151)
>     at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Here's the connector configuration that was used:
> {code}
> {
>   "name": "debezium-connector-flatten",
>   "config": {
>     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
>     "tasks.max": "1",
>     "database.hostname": "mysql",
>     "database.port": "3306",
>     "database.user": "debezium",
>     "database.password": "dbz",
>     "database.server.id": "223345",
>     "database.server.name": "dbserver-flatten",
>     "database.whitelist": "mydb",
>     "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
>     "database.history.kafka.topic": "schema-flatten.mydb",
>     "include.schema.changes": "true",
>     "transforms": "flatten",
>     "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
>     "transforms.flatten.delimiter": "_"
>   }
> }
> {code}
> Note that the above configuration sets the delimiter to `_`. The default
> delimiter is `.`, which is not a valid character in an Avro field name, so
> using the default with the Avro converter results in the following exception:
> {noformat}
> org.apache.avro.SchemaParseException: Illegal character in: source.version
>     at org.apache.avro.Schema.validateName(Schema.java:1151)
>     at org.apache.avro.Schema.access$200(Schema.java:81)
>     at org.apache.avro.Schema$Field.<init>(Schema.java:403)
>     at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
>     at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
>     at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
>     at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
>     at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
>     at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
>     at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
>     at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
>     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
>
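The `SchemaParseException` above follows from the Avro naming rule: a name must start with a letter or underscore and may contain only letters, digits, and underscores. The sketch below re-implements that rule with a regular expression to show why `source.version` is rejected while `source_version` passes. It does not call the Avro library; `AvroNameCheckSketch`, `isValidAvroName`, and `join` are hypothetical names used only for illustration.

```java
import java.util.regex.Pattern;

// Hypothetical illustration only; mirrors the Avro naming rule, not Avro's code.
public class AvroNameCheckSketch {

    // Avro names: first character [A-Za-z_], remaining characters [A-Za-z0-9_].
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    public static boolean isValidAvroName(String name) {
        return name != null && AVRO_NAME.matcher(name).matches();
    }

    // Builds a flattened field name the way a delimiter-based flattener would.
    public static String join(String parent, String child, String delimiter) {
        return parent + delimiter + child;
    }

    public static void main(String[] args) {
        // Default delimiter "." yields a name Avro rejects.
        System.out.println(isValidAvroName(join("source", "version", "."))); // false
        // Delimiter "_" yields a valid Avro name.
        System.out.println(isValidAvroName(join("source", "version", "_"))); // true
    }
}
```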
> This should probably be addressed in the documentation: when using Avro, set
> the delimiter to `_` or to another character that Avro allows in names
> (letters, digits, and underscore).



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)