[ 
https://issues.apache.org/jira/browse/KAFKA-7434?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633070#comment-16633070
 ] 

ASF GitHub Bot commented on KAFKA-7434:
---------------------------------------

mihbor closed pull request #5701: KAFKA-7434 fix NPE in DeadLetterQueueReporter 
- backport to 2.0
URL: https://github.com/apache/kafka/pull/5701
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
index c059dcff793..23122699783 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java
@@ -199,6 +199,10 @@ void populateContextHeaders(ProducerRecord<byte[], byte[]> 
producerRecord, Proce
     }
 
     private byte[] toBytes(String value) {
-        return value.getBytes(StandardCharsets.UTF_8);
+        if (value != null) {
+            return value.getBytes(StandardCharsets.UTF_8);
+        } else {
+            return null;
+        }
     }
 }
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
index fa628b09840..00a922f76ad 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/errors/ErrorReporterTest.java
@@ -59,6 +59,7 @@
 import static 
org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_TASK_ID;
 import static org.easymock.EasyMock.replay;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 @RunWith(PowerMockRunner.class)
@@ -205,6 +206,7 @@ public void testSetDLQConfigs() {
         assertEquals(configuration.dlqTopicReplicationFactor(), 7);
     }
 
+    @Test
     public void testDlqHeaderConsumerRecord() {
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
@@ -232,6 +234,34 @@ public void testDlqHeaderConsumerRecord() {
         assertTrue(headerValue(producerRecord, 
ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("org.apache.kafka.connect.errors.ConnectException:
 Test Exception"));
     }
 
+    @Test
+    public void testDlqHeaderOnNullExceptionMessage() {
+        Map<String, String> props = new HashMap<>();
+        props.put(SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, 
"true");
+        DeadLetterQueueReporter deadLetterQueueReporter = new 
DeadLetterQueueReporter(producer, config(props), TASK_ID, errorHandlingMetrics);
+
+        ProcessingContext context = new ProcessingContext();
+        context.consumerRecord(new ConsumerRecord<>("source-topic", 7, 10, 
"source-key".getBytes(), "source-value".getBytes()));
+        context.currentContext(Stage.TRANSFORMATION, Transformation.class);
+        context.error(new NullPointerException());
+
+        ProducerRecord<byte[], byte[]> producerRecord = new 
ProducerRecord<>(DLQ_TOPIC, "source-key".getBytes(), "source-value".getBytes());
+
+        deadLetterQueueReporter.populateContextHeaders(producerRecord, 
context);
+        assertEquals("source-topic", headerValue(producerRecord, 
ERROR_HEADER_ORIG_TOPIC));
+        assertEquals("7", headerValue(producerRecord, 
ERROR_HEADER_ORIG_PARTITION));
+        assertEquals("10", headerValue(producerRecord, 
ERROR_HEADER_ORIG_OFFSET));
+        assertEquals(TASK_ID.connector(), headerValue(producerRecord, 
ERROR_HEADER_CONNECTOR_NAME));
+        assertEquals(String.valueOf(TASK_ID.task()), 
headerValue(producerRecord, ERROR_HEADER_TASK_ID));
+        assertEquals(Stage.TRANSFORMATION.name(), headerValue(producerRecord, 
ERROR_HEADER_STAGE));
+        assertEquals(Transformation.class.getName(), 
headerValue(producerRecord, ERROR_HEADER_EXECUTING_CLASS));
+        assertEquals(NullPointerException.class.getName(), 
headerValue(producerRecord, ERROR_HEADER_EXCEPTION));
+        
assertNull(producerRecord.headers().lastHeader(ERROR_HEADER_EXCEPTION_MESSAGE).value());
+        assertTrue(headerValue(producerRecord, 
ERROR_HEADER_EXCEPTION_STACK_TRACE).length() > 0);
+        assertTrue(headerValue(producerRecord, 
ERROR_HEADER_EXCEPTION_STACK_TRACE).startsWith("java.lang.NullPointerException"));
+    }
+
     @Test
     public void testDlqHeaderIsAppended() {
         Map<String, String> props = new HashMap<>();


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> DeadLetterQueueReporter throws NPE if transform throws NPE
> ----------------------------------------------------------
>
>                 Key: KAFKA-7434
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7434
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.0.0
>         Environment: jdk 8
>            Reporter: Michal Borowiecki
>            Assignee: Michal Borowiecki
>            Priority: Major
>             Fix For: 2.0.1, 2.1.0
>
>
> A NPE thrown from a transform in a connector configured with
> errors.deadletterqueue.context.headers.enable=true
> causes DeadLetterQueueReporter to break with a NPE.
> {code}
> Executing stage 'TRANSFORMATION' with class 
> 'org.apache.kafka.connect.transforms.Flatten$Value', where consumed record is 
> {topic='****', partition=1, offset=0, timestamp=1537370573366, 
> timestampType=CreateTime}. 
> (org.apache.kafka.connect.runtime.errors.LogReporter)
> java.lang.NullPointerException
> Task threw an uncaught and unrecoverable exception 
> (org.apache.kafka.connect.runtime.WorkerTask)
> java.lang.NullPointerException
>       at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.toBytes(DeadLetterQueueReporter.java:202)
>       at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.populateContextHeaders(DeadLetterQueueReporter.java:172)
>       at 
> org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.report(DeadLetterQueueReporter.java:146)
>       at 
> org.apache.kafka.connect.runtime.errors.ProcessingContext.report(ProcessingContext.java:137)
>       at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:108)
>       at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:44)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:532)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
>       at 
> org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
>       at 
> org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
>       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
>       at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
>  
> This is caused by populateContextHeaders only checking if the Throwable is 
> not null, but not checking that the message in the Throwable is not null 
> before trying to serialize the message:
> [https://github.com/apache/kafka/blob/cfd33b313c9856ae2b4b45ed3d4aac41d6ef5a6b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/errors/DeadLetterQueueReporter.java#L170-L177]
> {code:java}
> if (context.error() != null) {
>      headers.add(ERROR_HEADER_EXCEPTION, 
> toBytes(context.error().getClass().getName()));
>      headers.add(ERROR_HEADER_EXCEPTION_MESSAGE, 
> toBytes(context.error().getMessage()));
> {code}
> toBytes throws an NPE if passed null as the parameter.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to