Hi Paul,

As far as writing programs are they are a few lines of code:

*CompletableFuture<String> savepointPath = client.triggerSavepoint();*
*savepointPath.get(); //block until the savepoint completed*
*client.cancel();*


*Please note that this is just an example, not a real program, and there
may be deviations.*

Thanks, vino.

Paul Lam <paullin3...@gmail.com> 于2018年9月14日周五 下午12:26写道:

> Hi vino,
>
> Thank you for the helpful information!
>
> One more question, are these operations supposed to run concurrently to
> ensure JobManager receives the cancel request before the savepoint is
> completed?
>
> Best,
> Paul lam
>
> 在 2018年9月14日,11:48,vino yang <yanghua1...@gmail.com> 写道:
>
> Hi Paul,
>
> It does not affect anything. It simply splits the two operations that can
> be connected together into two separate operations.
> The cancel operation will not be triggered until the savepoint operation
> is completed.
>
> Thanks, vino.
>
> Paul Lam <paullin3...@gmail.com> 于2018年9月14日周五 上午11:12写道:
>
>> Hi Devin,
>>
>> Thanks for the reply! It seems like I missed an important thread.
>>
>> @vino mentioned a solution that is splitting the cancel-with-savepoint
>> operation into two separated operations, and I wonder if it breaks the end
>> to end exactly-once semantics in case of a at-least-once sink? Thanks a lot!
>>
>> Best,
>> Paul Lam
>>
>>
>> 在 2018年9月14日,10:49,devinduan(段丁瑞) <devind...@tencent.com> 写道:
>>
>> Hi, Paul
>>    https://issues.apache.org/jira/browse/FLINK-10309
>>
>>
>> *发件人:* Paul Lam <paullin3...@gmail.com>
>> *发送时间:* 2018-09-14 10:41
>> *收件人:* user <user@flink.apache.org>
>> *主题:* Client failed to get cancel with savepoint response(Internet mail)
>> Hi,
>>
>> I've been experiencing an issue that the client frontend is likely unable
>> to get cancel-with-savepoint responses from JobManager sometimes.
>>
>>  I’m using Flink 1.5.3 on YARN cluster, and when I execute cancel with
>> savepoint command (FsStataBackend backed by HDFS), normally it finishes in
>> seconds, but sometimes it takes unexpectedly long and ends up with`
>> RetryException: Could not complete the operation. Number of retries has
>> been exhausted.` However, the server side logs are just like normal, the
>> savepoint is stored fine, and the job is also canceled.
>>
>> Haven’t looked into the codes yet, but I suspect the response was lost
>> but the JobManager was still shutting down, so when the client retries it
>> can’t get any response, and finally gets connection refused exception after
>> the shutdown completes (the logs are at the end of the mail).
>>
>> Does my opinion make sense? And how should I debug or fix this problem?
>> Any help would be appreciated, thank you very much!
>>
>> ``` client side
>> Commit Succeeded
>>
>> org.apache.flink.util.FlinkException: Could not cancel job
>> e3317308798e962270b95102a1afe3b7.
>>         at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>>         at
>> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>>         at
>> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>>         at
>> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>>         at
>> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>>         at java.security.AccessController.doPrivileged(Native Method)
>>         at javax.security.auth.Subject.doAs(Subject.java:422)
>>         at
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>>         at
>> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>         at
>> org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
>> Caused by: java.util.concurrent.ExecutionException:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>>         at
>> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>>         at
>> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>>         at
>> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>>         at
>> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>>         ... 9 more
>> Caused by:
>> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not
>> complete the operation. Number of retries has been exhausted.
>>         at
>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>>         at
>> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>>         at
>> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>>         at
>> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>>         at
>> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>>         at
>> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>>         at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.util.concurrent.CompletionException:
>> java.net.ConnectException: Connection refused: 10.191.56.77:27597
>>         at
>> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>>         at
>> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>>         at
>> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>>         at
>> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>>         ... 16 more
>> Caused by: java.net.ConnectException: Connection refused:
>> 10.191.56.77:27597
>>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>         at
>> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>>         at
>> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>>         ... 7 more
>> ```
>>
>> ``` server side (skipped some debug level logs)
>> 2018-09-13 18:00:51,410 INFO
>> org.apache.flink.runtime.jobmaster.JobMaster                  - Savepoint
>> stored in
>> hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5.
>> Now cancelling e3317308798e962270b95102a1afe3b7.
>> 2018-09-13 18:00:51,410 DEBUG
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint
>> state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3,
>> parallelism: 2, maxParallelism: 128, sub task states: 2, total size
>> (bytes): 11488)
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>> LogStream-sigma-panama-panama_h48_serverlog_testing
>> (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to
>> CANCELLING.
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from
>> RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from
>> RUNNING to CANCELING.
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING
>> to CANCELING.
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING
>> to CANCELING.
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
>> LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING
>> to CANCELING.
>> 2018-09-13 18:00:51,411 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
>> LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING
>> to CANCELING.
>> 2018-09-13 18:00:51,419 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from
>> CANCELING to CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
>> transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED
>> while being CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is
>> being returned to the SlotPool.
>> 2018-09-13 18:00:51,419 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from
>> CANCELING to CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
>> transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED
>> while being CANCELED.
>> 2018-09-13 18:00:51,419 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is
>> being returned to the SlotPool.
>> 2018-09-13 18:00:51,420 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from
>> CANCELING to CANCELED.
>> 2018-09-13 18:00:51,420 DEBUG
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
>> transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED
>> while being CANCELED.
>> 2018-09-13 18:00:51,420 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is
>> being returned to the SlotPool.
>> 2018-09-13 18:00:51,420 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source:
>> KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from
>> CANCELING to CANCELED.
>> 2018-09-13 18:00:51,421 DEBUG
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
>> transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED
>> while being CANCELED.
>> 2018-09-13 18:00:51,421 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is
>> being returned to the SlotPool.
>> 2018-09-13 18:00:51,425 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
>> LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from
>> CANCELING to CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
>> transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap ->
>> Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while
>> being CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is
>> being returned to the SlotPool.
>> 2018-09-13 18:00:51,425 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release
>> multi task slot because all children have been released.
>> 2018-09-13 18:00:51,425 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Adding
>> returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available
>> slots
>> 2018-09-13 18:00:51,425 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        -
>> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink:
>> LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from
>> CANCELING to CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring
>> transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap ->
>> Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while
>> being CANCELED.
>> 2018-09-13 18:00:51,425 DEBUG
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing
>> slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is
>> being returned to the SlotPool.
>> 2018-09-13 18:00:51,425 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>> LogStream-sigma-panama-panama_h48_serverlog_testing
>> (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to
>> CANCELED.
>> 2018-09-13 18:00:51,425 INFO
>> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping
>> checkpoint coordinator for job e3317308798e962270b95102a1afe3b7.
>> 2018-09-13 18:00:51,426 INFO
>> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  -
>> Shutting down
>> 2018-09-13 18:00:51,428 INFO
>> org.apache.flink.runtime.dispatcher.MiniDispatcher            - Job
>> e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED.
>> 2018-09-13 18:00:51,769 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shut down
>> and terminate YarnJobClusterEntrypoint with return code 1444 and
>> application status CANCELED.
>> 2018-09-13 18:00:51,769 INFO
>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Stopping
>> YarnJobClusterEntrypoint.
>> 2018-09-13 18:00:51,769 INFO
>> org.apache.flink.yarn.YarnResourceManager                     - Shut down
>> cluster because application is in CANCELED, diagnostics null.
>> 2018-09-13 18:00:51,769 INFO
>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>> 2018-09-13 18:00:51,770 INFO
>> org.apache.flink.yarn.YarnResourceManager                     - Unregister
>> application from the YARN Resource Manager with final status KILLED.
>> 2018-09-13 18:00:51,775 INFO
>> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Waiting for
>> application to be successfully unregistered.
>> 2018-09-13 18:00:51,789 INFO
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Suspending
>> SlotPool.
>> 2018-09-13 18:00:51,789 DEBUG
>> org.apache.flink.runtime.jobmaster.JobMaster                  - Close
>> ResourceManager connection 5eeea30d3582d0bfba2d0ec58f4da18c.
>> org.apache.flink.util.FlinkException: JobManager is shutting down.
>>     at
>> org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365)
>>     at
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
>>     at
>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
>>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
>>     at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
>>     at
>> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>>     at
>> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>>     at akka.actor.ActorCell.terminate(ActorCell.scala:374)
>>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
>>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
>>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>     at
>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at
>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at
>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at
>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>> 2018-09-13 18:00:51,789 INFO
>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor                - The rpc
>> endpoint org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been
>> started yet. Discarding message
>> org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing
>> is started.
>> 2018-09-13 18:00:51,794 INFO
>> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Stopping
>> SlotPool.
>> 2018-09-13 18:00:52,890 INFO
>> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>> - backgroundOperationsLoop exiting
>> 2018-09-13 18:00:52,922 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  -
>> Session: 0x16324f962f69800 closed
>> 2018-09-13 18:00:52,923 INFO
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
>> EventThread shut down for session: 0x16324f962f69800
>> 2018-09-13 18:00:52,923 INFO
>> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting
>> down BLOB cache
>> 2018-09-13 18:00:52,932 INFO
>> org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping
>> Akka RPC service.
>> 2018-09-13 18:00:52,935 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting
>> down remote daemon.
>> 2018-09-13 18:00:52,936 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote
>> daemon shut down; proceeding with flushing remote transports.
>> 2018-09-13 18:00:52,960 INFO
>> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting
>> shut down.
>>
>> ```
>> Best,
>> Paul Lam
>>
>>
>>
>

Reply via email to