[ https://issues.apache.org/jira/browse/KAFKA-8412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Guozhang Wang resolved KAFKA-8412.
----------------------------------
    Resolution: Fixed

> Still a NullPointerException thrown on shutdown while flushing before closing producers
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-8412
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8412
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.1
>            Reporter: Sebastiaan
>            Assignee: Chris Pettitt
>            Priority: Minor
>             Fix For: 2.1.2, 2.2.2, 2.4.0, 2.3.1
>
> I found a closed issue and replied there, but decided to open this one because, although the two are related, they are slightly different. The original issue is https://issues.apache.org/jira/browse/KAFKA-7678
> The fix there was to add a null check around closing the producer, because in some cases the producer is already null at that point (it has already been closed).
> In version 2.1.1 we are getting a very similar exception, but in the 'flush' method that is called pre-close.
> This is in the log:
> {code:java}
> message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
> {code}
> Followed by:
> {code:java}
> message: task [1_26] Could not close task due to the following error:
> logger_name: org.apache.kafka.streams.processor.internals.StreamTask
> java.lang.NullPointerException: null
>     at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
>     at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
>     at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
>     at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
>     at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>     at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
>     at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>     at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
> {code}
> If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before it:
> {code:java}
> public void flush() {
>     this.log.debug("Flushing producer");
>     this.producer.flush();
>     this.checkForException();
> }
>
> public void close() {
>     this.log.debug("Closing producer");
>     if (this.producer != null) {
>         this.producer.close();
>         this.producer = null;
>     }
>     this.checkForException();
> }
> {code}
> Seems to my (ignorant) eye that the flush method should also be wrapped in a null check, in the same way as has been done for close.

--
This message was sent by Atlassian Jira
(v8.3.2#803003)
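The suggested guard can be sketched with a minimal, self-contained stand-in. This is not Kafka's actual committed fix; `NullSafeFlushSketch` and `FakeProducer` are hypothetical names used only to demonstrate the behavior described in the ticket: once `close()` nulls out the producer, an unguarded `flush()` throws a NullPointerException, while a guarded one becomes a no-op.

```java
// Hypothetical sketch (not Kafka's real code): flush() gets the same
// null check that close() already has, so flushing after close is a
// no-op instead of a NullPointerException.
public class NullSafeFlushSketch {

    // Stand-in for org.apache.kafka.clients.producer.Producer.
    static class FakeProducer {
        void flush() { /* would send any buffered records */ }
        void close() { /* would release network resources */ }
    }

    private FakeProducer producer = new FakeProducer();

    // Guarded flush: returns true if a live producer was flushed,
    // false if the producer was already closed (previously an NPE).
    public boolean flush() {
        if (producer != null) {
            producer.flush();
            return true;
        }
        return false;
    }

    // Mirrors the existing close(): the producer is nulled out after
    // closing, which is exactly what made a later unguarded flush() throw.
    public void close() {
        if (producer != null) {
            producer.close();
            producer = null;
        }
    }
}
```

With this guard, the shutdown sequence in the stack traces above (suspend → commit → flushState → flush) would simply skip the flush for a task whose producer was already closed, instead of failing the task close.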