[ 
https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783892#comment-17783892
 ] 

Zakelly Lan commented on FLINK-20672:
-------------------------------------

[~yunta] I'm afraid this is still valid, since the I/O executor threads use the 
FatalExitExceptionHandler (see [initialization 
code|https://github.com/apache/flink/pull/16946/files#diff-2d970a09502b5a55d74c67dabafa932a18848d4a600b9b33f4afb584328dc0f0R178]).
 I ran an experiment by throwing a RuntimeException inside the lambda that 
sends the abort messages:
{code:java}
        // send notification of aborted checkpoints asynchronously.
        executor.execute(
                () -> {
                    // send the "abort checkpoint" messages to necessary vertices.
                    // ...
                    LOG.info("Simulate something wrong.");
                    throw new RuntimeException("something wrong");
                });

{code}
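
To illustrate the mechanism in isolation: with a plain java.util.concurrent 
thread pool, an exception that escapes a task passed to execute() terminates 
the worker thread and is delivered to that thread's UncaughtExceptionHandler. 
The following is a minimal sketch and not Flink code. The class 
UncaughtHandlerDemo, the method captureUncaught and the thread name are made up 
for illustration, and the handler only records the throwable, whereas 
FatalExitExceptionHandler would stop the process.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of Flink.
final class UncaughtHandlerDemo {

    /**
     * Runs the task via execute() on a one-thread pool whose threads carry a
     * recording handler (a harmless stand-in for a handler that exits the JVM).
     * Returns the throwable that reached the handler, or null if none arrived
     * within one second.
     */
    static Throwable captureUncaught(Runnable task) throws InterruptedException {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        CountDownLatch handlerCalled = new CountDownLatch(1);

        ThreadFactory factory = runnable -> {
            Thread thread = new Thread(runnable, "demo-io-thread-1");
            thread.setDaemon(true);
            // Record the error instead of exiting, so the demo itself survives.
            thread.setUncaughtExceptionHandler((t, error) -> {
                seen.set(error);
                handlerCalled.countDown();
            });
            return thread;
        };

        ExecutorService executor = Executors.newFixedThreadPool(1, factory);
        try {
            // execute() does not wrap the task in a Future, so a thrown
            // exception propagates out of the worker thread.
            executor.execute(task);
            handlerCalled.await(1, TimeUnit.SECONDS);
        } finally {
            executor.shutdownNow();
        }
        return seen.get();
    }

    public static void main(String[] args) throws InterruptedException {
        Throwable error = captureUncaught(() -> {
            throw new RuntimeException("something wrong");
        });
        System.out.println("handler saw: " + error);
    }
}
```

Note that a task handed to submit() instead of execute() would have its 
exception captured in the returned Future, so the handler would not be invoked 
in that case.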

In that experiment, when a checkpoint expired, the JM exited with this log:

{code:java}
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2323) [flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_372]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
2023-11-08 13:58:13,693 INFO  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Simulate something wrong.
2023-11-08 13:58:13,694 ERROR org.apache.flink.util.FatalExitExceptionHandler              [] - FATAL: Thread 'jobmanager-io-thread-1' produced an uncaught exception. Stopping the process...
java.lang.RuntimeException: something wrong
        at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$sendAbortedMessages$12(CheckpointCoordinator.java:1605) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
2023-11-08 13:58:13,722 ERROR org.apache.flink.util.FatalExitExceptionHandler

(Thread dump omitted)

2023-11-08 13:58:13,724 INFO  org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-11-08 13:58:13,725 INFO  org.apache.flink.runtime.blob.BlobServer                     [] - Stopped BLOB server at 0.0.0.0:38077
2023-11-08 13:58:13,727 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2023-11-08 13:58:13,736 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-318212a0-3faa-4fe8-92df-ed445943d4e0/flink-web-ui
2023-11-08 13:58:13,736 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://10.0.3.79:38897 lost leadership
2023-11-08 13:58:13,736 INFO  org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2023-11-08 13:58:13,736 INFO  org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2023-11-08 13:58:13,737 INFO  org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2023-11-08 13:58:13,737 INFO  org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Stopping resource manager service.
2023-11-08 13:58:13,739 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Stopping dispatcher pekko.tcp://flink@xxxxxxxxxx:44267/user/rpc/dispatcher_0.
2023-11-08 13:58:13,739 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Stopping all currently running jobs of dispatcher pekko.tcp://xxxxxxxxxx:44267/user/rpc/dispatcher_0.
2023-11-08 13:58:13,743 INFO  org.apache.flink.runtime.jobmaster.JobMaster                 [] - Stopping the JobMaster for job 'testing6' (bf8a2c9a2628fedd369e1c50451f66ea).
2023-11-08 13:58:13,744 INFO  org.apache.flink.runtime.dispatcher.MiniDispatcher           [] - Job bf8a2c9a2628fedd369e1c50451f66ea reached terminal state SUSPENDED.
2023-11-08 13:58:13,745 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - Job testing6 (bf8a2c9a2628fedd369e1c50451f66ea) switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: Scheduler is being stopped.
        at org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:656) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:1082) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:1045) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:451) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574) ~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573) ~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196) ~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]

{code}
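
One possible direction, shown only as a sketch and not as the actual fix: wrap 
the body of the asynchronous task so that a failure is reported to a callback 
instead of escaping to the thread's uncaught exception handler. The names 
GuardedTaskSketch and guarded are hypothetical, and the sketch deliberately 
catches Exception only, so that Errors still propagate. Real code would 
presumably log the failure together with the checkpoint id.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

// Hypothetical helper, not part of Flink.
final class GuardedTaskSketch {

    /**
     * Wraps a task so that any Exception it throws is handed to onFailure
     * instead of propagating out of run(). Errors are intentionally not caught.
     */
    static Runnable guarded(Runnable task, Consumer<Exception> onFailure) {
        return () -> {
            try {
                task.run();
            } catch (Exception e) {
                onFailure.accept(e);
            }
        };
    }

    public static void main(String[] args) {
        // A direct executor (runs on the calling thread) keeps the demo simple.
        Executor direct = Runnable::run;
        List<Exception> failures = new ArrayList<>();

        direct.execute(guarded(() -> {
            throw new RuntimeException("something wrong");
        }, failures::add));

        System.out.println("captured failures: " + failures.size());
    }
}
```

With a wrapper like this, the exception never leaves the task, so an executor 
thread's uncaught exception handler is not invoked for it.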



> notifyCheckpointAborted RPC failure can fail JM
> -----------------------------------------------
>
>                 Key: FLINK-20672
>                 URL: https://issues.apache.org/jira/browse/FLINK-20672
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.0
>            Reporter: Roman Khachatryan
>            Assignee: Zakelly Lan
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, 
> pull-request-available
>
> Introduced in FLINK-8871, aborted RPC notifications are done asynchronously:
>  
> {code}
>       private void sendAbortedMessages(long checkpointId, long timeStamp) {
>               // send notification of aborted checkpoints asynchronously.
>               executor.execute(() -> {
>                       // send the "abort checkpoint" messages to necessary vertices.
>                         // ..
>               });
>       }
> {code}
> However, the executor that eventually executes this request is created as 
> follows:
> {code}
>               final ScheduledExecutorService futureExecutor = 
> Executors.newScheduledThreadPool(
>                               Hardware.getNumberCPUCores(),
>                               new ExecutorThreadFactory("jobmanager-future"));
> {code}
> ExecutorThreadFactory uses an UncaughtExceptionHandler that exits the JVM on error.
> cc: [~yunta]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
