Jonathan Santilli created KAFKA-7678: ----------------------------------------
Summary: Failed to close producer due to java.lang.NullPointerException Key: KAFKA-7678 URL: https://issues.apache.org/jira/browse/KAFKA-7678 Project: Kafka Issue Type: Bug Reporter: Jonathan Santilli This occurs when the group is rebalancing in a Kafka Stream application and the process (the Kafka Stream application) receives a SIGTERM to stop it gracefully. {noformat} ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] Failed to close producer due to the following error: java.lang.NullPointerException at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252) at org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584) at org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691) at org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428) at org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408) at org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260) at org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat} Although I have checked the code and the method `maybeAbortTransactionAndCloseRecordCollector` in the `StreamTask.java` class is expecting any kind of error to happen since is catching `Throwable`. {noformat} try { recordCollector.close(); } catch (final Throwable e) { log.error("Failed to close producer due to the following error:", e); } finally { producer = null; }{noformat} Should we consider this a bug? In my opinion, we could check for the `null` possibility at `RecordCollectorImpl.java` class: {noformat} @Override public void close() { log.debug("Closing producer"); producer.close(); producer = null; checkForException(); }{noformat} Change it for: {noformat} @Override public void close() { log.debug("Closing producer"); if ( Objects.nonNull(producer) ) { producer.close(); producer = null; } checkForException(); }{noformat} How does that sound? Kafka Brokers running 2.0.0 Kafka Stream and client 2.1.0 OpenJDK 8 -- This message was sent by Atlassian JIRA (v7.6.3#76005)