Piotr Nowojski created FLINK-27202:
--------------------------------------

             Summary: NullPointerException on stop-with-savepoint with AsyncWaitOperator followed by FlinkKafkaProducer
                 Key: FLINK-27202
                 URL: https://issues.apache.org/jira/browse/FLINK-27202
             Project: Flink
          Issue Type: Bug
          Components: Connectors / Kafka, Runtime / Task
    Affects Versions: 1.13.6, 1.12.7
            Reporter: Piotr Nowojski


When {{AsyncWaitOperator}} (or any other operator that uses the mailbox, and 
possibly even processing-time timers) is chained with {{FlinkKafkaProducer}}, 
lingering mails can cause the following exception during 
stop-with-savepoint:

{noformat}
2022-04-11 15:46:19,781 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - static enrichment -> Map -> Sink: enriched events sink (179/256) (3fefa588ad05fa8d2a10a6ad4a740cc6) switched from RUNNING to FAILED on 10.239.104.67:38149-12df6c @ 10.239.104.67 (dataPort=35745).
java.lang.NullPointerException: null
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction$TransactionHolder.access$000(TwoPhaseCommitSinkFunction.java:591) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:223) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:71) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:46) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:26) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry.emitResult(StreamRecordQueueEntry.java:64) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue$Segment.emitCompleted(UnorderedStreamElementQueue.java:272) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.emitCompletedElement(UnorderedStreamElementQueue.java:159) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.outputCompletedElement(AsyncWaitOperator.java:287) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator.access$100(AsyncWaitOperator.java:78) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.processResults(AsyncWaitOperator.java:356) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.api.operators.async.AsyncWaitOperator$ResultHandler.lambda$processInMailbox$0(AsyncWaitOperator.java:337) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.Mail.run(Mail.java:90) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.drain(MailboxProcessor.java:170) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.afterInvoke(StreamTask.java:647) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:591) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:758) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:573) ~[flink-dist_2.12-1.12.7-stream2.jar:1.12.7-stream2]
        at java.lang.Thread.run(Thread.java:829) ~[?:?]
{noformat}

This happens because {{FlinkKafkaProducer}} can be closed without the mailbox 
being quiesced first, so mails that are still queued are later drained into the 
already closed sink. This issue might have been fixed by either FLINK-23532 
or FLINK-23408.
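
To illustrate the ordering problem, here is a minimal toy model in plain Java. It is not Flink code: {{SketchSink}}, {{SketchMailbox}} and both scenario methods are hypothetical names made up for this sketch, and the only assumption carried over from the report is that a mail drained after the sink was closed ends up dereferencing a transaction that is no longer there.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class MailboxDrainSketch {

    /** Hypothetical stand-in for a transactional sink; not Flink's class. */
    static final class SketchSink {
        private List<String> transaction = new ArrayList<>();
        private int accepted;

        void invoke(String record) {
            // Throws NullPointerException once close() has cleared the transaction.
            transaction.add(record);
            accepted++;
        }

        void close() {
            transaction = null;
        }

        int acceptedCount() {
            return accepted;
        }
    }

    /** Hypothetical single-threaded mailbox; not Flink's MailboxProcessor. */
    static final class SketchMailbox {
        private final Queue<Runnable> mails = new ArrayDeque<>();
        private boolean quiesced;

        /** Returns false when the mail is rejected because the mailbox is quiesced. */
        boolean put(Runnable mail) {
            if (quiesced) {
                return false;
            }
            mails.add(mail);
            return true;
        }

        void quiesce() {
            quiesced = true;
        }

        void drain() {
            Runnable mail;
            while ((mail = mails.poll()) != null) {
                mail.run();
            }
        }
    }

    /** Problematic order: the sink is closed while a mail is still queued. */
    static boolean closeThenDrain() {
        SketchSink sink = new SketchSink();
        SketchMailbox mailbox = new SketchMailbox();
        mailbox.put(() -> sink.invoke("late async result"));
        sink.close();
        try {
            mailbox.drain();
            return false;
        } catch (NullPointerException e) {
            return true; // the lingering mail reached the closed sink
        }
    }

    /** Safer order: stop accepting mails, drain the rest, then close the sink. */
    static int quiesceDrainThenClose() {
        SketchSink sink = new SketchSink();
        SketchMailbox mailbox = new SketchMailbox();
        mailbox.put(() -> sink.invoke("late async result"));
        mailbox.quiesce();
        mailbox.put(() -> sink.invoke("rejected after quiesce"));
        mailbox.drain();
        sink.close();
        return sink.acceptedCount();
    }

    public static void main(String[] args) {
        System.out.println("close-then-drain hit NPE: " + closeThenDrain());
        System.out.println("records accepted with quiesce-drain-close: " + quiesceDrainThenClose());
    }
}
```

In this sketch the first scenario fails the same way as the trace above (a drained mail calls {{invoke}} on a closed sink), while the second one delivers the queued record before the sink goes away. Whether FLINK-23532 or FLINK-23408 enforce exactly this order is not verified here.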



