Hi Paul,

It does not affect anything. It simply splits two operations that were chained together into two separate operations: the savepoint first, then the cancel. The cancel operation is not triggered until the savepoint operation has completed.
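As a rough illustration of the two separate operations described above, here is a minimal Python sketch that drives the Flink CLI in two steps. It assumes the `flink` executable is on the PATH, that `flink savepoint` returns only after the savepoint has completed, and that `-yid` is accepted in the positions shown. The helper names (`savepoint_cmd`, `cancel_cmd`, `cancel_after_savepoint`) are hypothetical and not part of Flink.

```python
import subprocess
from typing import Callable, List, Optional, Sequence

# A runner executes one command and raises an exception on failure.
Runner = Callable[[Sequence[str]], None]


def run_checked(cmd: Sequence[str]) -> None:
    """Run a command; raises CalledProcessError on a non-zero exit code."""
    subprocess.run(list(cmd), check=True)


def savepoint_cmd(job_id: str,
                  target_dir: Optional[str] = None,
                  yarn_app_id: Optional[str] = None) -> List[str]:
    """Build the `flink savepoint` command line (step 1)."""
    cmd = ["flink", "savepoint", job_id]
    if target_dir:
        cmd.append(target_dir)
    if yarn_app_id:
        # Assumption: the YARN application id is passed with -yid.
        cmd += ["-yid", yarn_app_id]
    return cmd


def cancel_cmd(job_id: str, yarn_app_id: Optional[str] = None) -> List[str]:
    """Build the plain `flink cancel` command line (step 2, no -s option)."""
    cmd = ["flink", "cancel"]
    if yarn_app_id:
        cmd += ["-yid", yarn_app_id]
    cmd.append(job_id)
    return cmd


def cancel_after_savepoint(job_id: str,
                           target_dir: Optional[str] = None,
                           yarn_app_id: Optional[str] = None,
                           run: Runner = run_checked) -> None:
    """Take a savepoint, then cancel the job as a separate operation.

    If the savepoint step raises, the cancel step is never issued.
    """
    run(savepoint_cmd(job_id, target_dir, yarn_app_id))
    run(cancel_cmd(job_id, yarn_app_id))
```

Calling `cancel_after_savepoint("<jobId>", "hdfs:///path/to/savepoints")` would run the two commands in order. Because the cancel is a separate request, a lost response to it does not hide whether the savepoint itself succeeded.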
Thanks, vino.

Paul Lam <paullin3...@gmail.com> wrote on Fri, Sep 14, 2018 at 11:12 AM:

> Hi Devin,
>
> Thanks for the reply! It seems like I missed an important thread.
>
> @vino mentioned a solution that splits the cancel-with-savepoint operation into two separate operations, and I wonder if it breaks the end-to-end exactly-once semantics in the case of an at-least-once sink? Thanks a lot!
>
> Best,
> Paul Lam
>
>
> On Sep 14, 2018, at 10:49, devinduan(段丁瑞) <devind...@tencent.com> wrote:
>
> Hi, Paul
> https://issues.apache.org/jira/browse/FLINK-10309
>
>
> *From:* Paul Lam <paullin3...@gmail.com>
> *Sent:* 2018-09-14 10:41
> *To:* user <user@flink.apache.org>
> *Subject:* Client failed to get cancel with savepoint response(Internet mail)
> Hi,
>
> I've been experiencing an issue where the client frontend is sometimes unable to get the cancel-with-savepoint response from the JobManager.
>
> I'm using Flink 1.5.3 on a YARN cluster. When I execute the cancel-with-savepoint command (FsStateBackend backed by HDFS), it normally finishes in seconds, but sometimes it takes unexpectedly long and ends up with `RetryException: Could not complete the operation. Number of retries has been exhausted.` However, the server-side logs look normal, the savepoint is stored fine, and the job is also canceled.
>
> I haven't looked into the code yet, but I suspect the response was lost while the JobManager went on shutting down, so when the client retries it can't get any response, and finally gets a connection refused exception after the shutdown completes (the logs are at the end of the mail).
>
> Does my theory make sense? And how should I debug or fix this problem?
> Any help would be appreciated, thank you very much!
>
> ``` client side
> Commit Succeeded
>
> org.apache.flink.util.FlinkException: Could not cancel job e3317308798e962270b95102a1afe3b7.
> at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585) > at > org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960) > at > org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577) > at > org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034) > at > org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101) > at java.security.AccessController.doPrivileged(Native Method) > at javax.security.auth.Subject.doAs(Subject.java:422) > at > org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) > at > org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) > at > org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101) > Caused by: java.util.concurrent.ExecutionException: > org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not > complete the operation. Number of retries has been exhausted. > at > java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357) > at > java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895) > at > org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398) > at > org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583) > ... 9 more > Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: > Could not complete the operation. Number of retries has been exhausted. 
> at > org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213) > at > java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760) > at > java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736) > at > java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474) > at > java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977) > at > org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354) > at > org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) > at > 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137) > at java.lang.Thread.run(Thread.java:748) > Caused by: java.util.concurrent.CompletionException: > java.net.ConnectException: Connection refused: 10.191.56.77:27597 > at > java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) > at > java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) > at > java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943) > at > java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926) > ... 16 more > Caused by: java.net.ConnectException: Connection refused: > 10.191.56.77:27597 > at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) > at > sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) > at > org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224) > at > org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281) > ... 7 more > ``` > > ``` server side (skipped some debug level logs) > 2018-09-13 18:00:51,410 INFO > org.apache.flink.runtime.jobmaster.JobMaster - Savepoint > stored in > hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5. > Now cancelling e3317308798e962270b95102a1afe3b7. 
> 2018-09-13 18:00:51,410 DEBUG > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3, > parallelism: 2, maxParallelism: 128, sub task states: 2, total size > (bytes): 11488) > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job > LogStream-sigma-panama-panama_h48_serverlog_testing > (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to > CANCELLING. > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from > RUNNING to CANCELING. > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from > RUNNING to CANCELING. > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING > to CANCELING. > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING > to CANCELING. > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: > LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING > to CANCELING. > 2018-09-13 18:00:51,411 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: > LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING > to CANCELING. > 2018-09-13 18:00:51,419 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from > CANCELING to CANCELED. 
> 2018-09-13 18:00:51,419 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring > transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED > while being CANCELED. > 2018-09-13 18:00:51,419 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is > being returned to the SlotPool. > 2018-09-13 18:00:51,419 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from > CANCELING to CANCELED. > 2018-09-13 18:00:51,419 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring > transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED > while being CANCELED. > 2018-09-13 18:00:51,419 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is > being returned to the SlotPool. > 2018-09-13 18:00:51,420 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from > CANCELING to CANCELED. > 2018-09-13 18:00:51,420 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring > transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED > while being CANCELED. > 2018-09-13 18:00:51,420 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is > being returned to the SlotPool. > 2018-09-13 18:00:51,420 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: > KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from > CANCELING to CANCELED. > 2018-09-13 18:00:51,421 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring > transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED > while being CANCELED. 
> 2018-09-13 18:00:51,421 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is > being returned to the SlotPool. > 2018-09-13 18:00:51,425 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: > LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from > CANCELING to CANCELED. > 2018-09-13 18:00:51,425 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring > transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> > Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while > being CANCELED. > 2018-09-13 18:00:51,425 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is > being returned to the SlotPool. > 2018-09-13 18:00:51,425 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release > multi task slot because all children have been released. > 2018-09-13 18:00:51,425 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Adding > returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available > slots > 2018-09-13 18:00:51,425 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - > LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: > LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from > CANCELING to CANCELED. > 2018-09-13 18:00:51,425 DEBUG > org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring > transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> > Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while > being CANCELED. 
> 2018-09-13 18:00:51,425 DEBUG > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing > slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is > being returned to the SlotPool. > 2018-09-13 18:00:51,425 INFO > org.apache.flink.runtime.executiongraph.ExecutionGraph - Job > LogStream-sigma-panama-panama_h48_serverlog_testing > (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to > CANCELED. > 2018-09-13 18:00:51,425 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping > checkpoint coordinator for job e3317308798e962270b95102a1afe3b7. > 2018-09-13 18:00:51,426 INFO > org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - > Shutting down > 2018-09-13 18:00:51,428 INFO > org.apache.flink.runtime.dispatcher.MiniDispatcher - Job > e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED. > 2018-09-13 18:00:51,769 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shut down > and terminate YarnJobClusterEntrypoint with return code 1444 and > application status CANCELED. > 2018-09-13 18:00:51,769 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Stopping > YarnJobClusterEntrypoint. > 2018-09-13 18:00:51,769 INFO > org.apache.flink.yarn.YarnResourceManager - Shut down > cluster because application is in CANCELED, diagnostics null. > 2018-09-13 18:00:51,769 INFO > org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - > Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock. > 2018-09-13 18:00:51,770 INFO > org.apache.flink.yarn.YarnResourceManager - Unregister > application from the YARN Resource Manager with final status KILLED. > 2018-09-13 18:00:51,775 INFO > org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for > application to be successfully unregistered. > 2018-09-13 18:00:51,789 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending > SlotPool. 
> 2018-09-13 18:00:51,789 DEBUG > org.apache.flink.runtime.jobmaster.JobMaster - Close > ResourceManager connection 5eeea30d3582d0bfba2d0ec58f4da18c. > org.apache.flink.util.FlinkException: JobManager is shutting down. > at > org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365) > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105) > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40) > at akka.actor.Actor$class.aroundPostStop(Actor.scala:515) > at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95) > at > akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210) > at > akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172) > at akka.actor.ActorCell.terminate(ActorCell.scala:374) > at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467) > at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483) > at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282) > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260) > at akka.dispatch.Mailbox.run(Mailbox.scala:224) > at akka.dispatch.Mailbox.exec(Mailbox.scala:234) > at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > at > scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > at > scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > at > scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > 2018-09-13 18:00:51,789 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcActor - The rpc > endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been > started yet. Discarding message > org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing > is started. > 2018-09-13 18:00:51,794 INFO > org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping > SlotPool. 
> 2018-09-13 18:00:52,890 INFO > org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl > - backgroundOperationsLoop exiting > 2018-09-13 18:00:52,922 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - > Session: 0x16324f962f69800 closed > 2018-09-13 18:00:52,923 INFO > org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - > EventThread shut down for session: 0x16324f962f69800 > 2018-09-13 18:00:52,923 INFO > org.apache.flink.runtime.blob.TransientBlobCache - Shutting > down BLOB cache > 2018-09-13 18:00:52,932 INFO > org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping > Akka RPC service. > 2018-09-13 18:00:52,935 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting > down remote daemon. > 2018-09-13 18:00:52,936 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote > daemon shut down; proceeding with flushing remote transports. > 2018-09-13 18:00:52,960 INFO > akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting > shut down. > > ``` > Best, > Paul Lam > > >