[ https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ewen Cheslack-Postava resolved KAFKA-7434.
------------------------------------------
       Resolution: Fixed
    Fix Version/s: 2.1.0
                   2.0.1

Issue resolved by pull request 5700
[https://github.com/apache/kafka/pull/5700]

> DeadLetterQueueReporter throws NPE if transform throws NPE
> ----------------------------------------------------------
>
>                 Key: KAFKA-7434
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7434
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.0
>         Environment: jdk 8
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>            Priority: Major
>             Fix For: 2.0.1, 2.1.0
>
>
> An NPE thrown from a transform in a connector configured with
> errors.deadletterqueue.context.headers.enable=true
> causes DeadLetterQueueReporter to break with an NPE.
> {code}
> Executing stage 'TRANSFORMATION' with class 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is {topic='****', partition=1, offset=0, timestamp=1537370573366, timestampType=CreateTime}. (org.apache.kafka.connect.runtime.errors.LogReporter)
> java.lang.NullPointerException
> Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
>     at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
>     at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
>     at org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
>     at org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
>     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
>     at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>     at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
>
> This is caused by populateContextHeaders only checking that the Throwable is
> not null, without also checking that the Throwable's message is not null
> before trying to serialize the message:
> [https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]
> {code:java}
> if (context.error() != null) {
>     headers.add(ERROR_HEADER_EXCEPTION,
>             toBytes(context.error().getClass().getName()));
>     headers.add(ERROR_HEADER_EXCEPTION_MESSAGE,
>             toBytes(context.error().getMessage()));
> {code}
> toBytes throws an NPE if passed null as the parameter.
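
For illustration only, here is a minimal standalone sketch of one way to make the serialization step tolerate a null exception message. It is not taken from the pull request: the class name NullSafeHeaderBytes and this toBytes method are hypothetical stand-ins for the private helper named in the report, and mapping a null message to a null byte array is an assumption about the desired behaviour (omitting the header would be another option).

```java
import java.nio.charset.StandardCharsets;

public class NullSafeHeaderBytes {

    // Hypothetical stand-in for the toBytes helper named in the report.
    // Assumption: a null input maps to a null byte array instead of throwing.
    public static byte[] toBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // An exception built without a message: getMessage() returns null,
        // which is the situation the report describes.
        Throwable error = new RuntimeException();

        byte[] className = toBytes(error.getClass().getName());
        byte[] message = toBytes(error.getMessage());

        System.out.println("class name bytes: " + className.length);
        System.out.println("message is null: " + (message == null));
    }
}
```

With a guard like this, the caller still has to decide what a null value means for the error-message header; the sketch only removes the NullPointerException from the conversion itself.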