Hi Devin, 

Thanks for the reply! It seems like I missed an important thread. 

@vino mentioned a solution that is splitting the cancel-with-savepoint 
operation into two separated operations, and I wonder if it breaks the end to 
end exactly-once semantics in case of a at-least-once sink? Thanks a lot!

Best,
Paul Lam


> 在 2018年9月14日,10:49,devinduan(段丁瑞) <devind...@tencent.com> 写道:
> 
> Hi, Paul
>    https://issues.apache.org/jira/browse/FLINK-10309 
> <https://issues.apache.org/jira/browse/FLINK-10309>
> 
>  
> 发件人: Paul Lam <mailto:paullin3...@gmail.com>
> 发送时间: 2018-09-14 10:41
> 收件人: user <mailto:user@flink.apache.org>
> 主题: Client failed to get cancel with savepoint response(Internet mail)
> Hi,
> 
> I've been experiencing an issue that the client frontend is likely unable to 
> get cancel-with-savepoint responses from JobManager sometimes.
> 
>  I’m using Flink 1.5.3 on YARN cluster, and when I execute cancel with 
> savepoint command (FsStataBackend backed by HDFS), normally it finishes in 
> seconds, but sometimes it takes unexpectedly long and ends up with` 
> RetryException: Could not complete the operation. Number of retries has been 
> exhausted.` However, the server side logs are just like normal, the savepoint 
> is stored fine, and the job is also canceled.
> 
> Haven’t looked into the codes yet, but I suspect the response was lost but 
> the JobManager was still shutting down, so when the client retries it can’t 
> get any response, and finally gets connection refused exception after the 
> shutdown completes (the logs are at the end of the mail).
> 
> Does my opinion make sense? And how should I debug or fix this problem? Any 
> help would be appreciated, thank you very much!
> 
> ``` client side
> Commit Succeeded
> 
> org.apache.flink.util.FlinkException: Could not cancel job 
> e3317308798e962270b95102a1afe3b7.
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:585)
>         at 
> org.apache.flink.client.cli.CliFrontend.runClusterAction(CliFrontend.java:960)
>         at 
> org.apache.flink.client.cli.CliFrontend.cancel(CliFrontend.java:577)
>         at 
> org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1034)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$main$9(CliFrontend.java:1101)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:422)
>         at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)
>         at 
> org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>         at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1101)
> Caused by: java.util.concurrent.ExecutionException: 
> org.apache.flink.runtime.concurrent.FutureUtils$RetryException: Could not 
> complete the operation. Number of retries has been exhausted.
>         at 
> java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
>         at 
> java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1895)
>         at 
> org.apache.flink.client.program.rest.RestClusterClient.cancelWithSavepoint(RestClusterClient.java:398)
>         at 
> org.apache.flink.client.cli.CliFrontend.lambda$cancel$4(CliFrontend.java:583)
>         ... 9 more
> Caused by: org.apache.flink.runtime.concurrent.FutureUtils$RetryException: 
> Could not complete the operation. Number of retries has been exhausted.
>         at 
> org.apache.flink.runtime.concurrent.FutureUtils.lambda$retryOperationWithDelay$5(FutureUtils.java:213)
>         at 
> java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:760)
>         at 
> java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:736)
>         at 
> java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:474)
>         at 
> java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1977)
>         at 
> org.apache.flink.runtime.rest.RestClient.lambda$submitRequest$1(RestClient.java:274)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners0(DefaultPromise.java:603)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:563)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.fulfillConnectPromise(AbstractNioChannel.java:268)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:284)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:528)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
>         at 
> org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
>         at java.lang.Thread.run(Thread.java:748)
> Caused by: java.util.concurrent.CompletionException: 
> java.net.ConnectException: Connection refused: 10.191.56.77:27597
>         at 
> java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292)
>         at 
> java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308)
>         at 
> java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:943)
>         at 
> java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:926)
>         ... 16 more
> Caused by: java.net.ConnectException: Connection refused: 10.191.56.77:27597
>         at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>         at 
> sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:224)
>         at 
> org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:281)
>         ... 7 more
> ```
> 
> ``` server side (skipped some debug level logs)
> 2018-09-13 18:00:51,410 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>               - Savepoint stored in 
> hdfs://horton/flink/horton/savepoints/e3317308798e962270b95102a1afe3b7/be7e952cff5344658dab4a8376d64742/savepoint-e33173-62bbcb9eb8a5.
>  Now cancelling e3317308798e962270b95102a1afe3b7.
> 2018-09-13 18:00:51,410 DEBUG 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Checkpoint 
> state: OperatorState(operatorID: feca28aff5a3958840bee985ee7de4d3, 
> parallelism: 2, maxParallelism: 128, sub task states: 2, total size (bytes): 
> 11488)
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job 
> LogStream-sigma-panama-panama_h48_serverlog_testing 
> (e3317308798e962270b95102a1afe3b7) switched from state RUNNING to CANCELLING.
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from RUNNING 
> to CANCELING.
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from RUNNING 
> to CANCELING.
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from RUNNING to 
> CANCELING.
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from RUNNING to 
> CANCELING.
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
> LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from RUNNING 
> to CANCELING.
> 2018-09-13 18:00:51,411 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
> LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from RUNNING 
> to CANCELING.
> 2018-09-13 18:00:51,419 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> Custom Source (2/2) (d7b7be0353d6db13ccc01545a81e49f8) switched from 
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,419 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring 
> transition of vertex Source: Custom Source (2/2) - execution #1 to FAILED 
> while being CANCELED.
> 2018-09-13 18:00:51,419 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{bc044e1b2a4f4560517001551b164afc}] because: Slot is being 
> returned to the SlotPool.
> 2018-09-13 18:00:51,419 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> KafkaSource (1/2) (3fdfeec06eab17e166939ad6b2c8de79) switched from CANCELING 
> to CANCELED.
> 2018-09-13 18:00:51,419 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring 
> transition of vertex Source: KafkaSource (1/2) - execution #1 to FAILED while 
> being CANCELED.
> 2018-09-13 18:00:51,419 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{d992569e383e1d991c7b4434510e1bde}] because: Slot is being 
> returned to the SlotPool.
> 2018-09-13 18:00:51,420 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> Custom Source (1/2) (7e8374fe5fb448a9efe2ada838ec89ef) switched from 
> CANCELING to CANCELED.
> 2018-09-13 18:00:51,420 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring 
> transition of vertex Source: Custom Source (1/2) - execution #1 to FAILED 
> while being CANCELED.
> 2018-09-13 18:00:51,420 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{76ec5acfc56cdfccb7af720fc31af31a}] because: Slot is being 
> returned to the SlotPool.
> 2018-09-13 18:00:51,420 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Source: 
> KafkaSource (2/2) (a0fc76872049e7bbcbf97fd27661ab24) switched from CANCELING 
> to CANCELED.
> 2018-09-13 18:00:51,421 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring 
> transition of vertex Source: KafkaSource (2/2) - execution #1 to FAILED while 
> being CANCELED.
> 2018-09-13 18:00:51,421 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{176fa66fe26049ead6813fcb313ac7e9}] because: Slot is being 
> returned to the SlotPool.
> 2018-09-13 18:00:51,425 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
> LogKafkaSink (1/2) (83845fb6e8f19f8ca81a56ab70ea3093) switched from CANCELING 
> to CANCELED.
> 2018-09-13 18:00:51,425 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring 
> transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> 
> Log2JsonMap -> Sink: LogKafkaSink (1/2) - execution #1 to FAILED while being 
> CANCELED.
> 2018-09-13 18:00:51,425 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{4ee2fdd667af29402f86ef0afa090e25}] because: Slot is being 
> returned to the SlotPool.
> 2018-09-13 18:00:51,425 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{301a0fd32ef0f4e0001973cc96f05526}] because: Release multi 
> task slot because all children have been released.
> 2018-09-13 18:00:51,425 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Adding 
> returned slot [AllocationID{e64791bd80fb2c37b9ee1448466330e1}] to available 
> slots
> 2018-09-13 18:00:51,425 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - 
> LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> Log2JsonMap -> Sink: 
> LogKafkaSink (2/2) (dc3dc6a9cb3177960a98ae57beeaab79) switched from CANCELING 
> to CANCELED.
> 2018-09-13 18:00:51,425 DEBUG 
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Ignoring 
> transition of vertex LogAndConfigCoMap -> Filter -> FieldRegexFlatMap -> 
> Log2JsonMap -> Sink: LogKafkaSink (2/2) - execution #1 to FAILED while being 
> CANCELED.
> 2018-09-13 18:00:51,425 DEBUG 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Releasing 
> slot [SlotRequestId{7c3e3b9fb6f6ea9b20aa63bf6af5b12e}] because: Slot is being 
> returned to the SlotPool.
> 2018-09-13 18:00:51,425 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job 
> LogStream-sigma-panama-panama_h48_serverlog_testing 
> (e3317308798e962270b95102a1afe3b7) switched from state CANCELLING to CANCELED.
> 2018-09-13 18:00:51,425 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator     - Stopping 
> checkpoint coordinator for job e3317308798e962270b95102a1afe3b7.
> 2018-09-13 18:00:51,426 INFO  
> org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore  - 
> Shutting down
> 2018-09-13 18:00:51,428 INFO  
> org.apache.flink.runtime.dispatcher.MiniDispatcher            - Job 
> e3317308798e962270b95102a1afe3b7 reached globally terminal state CANCELED.
> 2018-09-13 18:00:51,769 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Shut down and 
> terminate YarnJobClusterEntrypoint with return code 1444 and application 
> status CANCELED.
> 2018-09-13 18:00:51,769 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint         - Stopping 
> YarnJobClusterEntrypoint.
> 2018-09-13 18:00:51,769 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Shut down cluster because application is in CANCELED, 
> diagnostics null.
> 2018-09-13 18:00:51,769 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService  - 
> Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2018-09-13 18:00:51,770 INFO  org.apache.flink.yarn.YarnResourceManager       
>               - Unregister application from the YARN Resource Manager with 
> final status KILLED.
> 2018-09-13 18:00:51,775 INFO  
> org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl         - Waiting for 
> application to be successfully unregistered.
> 2018-09-13 18:00:51,789 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Suspending 
> SlotPool.
> 2018-09-13 18:00:51,789 DEBUG org.apache.flink.runtime.jobmaster.JobMaster    
>               - Close ResourceManager connection 
> 5eeea30d3582d0bfba2d0ec58f4da18c.
> org.apache.flink.util.FlinkException: JobManager is shutting down.
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.postStop(JobMaster.java:365)
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.postStop(AkkaRpcActor.java:105)
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.postStop(FencedAkkaRpcActor.java:40)
>     at akka.actor.Actor$class.aroundPostStop(Actor.scala:515)
>     at akka.actor.UntypedActor.aroundPostStop(UntypedActor.scala:95)
>     at 
> akka.actor.dungeon.FaultHandling$class.akka$actor$dungeon$FaultHandling$$finishTerminate(FaultHandling.scala:210)
>     at 
> akka.actor.dungeon.FaultHandling$class.terminate(FaultHandling.scala:172)
>     at akka.actor.ActorCell.terminate(ActorCell.scala:374)
>     at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:467)
>     at akka.actor.ActorCell.systemInvoke(ActorCell.scala:483)
>     at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:282)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:260)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 2018-09-13 18:00:51,789 INFO  org.apache.flink.runtime.rpc.akka.AkkaRpcActor  
>               - The rpc endpoint 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool has not been started 
> yet. Discarding message 
> org.apache.flink.runtime.rpc.messages.LocalRpcInvocation until processing is 
> started.
> 2018-09-13 18:00:51,794 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool          - Stopping 
> SlotPool.
> 2018-09-13 18:00:52,890 INFO  
> org.apache.flink.shaded.curator.org.apache.curator.framework.imps.CuratorFrameworkImpl
>   - backgroundOperationsLoop exiting
> 2018-09-13 18:00:52,922 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ZooKeeper  - Session: 
> 0x16324f962f69800 closed
> 2018-09-13 18:00:52,923 INFO  
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - 
> EventThread shut down for session: 0x16324f962f69800
> 2018-09-13 18:00:52,923 INFO  
> org.apache.flink.runtime.blob.TransientBlobCache              - Shutting down 
> BLOB cache
> 2018-09-13 18:00:52,932 INFO  
> org.apache.flink.runtime.rpc.akka.AkkaRpcService              - Stopping Akka 
> RPC service.
> 2018-09-13 18:00:52,935 INFO  
> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Shutting down 
> remote daemon.
> 2018-09-13 18:00:52,936 INFO  
> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remote daemon 
> shut down; proceeding with flushing remote transports.
> 2018-09-13 18:00:52,960 INFO  
> akka.remote.RemoteActorRefProvider$RemotingTerminator         - Remoting shut 
> down.
> 
> ```
> Best,
> Paul Lam

Reply via email to