[ https://issues.apache.org/jira/browse/FLINK-20672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17783892#comment-17783892 ]
Zakelly Lan commented on FLINK-20672:
-------------------------------------

[~yunta] I'm afraid this is still valid, since the I/O executors use the FatalExitExceptionHandler (see [initialization code|https://github.com/apache/flink/pull/16946/files#diff-2d970a09502b5a55d74c67dabafa932a18848d4a600b9b33f4afb584328dc0f0R178]). I ran an experiment that throws a RuntimeException inside the lambda that sends the abort messages:
{code:java}
// send notification of aborted checkpoints asynchronously.
executor.execute(
        () -> {
            // send the "abort checkpoint" messages to necessary vertices.
            // ...
            LOG.info("Simulate something wrong.");
            throw new RuntimeException("something wrong");
        });
{code}
When a checkpoint expired, the JM exited with this log:
{code:java}
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint expired before completing.
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$CheckpointCanceller.run(CheckpointCoordinator.java:2323) [flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]
	at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_372]
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_372]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
2023-11-08 13:58:13,693 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Simulate something wrong.
2023-11-08 13:58:13,694 ERROR org.apache.flink.util.FatalExitExceptionHandler [] - FATAL: Thread 'jobmanager-io-thread-1' produced an uncaught exception. Stopping the process...
java.lang.RuntimeException: something wrong
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.lambda$sendAbortedMessages$12(CheckpointCoordinator.java:1605) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_372]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_372]
	at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
2023-11-08 13:58:13,722 ERROR org.apache.flink.util.FatalExitExceptionHandler (Thread dump omitted)
2023-11-08 13:58:13,724 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting YarnJobClusterEntrypoint down with application status UNKNOWN. Diagnostics Cluster entrypoint has been closed externally..
2023-11-08 13:58:13,725 INFO org.apache.flink.runtime.blob.BlobServer [] - Stopped BLOB server at 0.0.0.0:38077
2023-11-08 13:58:13,727 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting down rest endpoint.
2023-11-08 13:58:13,736 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing cache directory /tmp/flink-web-318212a0-3faa-4fe8-92df-ed445943d4e0/flink-web-ui
2023-11-08 13:58:13,736 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - http://10.0.3.79:38897 lost leadership
2023-11-08 13:58:13,736 INFO org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down complete.
2023-11-08 13:58:13,736 INFO org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent [] - Closing components.
2023-11-08 13:58:13,737 INFO org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - Stopping JobDispatcherLeaderProcess.
2023-11-08 13:58:13,737 INFO org.apache.flink.runtime.resourcemanager.ResourceManagerServiceImpl [] - Stopping resource manager service.
2023-11-08 13:58:13,739 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Stopping dispatcher pekko.tcp://flink@xxxxxxxxxx:44267/user/rpc/dispatcher_0.
2023-11-08 13:58:13,739 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Stopping all currently running jobs of dispatcher pekko.tcp://xxxxxxxxxx:44267/user/rpc/dispatcher_0.
2023-11-08 13:58:13,743 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job 'testing6' (bf8a2c9a2628fedd369e1c50451f66ea).
2023-11-08 13:58:13,744 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher [] - Job bf8a2c9a2628fedd369e1c50451f66ea reached terminal state SUSPENDED.
2023-11-08 13:58:13,745 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job testing6 (bf8a2c9a2628fedd369e1c50451f66ea) switched from state RUNNING to SUSPENDED.
org.apache.flink.util.FlinkException: Scheduler is being stopped.
	at org.apache.flink.runtime.scheduler.SchedulerBase.closeAsync(SchedulerBase.java:656) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.jobmaster.JobMaster.stopScheduling(JobMaster.java:1082) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.jobmaster.JobMaster.stopJobExecution(JobMaster.java:1045) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:451) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.rpc.RpcEndpoint.internalCallOnStop(RpcEndpoint.java:239) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.lambda$terminate$0(PekkoRpcActor.java:574) ~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.19-SNAPSHOT.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor$StartedState.terminate(PekkoRpcActor.java:573) ~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:196) ~[flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) [flink-rpc-akka0bb3ee80-884c-4212-8125-e26007032634.jar:1.19-SNAPSHOT]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_372]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_372]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_372]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_372]
{code}

> notifyCheckpointAborted RPC failure can fail JM
> -----------------------------------------------
>
>                 Key: FLINK-20672
>                 URL: https://issues.apache.org/jira/browse/FLINK-20672
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.11.3, 1.12.0
>            Reporter: Roman Khachatryan
>            Assignee: Zakelly Lan
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor, pull-request-available
>
> Introduced in FLINK-8871, aborted RPC notifications are sent asynchronously:
>
> {code}
> private void sendAbortedMessages(long checkpointId, long timeStamp) {
>     // send notification of aborted checkpoints asynchronously.
>     executor.execute(() -> {
>         // send the "abort checkpoint" messages to necessary vertices.
>         // ..
>     });
> }
> {code}
> However, the executor that eventually executes this request is created as follows:
> {code}
> final ScheduledExecutorService futureExecutor = Executors.newScheduledThreadPool(
>         Hardware.getNumberCPUCores(),
>         new ExecutorThreadFactory("jobmanager-future"));
> {code}
> ExecutorThreadFactory uses an UncaughtExceptionHandler that exits the JVM on error.
> cc: [~yunta]
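A minimal plain-Java sketch of the failure mode discussed in this issue, assuming the I/O executor behaves like a standard ThreadPoolExecutor whose ThreadFactory installs an UncaughtExceptionHandler. No Flink classes are used: the counting handler is a stand-in for FatalExitExceptionHandler (which would stop the process instead), and the names AbortNotificationSketch, runScenario and logAndSwallow are hypothetical. A throwing task passed to execute() reaches the handler once; wrapping the same task in a try/catch keeps the exception from escaping the worker thread. This only illustrates one possible mitigation, not necessarily the change made for FLINK-20672.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class AbortNotificationSketch {

    // Runs one failing task and returns how often the threads' UncaughtExceptionHandler fired.
    static int runScenario(boolean guarded) {
        AtomicInteger uncaught = new AtomicInteger();
        List<Thread> created = new CopyOnWriteArrayList<>();

        // Stand-in for a factory that installs FatalExitExceptionHandler; this handler only counts.
        ThreadFactory factory = runnable -> {
            Thread t = new Thread(runnable, "sketch-io-thread");
            t.setUncaughtExceptionHandler((thread, error) -> uncaught.incrementAndGet());
            created.add(t);
            return t;
        };
        ExecutorService executor = Executors.newSingleThreadExecutor(factory);

        // Simulates a failure while sending the "abort checkpoint" messages.
        Runnable sendAbortedMessages = () -> {
            throw new RuntimeException("something wrong");
        };

        executor.execute(guarded ? logAndSwallow(sendAbortedMessages) : sendAbortedMessages);
        executor.shutdown();
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
            // The handler runs on the dying worker thread, so join every worker before reading the count.
            for (Thread t : created) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the executor", e);
        }
        return uncaught.get();
    }

    // Hypothetical guard: catches RuntimeExceptions inside the task so they never reach the handler.
    static Runnable logAndSwallow(Runnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException e) {
                System.err.println("Failed to send abort notifications: " + e.getMessage());
            }
        };
    }

    public static void main(String[] args) {
        System.out.println("unguarded: " + runScenario(false));
        System.out.println("guarded: " + runScenario(true));
    }
}
```

A related JDK detail: a task passed to submit() instead of execute() is wrapped in a FutureTask that stores the exception in the returned Future, so the handler would not fire. The same holds for execute() on a ScheduledThreadPoolExecutor, which the JDK documents as equivalent to schedule(command, 0, unit); this is consistent with the log, where the failing lambda ran directly under ThreadPoolExecutor.runWorker on 'jobmanager-io-thread-1'.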