[ https://issues.apache.org/jira/browse/KAFKA-6605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16883910#comment-16883910 ]
ASF GitHub Bot commented on KAFKA-6605:
---------------------------------------

rhauch commented on pull request #5705: KAFKA-6605 fix NPE in Flatten when optional Struct is null
URL: https://github.com/apache/kafka/pull/5705

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Flatten SMT does not properly handle fields that are null
> ---------------------------------------------------------
>
>                 Key: KAFKA-6605
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6605
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 1.0.0, 2.0.0
>            Reporter: Randall Hauch
>            Assignee: Michal Borowiecki
>            Priority: Major
>
> When a message has a null field, the `Flatten` SMT does not properly handle
> this and throws an NPE. Consider this message from Debezium:
> {code}
> {
>   "before": null,
>   "after": {
>     "dbserver1.mydb.team.Value": {
>       "id": 1,
>       "name": "kafka",
>       "email": "ka...@apache.org",
>       "last_modified": 1519939449000
>     }
>   },
>   "source": {
>     "version": {
>       "string": "0.7.3"
>     },
>     "name": "dbserver1",
>     "server_id": 0,
>     "ts_sec": 0,
>     "gtid": null,
>     "file": "mysql-bin.000003",
>     "pos": 154,
>     "row": 0,
>     "snapshot": {
>       "boolean": true
>     },
>     "thread": null,
>     "db": {
>       "string": "mydb"
>     },
>     "table": {
>       "string": "team"
>     }
>   },
>   "op": "c",
>   "ts_ms": {
>     "long": 1519939520285
>   }
> }
> {code}
> Note how `before` is null; this event represents a row that was INSERTED,
> and thus there is no `before` state of the row.
> This results in an NPE:
> {noformat}
> java.lang.NullPointerException
>     at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:219)
>     at org.apache.kafka.connect.transforms.Flatten.buildWithSchema(Flatten.java:234)
>     at org.apache.kafka.connect.transforms.Flatten.applyWithSchema(Flatten.java:151)
>     at org.apache.kafka.connect.transforms.Flatten.apply(Flatten.java:75)
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:211)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> Here's the connector configuration that was used:
> {code}
> {
>   "name": "debezium-connector-flatten",
>   "config": {
>     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
>     "tasks.max": "1",
>     "database.hostname": "mysql",
>     "database.port": "3306",
>     "database.user": "debezium",
>     "database.password": "dbz",
>     "database.server.id": "223345",
>     "database.server.name": "dbserver-flatten",
>     "database.whitelist": "mydb",
>     "database.history.kafka.bootstrap.servers": "kafka-1:9092,kafka-2:9092,kafka-3:9092",
>     "database.history.kafka.topic": "schema-flatten.mydb",
>     "include.schema.changes": "true",
>     "transforms": "flatten",
>     "transforms.flatten.type": "org.apache.kafka.connect.transforms.Flatten$Value",
>     "transforms.flatten.delimiter": "_"
>   }
> }
> {code}
> Note that the above configuration sets the delimiter to `_`. The default
> delimiter is `.`, which is not a valid character in an Avro field name, so
> using the default with the Avro converter results in the following exception:
> {noformat}
> org.apache.avro.SchemaParseException: Illegal character in: source.version
>     at org.apache.avro.Schema.validateName(Schema.java:1151)
>     at org.apache.avro.Schema.access$200(Schema.java:81)
>     at org.apache.avro.Schema$Field.<init>(Schema.java:403)
>     at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2124)
>     at org.apache.avro.SchemaBuilder$FieldBuilder.completeField(SchemaBuilder.java:2116)
>     at org.apache.avro.SchemaBuilder$FieldBuilder.access$5300(SchemaBuilder.java:2034)
>     at org.apache.avro.SchemaBuilder$GenericDefault.withDefault(SchemaBuilder.java:2423)
>     at io.confluent.connect.avro.AvroData.addAvroRecordField(AvroData.java:898)
>     at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:799)
>     at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:652)
>     at io.confluent.connect.avro.AvroData.fromConnectSchema(AvroData.java:647)
>     at io.confluent.connect.avro.AvroData.fromConnectData(AvroData.java:324)
>     at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:75)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:220)
>     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:187)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> {noformat}
> This should probably be addressed in the documentation: when using Avro, set
> the delimiter to `_` or an alphanumeric character.

--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
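
For illustration only, here is a rough standalone sketch of the two points raised in the issue: a flatten step that does not dereference a null nested value, and a check of flattened names against the Avro name rule (first character a letter or `_`, then letters, digits, or `_`). This is not the code from the pull request and it does not use the Kafka Connect API; it works on plain nested maps, and the names FlattenSketch, flatten and isValidAvroName are hypothetical. How a null nested value should appear in the output is an assumption here (the key is kept with a null value).

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical sketch on plain maps; not the Kafka Connect Flatten implementation.
final class FlattenSketch {
    // Avro name rule: start with a letter or '_', then letters, digits, or '_'.
    private static final Pattern AVRO_NAME = Pattern.compile("[A-Za-z_][A-Za-z0-9_]*");

    static boolean isValidAvroName(String name) {
        return AVRO_NAME.matcher(name).matches();
    }

    // Flattens nested maps into a single level, joining keys with the delimiter.
    static Map<String, Object> flatten(Map<String, Object> record, String delimiter) {
        Map<String, Object> out = new LinkedHashMap<>();
        flattenInto(out, "", record, delimiter);
        return out;
    }

    private static void flattenInto(Map<String, Object> out, String prefix,
                                    Map<String, Object> record, String delimiter) {
        for (Map.Entry<String, Object> e : record.entrySet()) {
            String name = prefix.isEmpty() ? e.getKey() : prefix + delimiter + e.getKey();
            Object value = e.getValue();
            if (value instanceof Map) {
                @SuppressWarnings("unchecked")
                Map<String, Object> nested = (Map<String, Object>) value;
                flattenInto(out, name, nested, delimiter);
            } else {
                // Also covers null: instanceof is false for null, so a null
                // nested value is copied through and never descended into.
                out.put(name, value);
            }
        }
    }

    public static void main(String[] args) {
        // A cut-down stand-in for the Debezium event in the issue.
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("name", "dbserver1");
        source.put("gtid", null);

        Map<String, Object> event = new LinkedHashMap<>();
        event.put("before", null);
        event.put("source", source);
        event.put("op", "c");

        // Compare the delimiter from the connector configuration with the default.
        for (String delimiter : new String[] {"_", "."}) {
            for (String field : flatten(event, delimiter).keySet()) {
                System.out.println(field + " valid Avro name: " + isValidAvroName(field));
            }
        }
    }
}
```

With the `_` delimiter every flattened name in this sketch passes the name check, while the `.` delimiter yields names such as `source.name` that fail it, which matches the SchemaParseException in the issue.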