Hi Subramanya,

if the container is still running and the TM simply cannot connect to the JobManager, then the ResourceManager does not see a problem. The RM thinks in terms of containers, and as long as n containers are running, it won't start new ones. That's the reason why the TM should exit in order to terminate the container.
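For reference, this is the one-line change in the `flink-conf.yaml` (a minimal sketch of the flag that is discussed further down in this thread):

    # Let the TaskManager process exit on a fatal Akka error (e.g. when its actor
    # system gets quarantined). The container then terminates, YARN reports it as
    # completed, and the Flink resource manager can request a replacement container.
    taskmanager.exit-on-fatal-akka-error: true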
Have you tried using a newer Flink version? Lately we have reworked a good part of Flink's distributed architecture and added resource elasticity (starting with Flink 1.5).

Cheers,
Till

On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ssur...@salesforce.com> wrote:

> Hi,
> With some additional research,
>
> *Before the flag*
> I realized that for failed containers (that exited for a specific reason) we were still requesting a new TM container and launching a TM. But for the "Detected unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]" issue I do not see the container marked as failed and a subsequent request for a TM.
>
> *After `taskmanager.exit-on-fatal-akka-error: true`*
> I do not see any "unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]" since we turned on taskmanager.exit-on-fatal-akka-error: true; I am not sure if that is a coincidence or a direct impact of this change.
>
> *Our Issue:*
> I realized we are still exiting the application, i.e. failing, when containers are lost. The reason is that before org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new TM container, launch the TM and have it reported as started, the org.apache.flink.runtime.jobmanager.scheduler throws a NoResourceAvailableException that causes a failure. In our case we had a fixed-delay restart strategy with 5 attempts, and we run out of them because of this. I am looking to solve this with a FailureRateRestartStrategy over a 2 minute interval (10 second restart delay, >12 failures), which lets the TM come back (it takes about 50 seconds); see the flink-conf.yaml sketch at the bottom of this email.
>
> *Flink Bug*
> But I cannot help but wonder why there is no interaction between the ResourceManager and the JobManager, i.e. why does the JobManager continue with the processing despite not having the required TMs?
>
> Logs to substantiate what I said above (only the relevant entries):
>
> 2018-09-03 06:17:13,932 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e28_1535135887442_1381_01_000097 completed successfully with diagnostics: Container released by application
>
> 2018-09-03 06:34:19,214 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> 2018-09-03 06:34:19,214 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
> 2018-09-03 06:34:19,215 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
> 2018-09-03 06:34:19,215 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 1
> 2018-09-03 06:34:19,215 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
> 2018-09-03 06:34:19,216 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager terminated.
> 2018-09-03 06:34:19,218 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
> java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
> 2018-09-03 06:34:19,466 INFO org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720
>
> 2018-09-03 06:34:24,717 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining pending container requests: 0
> 2018-09-03 06:34:24,717 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container: [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId: hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
> 2018-09-03 06:34:29,256 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
> 2018-09-03 06:34:34,284 WARN akka.remote.transport.netty.NettyTransport - Remote connection to [null] failed with java.net.ConnectException: Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
> 2018-09-03 06:34:34,284 WARN akka.remote.ReliableDeliverySupervisor - Association with remote system [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is now gated for [5000] ms. Reason: [Association failed with [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
> 2018-09-03 06:34:34,540 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RESTARTING to CREATED.
> 2018-09-03 06:34:35,044 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state CREATED to RUNNING.
> 2018-09-03 06:34:35,195 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state RUNNING to FAILING.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.
> Task to schedule: < Attempt #3 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>
> 2018-09-03 06:34:39,248 INFO org.apache.flink.yarn.YarnFlinkResourceManager - TaskManager container_e28_1535135887442_1381_01_000147 has started.
> 2018-09-03 06:34:45,235 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
> 2018-09-03 06:34:45,235 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager terminated.
> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed containers so far: 2
> 2018-09-03 06:34:45,236 INFO org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220. Number of registered task managers 144. Number of available slots 720.
>
> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining pending container requests: 0
> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container: [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId: hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress: hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
> 2018-09-03 06:34:45,241 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining pending container requests: 0
> 2018-09-03 06:34:45,241 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Returning excess container container_e28_1535135887442_1381_01_000203
>
> Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202 has started" entry. Instead I see:
> 2018-09-03 06:34:56,894 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from state FAILING to FAILED.
> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration.
> Task to schedule: < Attempt #5 (Source: Custom Source -> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'), =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>
> 2018-09-03 06:34:57,005 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Shutting down cluster with status FAILED : The monitored job with ID 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
> 2018-09-03 06:34:57,007 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Unregistering application from the YARN Resource Manager
>
> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <ssur...@salesforce.com> wrote:
>
>> Thanks Till,
>> I do not see any Akka related messages in that TaskManager after the initial startup. It seemed like all was well. So after the RemoteWatcher detects it as unreachable and the TaskManager is unregistered, I do not see any other activity in the JobManager with regards to reallocation etc.
>> - Does the quarantining of the TaskManager not happen until exit-on-fatal-akka-error is turned on?
>> - Does the JobManager or the TaskManager try to reconnect to each other again? Is there a different setting for it?
>> - Does the JobManager not reallocate a TaskManager despite it being unregistered, until the TaskManager exits? I think it should, especially if it is not trying to establish a connection again.
>>
>> I will give the flag a try.
>>
>> Sincerely,
>>
>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Could you check whether akka.tcp://fl...@blahabc.sfdc.net:123/user/taskmanager is still running, e.g. whether it tries to reconnect to the JobManager? If this is the case, then the container is still running and the YarnFlinkResourceManager thinks that everything is alright. You can activate that a TaskManager kills itself if it gets quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <ssur...@salesforce.com> wrote:
>>>
>>>> Hi Till,
>>>> Greatly appreciate your reply.
>>>> We use version 1.4.2. I do not see anything unusual in the logs for the TM that was lost. Note: I have looked at many such failures and see the same pattern below.
>>>>
>>>> The JM logs above had most of what I had, but the below is what I have when I search for flink.yarn (we have huge logs otherwise, given the amount of SQL queries we run). The gist is: Akka detects unreachable, TM marked lost and unregistered by JM, operators start failing with NoResourceAvailableException since there is one less TM, and 5 retry attempts later the job goes down.
>>>>
>>>> ………….
>>>>
>>>> 2018-08-29 23:02:41,216 INFO org.apache.flink.yarn.YarnFlinkResourceManager - TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>> 2018-08-29 23:02:50,095 INFO org.apache.flink.yarn.YarnFlinkResourceManager - TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>> 2018-08-29 23:02:50,409 INFO org.apache.flink.yarn.YarnJobManager - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>> 2018-08-29 23:02:50,429 INFO org.apache.flink.yarn.YarnJobManager - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5, delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>> 2018-08-29 23:02:50,486 INFO org.apache.flink.yarn.YarnJobManager - Running initialization on master for job streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>> 2018-08-29 23:02:50,487 INFO org.apache.flink.yarn.YarnJobManager - Successfully ran initialization on master in 0 ms.
>>>> 2018-08-29 23:02:50,684 INFO org.apache.flink.yarn.YarnJobManager - Using application-defined state backend for checkpoint/savepoint metadata: File State Backend @ hdfs://security-temp/savedSearches/checkpoint.
>>>> 2018-08-29 23:02:50,920 INFO org.apache.flink.yarn.YarnJobManager - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>> 2018-08-29 23:12:05,240 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover all jobs.
>>>> 2018-08-29 23:12:05,716 INFO org.apache.flink.yarn.YarnJobManager - There are 1 jobs to recover. Starting the job recovery.
>>>> 2018-08-29 23:12:05,806 INFO org.apache.flink.yarn.YarnJobManager - Attempting to recover job ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>> 2018-08-29 23:12:07,308 INFO org.apache.flink.yarn.YarnJobManager - Ignoring job recovery for ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>> 2018-08-29 23:13:57,364 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://fl...@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>> 2018-08-29 23:13:58,645 INFO org.apache.flink.yarn.YarnJobManager - Received message for non-existing checkpoint 1
>>>> 2018-08-29 23:15:26,126 INFO org.apache.flink.yarn.YarnJobManager - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session
>>>> 2018-08-29 23:15:26,128 INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>> 2018-08-29 23:15:26,129 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Shutting down cluster with status FAILED : The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>> 2018-08-29 23:15:26,146 INFO org.apache.flink.yarn.YarnFlinkResourceManager - Unregistering application from the YARN Resource Manager
>>>> 2018-08-29 23:15:31,363 INFO org.apache.flink.yarn.YarnJobManager - Deleting yarn application files under hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>> 2018-08-29 23:15:31,370 INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager - Actor system shut down timed out.
>>>> 2018-08-29 23:15:41,226 INFO org.apache.flink.yarn.YarnJobManager - Shutdown completed. Stopping JVM.
>>>>
>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>
>>>>> Hi Subramanya,
>>>>>
>>>>> in order to help you I need a little bit of context. Which version of Flink are you running?
>>>>> The configuration yarn.reallocate-failed is deprecated since Flink 1.1 and does not have an effect anymore.
>>>>>
>>>>> What would be helpful is to get the full jobmanager log from you. If the YarnFlinkResourceManager gets notified that a container has failed, it should restart this container (it will do this 145 times). So if the YarnFlinkResourceManager does not get notified about a completed container, then this might indicate that the container is still running. So it would be good to check what the logs of container_e27_1535135887442_0906_01_000039 say.
>>>>>
>>>>> Moreover, do you see the same problem occur when using the latest release Flink 1.6.0?
>>>>>
>>>>> Cheers,
>>>>> Till
>>>>>
>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <ssur...@salesforce.com> wrote:
>>>>>
>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and then never re-allocated; subsequently operators fail with NoResourceAvailableException, and after 5 restarts (we have FixedDelay restarts of 5) the application goes down.
>>>>>>
>>>>>> - We have explicitly set *yarn.reallocate-failed:* true and have not specified yarn.maximum-failed-containers (and see “org.apache.flink.yarn.YarnApplicationMasterRunner - YARN application tolerates 145 failed TaskManager containers before giving up” in the logs).
>>>>>> - After the initial startup, where all 145 TaskManagers are requested, I never see any logs saying “Requesting new TaskManager container” to reallocate a failed container.
>>>>>>
>>>>>> 2018-08-29 23:13:56,655 WARN akka.remote.RemoteWatcher - Detected unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]
>>>>>> 2018-08-29 23:13:57,364 INFO org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://fl...@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>> java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net (dataPort=124)
>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> java.lang.Exception: TaskManager was lost/killed: container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)
>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>
>>>>>> 2018-08-29 23:13:58,529 INFO org.apache.flink.runtime.instance.InstanceManager - Unregistered task manager blahabc.sfdc.net:/1.1.1.1. Number of registered task managers 144. Number of available slots 720.
>>>>>> 2018-08-29 23:13:58,645 INFO org.apache.flink.yarn.YarnJobManager - Received message for non-existing checkpoint 1
>>>>>>
>>>>>> 2018-08-29 23:14:39,969 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Recovering checkpoints from ZooKeeper.
>>>>>> 2018-08-29 23:14:39,975 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Found 0 checkpoints in ZooKeeper.
>>>>>> 2018-08-29 23:14:39,975 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Trying to fetch 0 checkpoints from storage.
>>>>>>
>>>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Not enough free slots available to run the job. You can decrease the operator parallelism or increase the number of slots per TaskManager in the configuration. Task to schedule: < Attempt #1
>>>>>>
>>>>>> *After 5 retries of our SQL query execution graph (we have configured 5 fixed-delay restarts), it outputs the below:*
>>>>>>
>>>>>> 2018-08-29 23:15:22,216 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping checkpoint coordinator for job ab7e96a1659fbb4bfb3c6cd9bccb0335
>>>>>> 2018-08-29 23:15:22,216 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Shutting down
>>>>>> 2018-08-29 23:15:22,225 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCompletedCheckpointStore - Removing /prod/link/application_1535135887442_0906/checkpoints/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>> 2018-08-29 23:15:23,460 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter - Shutting down.
>>>>>> 2018-08-29 23:15:23,460 INFO org.apache.flink.runtime.checkpoint.ZooKeeperCheckpointIDCounter - Removing /checkpoint-counter/ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper
>>>>>> 2018-08-29 23:15:24,114 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore - Removed job graph ab7e96a1659fbb4bfb3c6cd9bccb0335 from ZooKeeper.
>>>>>> 2018-08-29 23:15:26,126 INFO org.apache.flink.yarn.YarnJobManager - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting down session
>>>>>> 2018-08-29 23:15:26,128 INFO org.apache.flink.yarn.YarnJobManager - Stopping JobManager with final application status FAILED and diagnostics: The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>> 2018-08-29 23:15:26,129 INFO org.apache.flink.yarn.YarnFlinkResourceManager
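PS: Regarding the FailureRateRestartStrategy mentioned under *Our Issue* above, here is a minimal flink-conf.yaml sketch that simply plugs your numbers (2 minute interval, 10 second delay, more than 12 failures tolerated) into the standard failure-rate keys; please double-check the exact key names and duration format against the docs of the Flink version you deploy:

    restart-strategy: failure-rate
    # allow up to 13 restarts within a sliding 2-minute window (your ">12 failures")
    restart-strategy.failure-rate.max-failures-per-interval: 13
    restart-strategy.failure-rate.failure-rate-interval: 2 min
    restart-strategy.failure-rate.delay: 10 s

The same strategy can also be set programmatically on the execution environment via RestartStrategies.failureRateRestart(...) if you prefer to keep it next to the job code.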