[ 
https://issues.apache.org/jira/browse/KAFKA-7678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16845824#comment-16845824
 ] 

Sebastiaan commented on KAFKA-7678:
-----------------------------------

Hi guys! I think this fix is not complete yet. In version 2.1.1 we are getting 
a very similar exception, but in the 'flush' method that is called pre-close. 
This is the full stacktrace:


{code:java}
message: stream-thread 
[webhook-poster-7034dbb0-7423-476b-98f3-d18db675d6d6-StreamThread-1] Failed 
while closing StreamTask 1_26 due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.AssignedStreamsTasks

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758)
{code}


Followed by:

 
{code:java}
message: task [1_26] Could not close task due to the following error:
logger_name: org.apache.kafka.streams.processor.internals.StreamTask

java.lang.NullPointerException: null
    at 
org.apache.kafka.streams.processor.internals.RecordCollectorImpl.flush(RecordCollectorImpl.java:245)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:493)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:443)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:568)
    at 
org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
    at 
org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:397)
    at 
org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
    at 
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){code}

If I look at the source code at this point, I see a nice null check in the 
close method, but not in the flush method that is called just before that:
{code:java}
public void flush() {
    this.log.debug("Flushing producer");
    this.producer.flush();
    this.checkForException();
}

public void close() {
    this.log.debug("Closing producer");
    if (this.producer != null) {
        this.producer.close();
        this.producer = null;
    }

    this.checkForException();
}{code}
{color:#000080} {color}

Seems to my (ignorant) eye that the flush method should also be wrapped in a 
null check.

> Failed to close producer due to java.lang.NullPointerException
> --------------------------------------------------------------
>
>                 Key: KAFKA-7678
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7678
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.1, 2.0.1, 2.1.0
>            Reporter: Jonathan Santilli
>            Assignee: Jonathan Santilli
>            Priority: Minor
>             Fix For: 1.1.2, 2.2.0, 2.1.1, 2.0.2
>
>
> This occurs when the group is rebalancing in a Kafka Stream application and 
> the process (the Kafka Stream application) receives a *SIGTERM* to stop it 
> gracefully.
>  
>  
> {noformat}
> ERROR org.apache.kafka.streams.processor.internals.StreamTask - task [1_46] 
> Failed to close producer due to the following error:
> java.lang.NullPointerException
>  at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.close(RecordCollectorImpl.java:252)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.maybeAbortTransactionAndCloseRecordCollector(StreamTask.java:607)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:584)
>  at 
> org.apache.kafka.streams.processor.internals.StreamTask.close(StreamTask.java:691)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.closeUnclean(AssignedTasks.java:428)
>  at 
> org.apache.kafka.streams.processor.internals.AssignedTasks.close(AssignedTasks.java:408)
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.shutdown(TaskManager.java:260)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.completeShutdown(StreamThread.java:1181)
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:758){noformat}
>  
>  
> Although I have checked the code and the method 
> `*maybeAbortTransactionAndCloseRecordCollector*` in the `*StreamTask*.*java*` 
> class is expecting any kind of error to happen since is catching 
> `*Throwable*`.
>  
>  
>  
> {noformat}
> try {
>  recordCollector.close();
> } catch (final Throwable e) {
>  log.error("Failed to close producer due to the following error:", e);
> } finally {
>  producer = null;
> }{noformat}
>  
> Should we consider this a bug?
> In my opinion, we could check for the `*null*` possibility at 
> `*RecordCollectorImpl*.*java*` class:
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  producer.close();
>  producer = null;
>  checkForException();
> }{noformat}
>  
> Change it for:
>  
> {noformat}
> @Override
> public void close() {
>  log.debug("Closing producer");
>  if ( Objects.nonNull(producer) ) {
>     producer.close();
>     producer = null;
>  }
>  checkForException();
> }{noformat}
>  
> How does that sound?
>  
> Kafka Brokers running 2.0.0
> Kafka Stream and client 2.1.0
> OpenJDK 8
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to