Hi Paul,

As far as writing the program goes, it is just a few lines of code:
    CompletableFuture<String> savepointPath = client.triggerSavepoint();
    savepointPath.get(); // block until the savepoint completes
    client.cancel();

Please note that this is just an example, not a real program, and the details may differ.

Thanks, vino.

On Fri, Sep 14, 2018 at 12:26 PM, Paul Lam <paullin3...@gmail.com> wrote:

> Hi vino,
>
> Thank you for the helpful information!
>
> One more question: are these operations supposed to run concurrently to
> ensure that the JobManager receives the cancel request before the savepoint
> is completed?
>
> Best,
> Paul Lam
>
> On Sep 14, 2018, at 11:48, vino yang <yanghua1...@gmail.com> wrote:
>
> Hi Paul,
>
> It does not affect anything. It simply splits two operations that can
> be chained together into two separate operations.
> The cancel operation will not be triggered until the savepoint operation
> has completed.
>
> Thanks, vino.
>
> On Fri, Sep 14, 2018 at 11:12 AM, Paul Lam <paullin3...@gmail.com> wrote:
>
>> Hi Devin,
>>
>> Thanks for the reply! It seems I missed an important thread.
>>
>> @vino mentioned a solution that splits the cancel-with-savepoint
>> operation into two separate operations, and I wonder whether it breaks the
>> end-to-end exactly-once semantics in the case of an at-least-once sink.
>> Thanks a lot!
>>
>> Best,
>> Paul Lam
>>
>> On Sep 14, 2018, at 10:49, devinduan(段丁瑞) <devind...@tencent.com> wrote:
>>
>> Hi, Paul
>> https://issues.apache.org/jira/browse/FLINK-10309
>>
>> From: Paul Lam <paullin3...@gmail.com>
>> Sent: 2018-09-14 10:41
>> To: user <user@flink.apache.org>
>> Subject: Client failed to get cancel with savepoint response(Internet mail)
>>
>> Hi,
>>
>> I've been experiencing an issue where the client frontend is sometimes
>> apparently unable to get cancel-with-savepoint responses from the JobManager.
>>
>> I'm using Flink 1.5.3 on a YARN cluster. When I execute the
>> cancel-with-savepoint command (FsStateBackend backed by HDFS), it normally
>> finishes in seconds, but sometimes it takes unexpectedly long and ends with
>> `RetryException: Could not complete the operation. Number of retries has
>> been exhausted.` However, the server-side logs look normal, the
>> savepoint is stored fine, and the job is also canceled.
>>
>> I haven't looked into the code yet, but I suspect the response was lost
>> while the JobManager was still shutting down, so when the client retries it
>> can't get any response, and it finally gets a connection refused exception
>> after the shutdown completes (the logs are at the end of this mail).
>>
>> Does my reasoning make sense? And how should I debug or fix this problem?
>> Any help would be appreciated, thank you very much!
>>
>> ``` client side
>> Commit Succeeded
>>
>> org.apache.flink.util.FlinkException: Could not cancel job e3317308798e962270b95102a1afe3b7.
>>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>>     at org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>>     at org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>>     at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>     at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>> Caused by: java.util.concurrent.ExecutionException: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>>     at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>     at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>     at org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>>     ... 9 more
>> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not complete the operation. Number of retries has been exhausted.
>>     at org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>     at org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>     at org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException: java.net.ConnectException: Connection refused: 10.191.56.77:27597
>>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>     ... 16 more
>> Caused by: java.net.ConnectException: Connection refused: 10.191.56.77:27597
>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>>     at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>>     ... 7 more
>> ```
>>
>> ``` server side (skipped some debug level logs)
>> 2018-09-13 18:00:51,410 INFO org.apache.flink.runtime.jobmaster.JobMaster - Savepoint stored in hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5. Now cancelling e3317308798e962270b95102a1afe3b7.
>> 2018-09-13 18:00:51,410 DEBUG org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3, parallelism: 2, maxParallelism: 128, sub task states: 2, total size (bytes): 11488)
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream-sigma-panama-panama_h48_serverlog_testing (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to CANCELLING.
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING to CANCELING.
>> 2018-09-13 18:00:51,419 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from CANCELING to CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED while being CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is being returned to the SlotPool.
>> 2018-09-13 18:00:51,419 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from CANCELING to CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED while being CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is being returned to the SlotPool.
>> 2018-09-13 18:00:51,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from CANCELING to CANCELED.
>> 2018-09-13 18:00:51,420 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED while being CANCELED.
>> 2018-09-13 18:00:51,420 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is being returned to the SlotPool.
>> 2018-09-13 18:00:51,420 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from CANCELING to CANCELED.
>> 2018-09-13 18:00:51,421 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED while being CANCELED.
>> 2018-09-13 18:00:51,421 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is being returned to the SlotPool.
>> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from CANCELING to CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while being CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is being returned to the SlotPool.
>> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release multi task slot because all children have been released.
>> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Adding returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available slots
>> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from CANCELING to CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.executiongraph.ExecutionGraph - Ignoring transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while being CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Releasing slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is being returned to the SlotPool.
>> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job LogStream-sigma-panama-panama_h48_serverlog_testing (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to CANCELED.
>> 2018-09-13 18:00:51,425 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job e3317308798e962270b95102a1afe3b7.
>> 2018-09-13 18:00:51,426 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Shutting down
>> 2018-09-13 18:00:51,428 INFO org.apache.flink.runtime.dispatcher.MiniDispatcher - Job e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED.
>> 2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Shut down and terminate YarnJobClusterEntrypoint with return code 1444 and application status CANCELED.
>> 2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Stopping YarnJobClusterEntrypoint.
>> 2018-09-13 18:00:51,769 INFO org.apache.flink.yarn.YarnResourceManager - Shut down cluster because application is in CANCELED, diagnostics null.
>> 2018-09-13 18:00:51,769 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> 2018-09-13 18:00:51,770 INFO org.apache.flink.yarn.YarnResourceManager - Unregister application from the YARN Resource Manager with final status KILLED.
>> 2018-09-13 18:00:51,775 INFO org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl - Waiting for application to be successfully unregistered.
>> 2018-09-13 18:00:51,789 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Suspending SlotPool.
>> 2018-09-13 18:00:51,789 DEBUG org.apache.flink.runtime.jobmaster.JobMaster - Close ResourceManager connection 5eeea30d3582d0bfba2d0ec58f4da18c.
>> org.apache.flink.util.FlinkException: JobManager is shutting down.
>>     at org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365)
>>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
>>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
>>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
>>     at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
>>     at akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>     at akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>     at akka.actor.ActorCell.terminate(ActorCell.scala:374)
>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2018-09-13 18:00:51,789 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcActor - The rpc endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been started yet. Discarding message org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing is started.
>> 2018-09-13 18:00:51,794 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool - Stopping SlotPool.
>> 2018-09-13 18:00:52,890 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl - backgroundOperationsLoop exiting
>> 2018-09-13 18:00:52,922 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper - Session: 0x16324f962f69800 closed
>> 2018-09-13 18:00:52,923 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x16324f962f69800
>> 2018-09-13 18:00:52,923 INFO org.apache.flink.runtime.blob.TransientBlobCache - Shutting down BLOB cache
>> 2018-09-13 18:00:52,932 INFO org.apache.flink.runtime.rpc.akka.AkkaRpcService - Stopping Akka RPC service.
>> 2018-09-13 18:00:52,935 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Shutting down remote daemon.
>> 2018-09-13 18:00:52,936 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remote daemon shut down; proceeding with flushing remote transports.
>> 2018-09-13 18:00:52,960 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator - Remoting shut down.
>>
>> ```
>> Best,
>> Paul Lam
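
The two-step approach described at the top of this thread (trigger the savepoint, block until it completes, then cancel) can be sketched in plain Java. This is only an illustration of the call ordering, not working Flink code: `JobClient` and `RecordingClient` are hypothetical stand-ins rather than Flink classes, and the savepoint path is made up. A real client would also need a job ID and a target directory, which are left out here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SavepointThenCancel {

    /** Hypothetical stand-in for the two client calls mentioned in the thread. */
    interface JobClient {
        CompletableFuture<String> triggerSavepoint();
        void cancel();
    }

    /** Triggers a savepoint, blocks until it completes, and only then cancels the job. */
    static String savepointThenCancel(JobClient client) throws Exception {
        CompletableFuture<String> savepointPath = client.triggerSavepoint();
        String path = savepointPath.get(); // block until the savepoint has completed
        client.cancel();                   // issued only after get() has returned
        return path;
    }

    /** In-memory fake that records the order in which things happen. */
    static class RecordingClient implements JobClient {
        final List<String> calls = new ArrayList<>();

        @Override
        public CompletableFuture<String> triggerSavepoint() {
            // Completes asynchronously, like a remote savepoint would.
            return CompletableFuture.supplyAsync(() -> {
                calls.add("savepoint-completed");
                return "hdfs:///hypothetical/savepoints/savepoint-1"; // made-up path
            });
        }

        @Override
        public void cancel() {
            calls.add("cancel");
        }
    }

    public static void main(String[] args) throws Exception {
        RecordingClient client = new RecordingClient();
        String path = savepointThenCancel(client);
        System.out.println(path + " " + client.calls);
    }
}
```

Because `get()` blocks, the cancel request cannot be sent before the savepoint future completes, which is the ordering vino describes in his reply to the question about running the two operations concurrently.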