Hi Paul,

Your analysis is right. The JobManager does not wait for pending
operation results to be properly served before shutting down. See
https://issues.apache.org/jira/browse/FLINK-10309 for more details. I think
one way to solve it is to wait for some time if the RestServerEndpoint still
has responses to serve.
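
A minimal sketch of that idea, not the actual Flink code: the class name
PendingResponseTracker and its methods are hypothetical, and the sketch
assumes Java 9+ for CompletableFuture.completeOnTimeout. Handlers register
a response before serving it and deregister once it is written; shutdown
then waits until nothing is pending or a timeout elapses.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not part of Flink): tracks responses still being served. */
final class PendingResponseTracker {
    private final AtomicInteger pending = new AtomicInteger();
    private final CompletableFuture<Boolean> drained = new CompletableFuture<>();
    private volatile boolean closing;

    /** Call when a handler starts serving a response. */
    void register() {
        pending.incrementAndGet();
    }

    /** Call once the response has been fully written to the client. */
    void deregister() {
        if (pending.decrementAndGet() == 0 && closing) {
            drained.complete(true);
        }
    }

    /**
     * Starts the shutdown wait. The returned future completes with true once
     * no responses are pending, or with false if the timeout elapses first.
     */
    CompletableFuture<Boolean> awaitDrained(Duration timeout) {
        closing = true;
        if (pending.get() == 0) {
            drained.complete(true);
        }
        // Assumption: Java 9+; on Java 8 a scheduled task would be needed instead.
        return drained.completeOnTimeout(false, timeout.toMillis(), TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        PendingResponseTracker tracker = new PendingResponseTracker();
        tracker.register();                       // a response is in flight
        CompletableFuture<Boolean> done = tracker.awaitDrained(Duration.ofSeconds(5));
        tracker.deregister();                     // response written, shutdown may proceed
        System.out.println("all responses served: " + done.join());
    }
}
```

The endpoint would only close its server channel after the returned future
completes, so a slow client still gets its answer while a stuck one cannot
block the shutdown beyond the timeout.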

Cheers,
Till

On Fri, Sep 14, 2018 at 4:41 AM Paul Lam <paullin3...@gmail.com> wrote:

> Hi,
>
> I've been running into an issue where the client frontend sometimes fails
> to receive the cancel-with-savepoint response from the JobManager.
>
> I'm using Flink 1.5.3 on a YARN cluster. When I run the cancel-with-savepoint
> command (FsStateBackend backed by HDFS), it normally finishes in seconds,
> but sometimes it takes unexpectedly long and ends with
> `RetryException: Could not complete the operation. Number of retries has
> been exhausted.` However, the server-side logs look normal, the
> savepoint is stored correctly, and the job is canceled.
>
> I haven't looked into the code yet, but I suspect the response was lost
> while the JobManager went on shutting down, so the client's retries get no
> response and finally fail with a connection refused exception once the
> shutdown completes (the logs are at the end of this mail).
>
> Does this analysis make sense? And how should I debug or fix this problem?
> Any help would be appreciated. Thank you very much!
>
> ``` client side
> Commit Succeeded
>
> org.apache.flink.util.FlinkException: Could not cancel job
> e3317308798e962270b95102a1afe3b7.
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>         at
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at
> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
> Caused by: java.util.concurrent.ExecutionException:
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
> complete the operation. Number of retries has been exhausted.
>         at
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException:
> Could not complete the operation. Number of retries has been exhausted.
>         at
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         at
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException:
> java.net.ConnectException: Connection refused: 10.191.56.77:27597
>         at
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>         at
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>         at
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>         at
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 16 more
> Caused by: java.net.ConnectException: Connection refused:
> 10.191.56.77:27597
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>         at
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>         ... 7 more
> ```
>
> ``` server side (skipped some debug level logs)
> 2018-09-13 18:00:51,410 INFO
> org.apache.flink.runtime.jobmaster.JobMaster                  - Savepoint
> stored in
> hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5.
> Now cancelling e3317308798e962270b95102a1afe3b7.
> 2018-09-13 18:00:51,410 DEBUG
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint
> state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3,
> parallelism: 2, maxParallelism: 128, sub task states: 2, total size
> (bytes): 11488)
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> LogStream-sigma-panama-panama_h48_serverlog_testing
> (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to
> CANCELLING.
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from
> RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from
> RUNNING to CANCELING.
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING
> to CANCELING.
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING
> to CANCELING.
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
> LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING
> to CANCELING.
> 2018-09-13 18:00:51,411 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
> LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING
> to CANCELING.
> 2018-09-13 18:00:51,419 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,419 DEBUG
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
> transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED
> while being CANCELED.
> 2018-09-13 18:00:51,419 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is
> being returned to the SlotPool.
> 2018-09-13 18:00:51,419 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,419 DEBUG
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
> transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED
> while being CANCELED.
> 2018-09-13 18:00:51,419 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is
> being returned to the SlotPool.
> 2018-09-13 18:00:51,420 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,420 DEBUG
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
> transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED
> while being CANCELED.
> 2018-09-13 18:00:51,420 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is
> being returned to the SlotPool.
> 2018-09-13 18:00:51,420 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
> KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,421 DEBUG
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
> transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED
> while being CANCELED.
> 2018-09-13 18:00:51,421 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is
> being returned to the SlotPool.
> 2018-09-13 18:00:51,425 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
> LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,425 DEBUG
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
> transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap ->
> Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while
> being CANCELED.
> 2018-09-13 18:00:51,425 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is
> being returned to the SlotPool.
> 2018-09-13 18:00:51,425 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release
> multi task slot because all children have been released.
> 2018-09-13 18:00:51,425 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Adding
> returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available
> slots
> 2018-09-13 18:00:51,425 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
> LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,425 DEBUG
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
> transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap ->
> Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while
> being CANCELED.
> 2018-09-13 18:00:51,425 DEBUG
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
> slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is
> being returned to the SlotPool.
> 2018-09-13 18:00:51,425 INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
> LogStream-sigma-panama-panama_h48_serverlog_testing
> (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to
> CANCELED.
> 2018-09-13 18:00:51,425 INFO
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping
> checkpoint coordinator for job e3317308798e962270b95102a1afe3b7.
> 2018-09-13 18:00:51,426 INFO
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
> Shutting down
> 2018-09-13 18:00:51,428 INFO
> org.apache.flink.runtime.dispatcher.MiniDispatcher            - Job
> e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED.
> 2018-09-13 18:00:51,769 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shut down
> and terminate YarnJobClusterEntrypoint with return code 1444 and
> application status CANCELED.
> 2018-09-13 18:00:51,769 INFO
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Stopping
> YarnJobClusterEntrypoint.
> 2018-09-13 18:00:51,769 INFO  org.apache.flink.yarn.YarnResourceManager
>                  - Shut down cluster because application is in CANCELED,
> diagnostics null.
> 2018-09-13 18:00:51,769 INFO
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2018-09-13 18:00:51,770 INFO  org.apache.flink.yarn.YarnResourceManager
>                  - Unregister application from the YARN Resource Manager
> with final status KILLED.
> 2018-09-13 18:00:51,775 INFO
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Waiting for
> application to be successfully unregistered.
> 2018-09-13 18:00:51,789 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Suspending
> SlotPool.
> 2018-09-13 18:00:51,789 DEBUG
> org.apache.flink.runtime.jobmaster.JobMaster                  - Close
> ResourceManager connection 5eeea30d3582d0bfba2d0ec58f4da18c.
> org.apache.flink.util.FlinkException: JobManager is shutting down.
>     at
> org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365)
>     at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
>     at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
>     at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
>     at
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>     at
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>     at akka.actor.ActorCell.terminate(ActorCell.scala:374)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-09-13 18:00:51,789 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor                - The rpc
> endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been
> started yet. Discarding message
> org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing
> is started.
> 2018-09-13 18:00:51,794 INFO
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Stopping
> SlotPool.
> 2018-09-13 18:00:52,890 INFO
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
> - backgroundOperationsLoop exiting
> 2018-09-13 18:00:52,922 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
> Session: 0x16324f962f69800 closed
> 2018-09-13 18:00:52,923 INFO
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> EventThread shut down for session: 0x16324f962f69800
> 2018-09-13 18:00:52,923 INFO
> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
> down BLOB cache
> 2018-09-13 18:00:52,932 INFO
> org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping
> Akka RPC service.
> 2018-09-13 18:00:52,935 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting
> down remote daemon.
> 2018-09-13 18:00:52,936 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote
> daemon shut down; proceeding with flushing remote transports.
> 2018-09-13 18:00:52,960 INFO
> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
> shut down.
>
> ```
> Best,
> Paul Lam
