Hi Paul,

your analysis is right. The JobManager does not wait until pending operation results have been served before it shuts down. See https://issues.apache.org/jira/browse/FLINK-10309 for more details. I think one way to solve it is to wait for some time before shutting down if the RestServerEndpoint still has responses to serve.
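A minimal sketch of that idea in plain Java, under stated assumptions: the REST handlers are assumed to report when an asynchronous operation result (such as the cancel-with-savepoint result) becomes available and when a client has fetched it, and the entrypoint is assumed to call a blocking, time-bounded wait before it stops the REST endpoint. `PendingResultRegistry` and its methods are hypothetical names used only for illustration; they are not Flink's API, and this is not necessarily how FLINK-10309 was resolved.

```java
import java.time.Duration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Hypothetical sketch, not Flink's API: remembers asynchronous operation results
 * (e.g. a cancel-with-savepoint result) that no client has fetched yet, so that
 * shutdown can wait a bounded amount of time for them to be served.
 */
public class PendingResultRegistry {
    private final Set<String> unserved = new HashSet<>();

    /** Called when an operation completes but its result has not been fetched yet. */
    public synchronized void resultAvailable(String operationId) {
        unserved.add(operationId);
    }

    /** Called after the REST handler has written the result to the client. */
    public synchronized void resultServed(String operationId) {
        if (unserved.remove(operationId) && unserved.isEmpty()) {
            notifyAll(); // wake up a shutdown thread blocked in awaitAllServed
        }
    }

    /**
     * Blocks until every registered result has been served or the timeout expires.
     * Returns true if nothing is left unserved, false if shutdown should go ahead anyway.
     */
    public synchronized boolean awaitAllServed(Duration timeout) {
        long deadline = System.nanoTime() + timeout.toNanos();
        try {
            while (!unserved.isEmpty()) {
                long remaining = deadline - System.nanoTime();
                if (remaining <= 0) {
                    return false; // grace period over: give up on the unserved results
                }
                // Releases the monitor while waiting, so resultServed can run.
                TimeUnit.NANOSECONDS.timedWait(this, remaining);
            }
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PendingResultRegistry registry = new PendingResultRegistry();
        registry.resultAvailable("cancel-with-savepoint");

        // Simulated client that fetches the result 100 ms after the job reached CANCELED.
        Thread client = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            registry.resultServed("cancel-with-savepoint");
        });
        client.start();

        // The entrypoint would call this before shutting down the REST endpoint.
        boolean served = registry.awaitAllServed(Duration.ofSeconds(5));
        System.out.println("all results served before shutdown: " + served);
        client.join();
    }
}
```

The bounded timeout is the key design choice: if the client never comes back for the result, `awaitAllServed` returns `false` and the shutdown proceeds anyway, so a lost client cannot keep the YARN application alive indefinitely.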
Cheers,
Till

On Fri, Sep 14, 2018 at 4:41 AM Paul Lam <paullin3...@gmail.com> wrote:

> Hi,
>
> I've been experiencing an issue where the client frontend is sometimes unable
> to get cancel-with-savepoint responses from the JobManager.
>
> I'm using Flink 1.5.3 on a YARN cluster. When I execute the cancel with
> savepoint command (FsStateBackend backed by HDFS), it normally finishes in
> seconds, but sometimes it takes unexpectedly long and ends up with
> `RetryException: Could not complete the operation. Number of retries has
> been exhausted.` However, the server-side logs look normal: the savepoint
> is stored fine, and the job is also canceled.
>
> I haven't looked into the code yet, but I suspect the response was lost and
> the JobManager was already shutting down, so when the client retried it
> couldn't get any response, and it finally got a connection refused exception
> once the shutdown completed (the logs are at the end of the mail).
>
> Does my reasoning make sense? And how should I debug or fix this problem?
> Any help would be appreciated, thank you very much!
>
> ``` client side
> Commit Succeeded
>
> org.apache.flink.util.FlinkException: Could not cancel job e3317308798e962270b95102a1afe3b7.
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>     at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>     at java.security.AccessController.doPrivileged(Native Method)
>     at javax.security.auth.Subject.doAs(Subject.java:422)
>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>     at org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>     ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: java.net.ConnectException: Connection refused: 10.191.56.77:27597
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>     ... 16 more
> Caused by: java.net.ConnectException: Connection refused: 10.191.56.77:27597
>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>     ... 7 more
> ```
>
> ``` server side (skipped some debug level logs)
> 2018-09-13 18:00:51,410 INFO org.apache.flink.runtime.jobmaster.JobMaster - Savepoint stored in hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5. Now cancelling e3317308798e962270b95102a1afe3b7.
> 2018-09-13 18:00:51,410 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3, parallelism: 2, maxParallelism: 128, sub task states: 2, total size (bytes): 11488)
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream-sigma-panama-panama_h48_serverlog_testing (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to CANCELLING.
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING to CANCELING.
> 2018-09-13 18:00:51,419 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from CANCELING to CANCELED.
> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED while being CANCELED.
> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is being returned to the SlotPool.
> 2018-09-13 18:00:51,419 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from CANCELING to CANCELED.
> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED while being CANCELED.
> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is being returned to the SlotPool.
> 2018-09-13 18:00:51,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from CANCELING to CANCELED.
> 2018-09-13 18:00:51,420 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED while being CANCELED.
> 2018-09-13 18:00:51,420 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is being returned to the SlotPool.
> 2018-09-13 18:00:51,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from CANCELING to CANCELED.
> 2018-09-13 18:00:51,421 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED while being CANCELED.
> 2018-09-13 18:00:51,421 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is being returned to the SlotPool.
> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from CANCELING to CANCELED.
> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while being CANCELED.
> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is being returned to the SlotPool.
> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release multi task slot because all children have been released.
> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Adding returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available slots
> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from CANCELING to CANCELED.
> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while being CANCELED.
> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is being returned to the SlotPool.
> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream-sigma-panama-panama_h48_serverlog_testing (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to CANCELED.
> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e3317308798e962270b95102a1afe3b7.
> 2018-09-13 18:00:51,426 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Shutting down
> 2018-09-13 18:00:51,428 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher - Job e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED.
> 2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shut down and terminate YarnJobClusterEntrypoint with return code 1444 and application status CANCELED.
> 2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Stopping YarnJobClusterEntrypoint.
> 2018-09-13 18:00:51,769 INFO org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in CANCELED, diagnostics null.
> 2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2018-09-13 18:00:51,770 INFO org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status KILLED.
> 2018-09-13 18:00:51,775 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
> 2018-09-13 18:00:51,789 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
> 2018-09-13 18:00:51,789 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5eeea30d3582d0bfba2d0ec58f4da18c.
> org.apache.flink.util.FlinkException: JobManager is shutting down.
>     at org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
>     at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
>     at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>     at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>     at akka.actor.ActorCell.terminate(ActorCell.scala:374)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-09-13 18:00:51,789 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing is started.
> 2018-09-13 18:00:51,794 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
> 2018-09-13 18:00:52,890 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
> 2018-09-13 18:00:52,922 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x16324f962f69800 closed
> 2018-09-13 18:00:52,923 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x16324f962f69800
> 2018-09-13 18:00:52,923 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
> 2018-09-13 18:00:52,932 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
> 2018-09-13 18:00:52,935 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
> 2018-09-13 18:00:52,936 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
> 2018-09-13 18:00:52,960 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
>
> ```
>
> Best,
> Paul Lam