Hi Vishal,

it seems that the following is happening: You triggered the cancel-with-savepoint
command via the REST call. This command is an asynchronous
operation which produces a result (the savepoint path). In order to deliver
asynchronous results to the caller, Flink waits before shutting down until
they are delivered or until it times out after 5 minutes. I assume that you
don't request the savepoint path from Flink via the URL returned by the
original request. This could happen either if you kill the CLI before it's
done or if you have written your own method to trigger this operation.
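
For illustration, the trigger-then-poll flow described above could look roughly
like the following Python sketch. It assumes the REST paths and JSON field names
as I recall them from the Flink REST documentation (POST
/jobs/:jobid/savepoints returning a "request-id", then GET
/jobs/:jobid/savepoints/:triggerid returning a "status" and an "operation");
please verify them against the docs for your Flink version. The function names,
base URL, and polling parameters are hypothetical.

```python
import json
import time
import urllib.request


def http_json(method, url, body=None):
    """Send a JSON request and return the decoded JSON response."""
    data = json.dumps(body).encode("utf-8") if body is not None else None
    req = urllib.request.Request(
        url, data=data, method=method,
        headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


def cancel_with_savepoint(base_url, job_id, target_dir,
                          http=http_json, poll_interval=1.0, max_polls=300):
    """Trigger cancel-with-savepoint, then poll until the result is fetched.

    Endpoint paths and field names are assumptions based on the Flink REST
    docs; the `http` parameter is injectable so the flow can be tested offline.
    """
    # Step 1: trigger the asynchronous operation; only a request id comes back.
    trigger = http("POST", f"{base_url}/jobs/{job_id}/savepoints",
                   {"target-directory": target_dir, "cancel-job": True})
    request_id = trigger["request-id"]

    # Step 2: poll the status URL so that the result is actually picked up.
    status_url = f"{base_url}/jobs/{job_id}/savepoints/{request_id}"
    for _ in range(max_polls):
        result = http("GET", status_url)
        if result["status"]["id"] == "COMPLETED":
            operation = result["operation"]
            if "failure-cause" in operation:
                raise RuntimeError(f"Savepoint failed: {operation['failure-cause']}")
            return operation["location"]
        time.sleep(poll_interval)
    raise TimeoutError("Savepoint result was not available within the polling window")
```

A caller would use it along the lines of
cancel_with_savepoint("http://jobmanager:8081", job_id, "s3://bucket/savepoints"),
where the host, port, and target directory are placeholders. The point is only
that the second request has to happen; if the client stops after step 1, nobody
collects the result.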

I guess we could add a flag for asynchronous operations which tells Flink
that their results don't need to get delivered to some client. If you would
like to have such a feature, then please open a JIRA issue for it.

Cheers,
Till

On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> Anyone?
>
>
>
> I think there is some race condition. These are the TM logs. I am puzzled
> because in a larger pipe (there are about 32 slots on 8 replicas) it works.
>
>
>
>
> 2019-04-24 01:16:20,889 DEBUG
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Releasing local state under allocation id 5a853ef886e1c599f86b9503306fffd2.
>
> 2019-04-24 01:16:20,894 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Close
> JobManager connection for job 00000000000000000000000000000000.
>
> org.apache.flink.util.FlinkException: Stopping JobMaster for job
> EventCountJob(00000000000000000000000000000000).
>
> at org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 2019-04-24 01:16:20,895 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService        - Cannot
> reconnect to job 00000000000000000000000000000000 because it is not
> registered.
>
> 2019-04-24 01:16:21,053 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from e61c2b7d992f151936e21db1ca06666d.
>
> 2019-04-24 01:16:22,136 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>
> 2019-04-24 01:16:31,052 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
> heartbeat request from e61c2b7d992f151936e21db1ca06666d.
>
> 2019-04-24 01:16:35,483 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
> ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>
> On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi <vishal.santo...@gmail.com>
> wrote:
>
>> I see this in the TM pod
>>
>> 2019-04-23 19:08:41,828 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>
>> 2019-04-23 19:08:47,543 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:08:55,175 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 1ms
>>
>> 2019-04-23 19:08:57,548 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:09:07,543 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:09:08,523 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>
>> 2019-04-23 19:09:17,542 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:09:21,871 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>
>> 2019-04-23 19:09:27,543 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>> 2019-04-23 19:09:35,218 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x15cc7f3d88466a5 after 0ms
>>
>> 2019-04-23 19:09:37,542 DEBUG
>> org.apache.flink.runtime.taskexecutor.TaskExecutor            - Received
>> heartbeat request from 6b7dd7b5032c089bff8a77f75de65c22.
>>
>>
>>
>> The JM log has analogous entries:
>>
>>
>> 2019-04-23 19:10:49,218 DEBUG
>> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  - Got
>> ping response for sessionid: 0x25add5478fb2e7c after 0ms
>>
>>
>>
>> Does that ring a bell?
>>
>>
>>
>> On Tue, Apr 23, 2019 at 2:04 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Adding the DEBUG logs from the time I call a REST-based cancel with
>>> savepoint...
>>>
>>> On Tue, Apr 23, 2019 at 2:01 PM Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Though looking at
>>>> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/StandaloneResourceManager.java#L88
>>>> it does seem that the last log is expected.
>>>>
>>>> Not sure what part is hanging... I have more logs I can share...
>>>>
>>>> On Tue, Apr 23, 2019 at 1:59 PM Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> I am seeing this weird issue where I do a cancel with savepoint on a
>>>>> job on k8s and it hangs for 5 minutes (no INFO logs) and then exits with
>>>>> exit code 2.
>>>>>
>>>>>
>>>>> 2019-04-23 17:36:31,372 INFO
>>>>> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint  -
>>>>> Shutting down rest endpoint.
>>>>>
>>>>> 2019-04-23 17:36:31,374 INFO
>>>>> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService
>>>>> - Stopping ZooKeeperLeaderRetrievalService
>>>>> /leader/resource_manager_lock.
>>>>>
>>>>> 2019-04-23 17:36:31,377 INFO
>>>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      -
>>>>> Suspending SlotPool.
>>>>>
>>>>> 2019-04-23 17:36:31,378 DEBUG
>>>>> org.apache.flink.runtime.jobmaster.JobMaster                  - Close
>>>>> ResourceManager connection 181a4fd61044033a2ea32e384096247f.
>>>>>
>>>>> org.apache.flink.util.FlinkException: JobManager is shutting down.
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:365)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>>>>>
>>>>> at
>>>>> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>>>>>
>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>
>>>>> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>>>>>
>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>
>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>>>>>
>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>
>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>
>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>
>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>
>>>>> at
>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>
>>>>> 2019-04-23 17:36:31,381 INFO
>>>>> org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl      -
>>>>> Stopping SlotPool.
>>>>>
>>>>> 2019-04-23 17:36:31,381 INFO
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Disconnect job manager a41a5dceae5ad3664ff1f0b79f3e47ef
>>>>> @akka.tcp://flink@kafka-to-prometheus:6123/user/jobmanager_0 for job
>>>>> 00000000000000000000000000000000 from the resource manager.
>>>>>
>>>>> 2019-04-23 17:36:31,385 INFO
>>>>> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService
>>>>> - Stopping ZooKeeperLeaderElectionService
>>>>> ZooKeeperLeaderElectionService{leaderPath='/leader/00000000000000000000000000000000/job_manager_lock'}.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> and after 5 minutes...
>>>>>
>>>>>
>>>>>
>>>>> 2019-04-23 17:41:32,187 DEBUG
>>>>> org.apache.flink.shaded.netty4.io.netty.buffer.PoolThreadCache  -
>>>>> Freed 8 thread-local buffer(s) from thread: Finalizer
>>>>>
>>>>> 2019-04-23 17:41:32,198 INFO
>>>>> org.apache.flink.runtime.rpc.akka.AkkaRpcService              -
>>>>> Stopped Akka RPC service.
>>>>>
>>>>> 2019-04-23 17:41:32,200 INFO  
>>>>> org.apache.flink.runtime.entrypoint.ClusterEntrypoint
>>>>>         - Terminating cluster entrypoint process
>>>>> StandaloneJobClusterEntryPoint with exit code 2.
>>>>>
>>>>> java.util.concurrent.TimeoutException
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.concurrent.FutureUtils$Timeout.run(FutureUtils.java:942)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.concurrent.DirectExecutorService.execute(DirectExecutorService.java:211)
>>>>>
>>>>> at
>>>>> org.apache.flink.runtime.concurrent.FutureUtils.lambda$orTimeout$11(FutureUtils.java:360)
>>>>>
>>>>> at
>>>>> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> In the interim, I get this at a regular clip:
>>>>>
>>>>>
>>>>>
>>>>> 2019-04-23 17:37:02,452 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  -
>>>>> Release TaskExecutor 3752235c49428b94a0520f04266973eb because it exceeded
>>>>> the idle timeout.
>>>>>
>>>>> 2019-04-23 17:37:02,453 DEBUG
>>>>> org.apache.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Worker 68c5fbd67ac2bbe6fc35ed068ce1c4b1 could not be stopped.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Any ideas as to whether it is this inability to shut down the Worker
>>>>> that is causing this issue?
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards.
>>>>>
>>>>
