Sebastiaan created KAFKA-8412: --------------------------------- Summary: Still a nullpointer exception thrown on shutdown while flushing before closing producers Key: KAFKA-8412 URL: https://issues.apache.org/jira/browse/KAFKA-8412 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.1.1 Reporter: Sebastiaan
I found a closed issue and replied there but decided to open one myself because although they're related they're slightly different. The original issue is at https://issues.apache.org/jira/browse/KAFKA-7678 The fix there has been to implement a null check around closing a producer because in some cases the producer is already null there (has been closed already) In version 2.1.1 we are getting a very similar exception, but in the 'flush' method that is called pre-close. This is in the log: {code:java} message: stream-thread [webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed while closing StreamTask 1_26 due to the following error: logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} Followed by: {code:java} message: task [1_26] Could not close task due to the following error: logger_name: org.apache.kafka.streams.processor.internals.StreamTask java.lang.NullPointerException: null at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245) at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493) at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code} If I look at the source code at this point, I see a nice null check in the close method, but not in the flush method that is called just before that: {code:java} public void flush() { this.log.debug("Flushing producer"); this.producer.flush(); this.checkForException(); } public void close() { this.log.debug("Closing producer"); if (this.producer != null) { this.producer.close(); this.producer = null; } this.checkForException(); }{code} Seems to my (ignorant) eye that the flush method should also be wrapped in a null check in the same way as has been done for close. -- This message was sent by Atlassian JIRA (v7.6.3#76005)