[ https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16710556#comment-16710556 ]
ASF GitHub Bot commented on KAFKA-7678:
---------------------------------------

mjsax closed pull request #5993: KAFKA-7678: Avoid NPE when closing the RecordCollector
URL: https://github.com/apache/kafka/pull/5993

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index 5df14ee2815..d3a00301d7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -249,8 +249,10 @@ public void flush() {
     @Override
     public void close() {
         log.debug("Closing producer");
-        producer.close();
-        producer = null;
+        if (producer != null) {
+            producer.close();
+            producer = null;
+        }
         checkForException();
     }

diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index e63751899f2..0bc65ccbe10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -387,6 +387,18 @@ public void testRecordHeaderPassThroughSerializer() {
         }
     }

+    @Test
+    public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
+        final RecordCollectorImpl collector = new RecordCollectorImpl(
+            "NoNPE",
+            logContext,
+            new DefaultProductionExceptionHandler(),
+            new Metrics().sensor("skipped-records")
+        );
+
+        collector.close();
+    }
+
     private static class CustomStringSerializer extends StringSerializer {
         private boolean isKey;

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Failed to close producer due to java.lang.NullPointerException
> --------------------------------------------------------------
>
>                 Key: KAFKA-7678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7678
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: Jonathan Santilli
>            Assignee: Jonathan Santilli
>            Priority: Minor
>              Labels: bug
>
> This occurs when the group is rebalancing in a Kafka Streams application and the process (the Kafka Streams application) receives a *SIGTERM* to stop it gracefully.
>
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error:
> java.lang.NullPointerException
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>     at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
> {noformat}
>
> That said, I have checked the code, and the method `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` class already expects any kind of error to happen here, since it catches `*Throwable*`:
>
> {noformat}
> try {
>     recordCollector.close();
> } catch (final Throwable e) {
>     log.error("Failed to close producer due to the following error:", e);
> } finally {
>     producer = null;
> }
> {noformat}
>
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility in the `*RecordCollectorImpl*.*java*` class:
>
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     producer.close();
>     producer = null;
>     checkForException();
> }
> {noformat}
>
> and change it to:
>
> {noformat}
> @Override
> public void close() {
>     log.debug("Closing producer");
>     if (Objects.nonNull(producer)) {
>         producer.close();
>         producer = null;
>     }
>     checkForException();
> }
> {noformat}
>
> How does that sound?
>
> Kafka Brokers running 2.0.0
> Kafka Streams and client 2.1.0
> OpenJDK 8



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
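
For context on how the graceful shutdown described in the report is usually triggered: a Kafka Streams application commonly registers a JVM shutdown hook that closes the KafkaStreams instance when the process receives SIGTERM. The sketch below is illustrative only; the application id, bootstrap server, topic names, and class name are assumptions and do not come from this issue or the patch above.

{noformat}
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class GracefulShutdownSketch {

    public static void main(final String[] args) throws InterruptedException {
        final Properties props = new Properties();
        // Hypothetical configuration values, for illustration only.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "graceful-shutdown-sketch");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        // Hypothetical topology: copy records from one topic to another.
        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        final CountDownLatch latch = new CountDownLatch(1);

        // SIGTERM runs JVM shutdown hooks; closing the KafkaStreams instance here is what
        // drives StreamThread.completeShutdown() -> TaskManager.shutdown() -> StreamTask.close()
        // -> RecordCollectorImpl.close(), the call chain shown in the stack trace above.
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            streams.close();
            latch.countDown();
        }));

        streams.start();
        latch.await();
    }
}
{noformat}

Whether shutdown is initiated by a hook like this or by an explicit call to KafkaStreams#close(), the guarded close() merged in PR #5993 keeps a producer that was never initialized (or already cleared) from turning that shutdown into the NullPointerException reported here.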