Hi, the quarantining is not really solvable in 1.4.x without restarting the Flink component. Thus, I would recommend upgrading to the latest Flink version (1.6.0; 1.6.1 will be released later this week).
In order to tell which would be the shorter route, I would need to know a bit more detail about the problems you are facing.

Cheers,
Till

On Mon, Sep 17, 2018 at 9:24 PM Subramanya Suresh <ssur...@salesforce.com> wrote:

> Hi Till,
> Update on this. We are still working through the 1.6.0 setup and run. In a
> separate thread, we are running into issues, and sense more on the horizon
> before we get it working.
> We are under some tight timelines, so I want to ask how confident you are
> that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2
> is the longer route.
>
> Sincerely,
>
> On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <ssur...@salesforce.com> wrote:
>
>> Hi Till,
>> *After `taskmanager.exit-on-fatal-akka-error: true`*
>> I do not see any "unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]"
>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true;
>> I am not sure if that is a coincidence or a direct impact of this change.
>>
>> Quick update: I can confirm our issue still happens even with the flag
>> being true. With a proper restart strategy (rate based, so that it gives it
>> enough time) it can recover from container failures like the first case
>> below, but it is not able to recover from "detected unreachable" issues
>> like the second case below.
>>
>> We are currently using the configuration below. So I guess the only
>> options left are to increase the heartbeat.pause or move to 1.6 as you
>> suggested.
>>
>> akka.client.timeout: 600s
>> akka.ask.timeout: 600s
>> akka.lookup.timeout: 600s
>> akka.watch.heartbeat.pause: 120s
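>>
>> (For reference, a minimal flink-conf.yaml sketch of the heartbeat change
>> being considered here; the values are illustrative, not recommendations:)
>>
>>   # Akka DeathWatch failure detector between JobManager and TaskManagers.
>>   # A larger pause tolerates longer GC stalls or network hiccups before a
>>   # TaskManager is declared unreachable; keep the interval well below it.
>>   akka.watch.heartbeat.interval: 10s
>>   akka.watch.heartbeat.pause: 600s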
>>
>> ___________________________________________________
>> 2018-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>> - Container container_e29_1536261974019_1134_01_000036 failed. Exit status: -100
>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>> - Diagnostics for container container_e29_1536261974019_1134_01_000036 in
>> state COMPLETE : exitStatus=-100 diagnostics=Container released on a *lost* node
>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>> - Total number of failed containers so far: 1
>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>> - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
>> 2018-09-11 16:30:44,862 INFO org.apache.flink.yarn.YarnJobManager
>> - Task manager akka.tcp://flink@hello-world2-13:41157/user/taskmanager terminated.
>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> 2018-09-11 16:30:44,868 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>> - Job streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from
>> state RUNNING to FAILING.
>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> 2018-09-11 16:30:49,700 WARN akka.remote.ReliableDeliverySupervisor
>> - Association with remote system [akka.tcp://flink@hello-world2-13:41157] has
>> failed, address is now gated for [5000] ms. Reason: [Disassociated].
>>
>> This container (hello-world2-13) had just received:
>> 2018-09-11 16:30:47,195 INFO org.apache.flink.yarn.YarnTaskManagerRunnerFactory
>> - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
>> ___________________________________________________
>> But then it failed with the same old:
>> 2018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected
>> unreachable: [akka.tcp://flink@hello-world3-3:44607]
>> 2018-09-11 16:42:58,409 INFO org.apache.flink.yarn.YarnJobManager
>> - Task manager akka.tcp://flink@hello-world3-3/user/taskmanager terminated.
>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> ___________________________________________________
>>
>> On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>
>>> Hi Subramanya,
>>>
>>> if the container is still running and the TM simply cannot connect to
>>> the JobManager, then the ResourceManager does not see a problem. The RM
>>> thinks in terms of containers, and as long as n containers are running, it
>>> won't start new ones. That is the reason why the TM should exit: to
>>> terminate the container.
>>>
>>> Have you tried using a newer Flink version? Lately we have reworked a
>>> good part of Flink's distributed architecture and added resource elasticity
>>> (starting with Flink 1.5).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ssur...@salesforce.com> wrote:
>>>
>>>> Hi,
>>>> With some additional research:
>>>>
>>>> *Before the flag*
>>>> I realized that for failed containers (those that exited for a specific
>>>> reason) we were still requesting a new TM container and launching a TM.
>>>> But for the "Detected unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]"
>>>> issue I do not see the container marked as failed or a subsequent request
>>>> for a TM.
>>>>
>>>> *After `taskmanager.exit-on-fatal-akka-error: true`*
>>>> I do not see any unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]
>>>> since we turned on taskmanager.exit-on-fatal-akka-error: true; I am not
>>>> sure if that is a coincidence or a direct impact of this change.
>>>>
>>>> *Our Issue:*
>>>> I realized we are still exiting the application, i.e. failing, when
>>>> containers are lost. The reason is that before
>>>> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
>>>> container, launch a TM in it, and have it reported as started, the
>>>> org.apache.flink.runtime.jobmanager.scheduler throws a
>>>> NoResourceAvailableException that causes a failure. In our case we had a
>>>> fixed-delay restart strategy with 5 attempts, and we run out of them
>>>> because of this. I am looking to solve this with a FailureRateRestartStrategy
>>>> over a 2-minute interval (10 second restart delay, >12 failures), which
>>>> lets the TM come back (takes about 50 seconds); see the sketch below.
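>>>>
>>>> (A minimal sketch of setting such a failure-rate strategy on the job,
>>>> assuming a Java job with the usual StreamExecutionEnvironment; the numbers
>>>> mirror the values above and the class name is hypothetical:)
>>>>
>>>>   import java.util.concurrent.TimeUnit;
>>>>   import org.apache.flink.api.common.restartstrategy.RestartStrategies;
>>>>   import org.apache.flink.api.common.time.Time;
>>>>   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>>
>>>>   public class FailureRateJob {
>>>>       public static void main(String[] args) throws Exception {
>>>>           StreamExecutionEnvironment env =
>>>>               StreamExecutionEnvironment.getExecutionEnvironment();
>>>>           // Tolerate up to 13 failures (">12") within any 2-minute window,
>>>>           // restarting 10 seconds after each failure, so a replacement
>>>>           // TaskManager (~50 seconds) can come up before the job gives up.
>>>>           env.setRestartStrategy(RestartStrategies.failureRateRestart(
>>>>               13,               // max failures per measurement interval
>>>>               Time.minutes(2),  // interval for measuring the failure rate
>>>>               Time.seconds(10)  // delay between restart attempts
>>>>           ));
>>>>           // ... build the actual pipeline and call env.execute() here ...
>>>>       }
>>>>   }
>>>>
>>>> (The same can be set cluster-wide in flink-conf.yaml via the
>>>> restart-strategy: failure-rate keys.)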
>>>>
>>>> *Flink Bug*
>>>> But I cannot help wondering why there is no interaction between the
>>>> ResourceManager and the JobManager, i.e. why does the JobManager continue
>>>> with the processing despite not having the required TMs?
>>>>
>>>> Logs to substantiate what I said above (only the relevant lines):
>>>>
>>>> 2018-09-03 06:17:13,932 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Container container_e28_1535135887442_1381_01_000097 completed successfully
>>>> with diagnostics: Container released by application
>>>>
>>>> 2018-09-03 06:34:19,214 WARN akka.remote.ReliableDeliverySupervisor
>>>> - Association with remote system
>>>> [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed,
>>>> address is now gated for [5000] ms. Reason: [Disassociated]
>>>> 2018-09-03 06:34:19,214 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Container container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
>>>> 2018-09-03 06:34:19,215 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Diagnostics for container container_e27_1535135887442_1381_01_000102 in state
>>>> COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
>>>> 2018-09-03 06:34:19,215 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Total number of failed containers so far: 1
>>>> 2018-09-03 06:34:19,215 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Requesting new TaskManager container with 20000 megabytes memory. Pending requests: 1
>>>> 2018-09-03 06:34:19,216 INFO org.apache.flink.yarn.YarnJobManager
>>>> - Task manager akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager
>>>> terminated.
>>>> 2018-09-03 06:34:19,218 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RUNNING to FAILING.
>>>> java.lang.Exception: TaskManager was lost/killed:
>>>> container_e27_1535135887442_1381_01_000102 @ hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
>>>> 2018-09-03 06:34:19,466 INFO org.apache.flink.runtime.instance.InstanceManager
>>>> - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
>>>> Number of registered task managers 144. Number of available slots 720
>>>>
>>>> 2018-09-03 06:34:24,717 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Received new container: container_e28_1535135887442_1381_01_000147 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:24,717 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Launching TaskManager in container ContainerInLaunch @ 1535956464717: Container:
>>>> [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
>>>> hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>,
>>>> Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.35.220:8041 }, ]
>>>> on host hello-world9-27-crz.ops.sfdc.net
>>>> 2018-09-03 06:34:29,256 WARN akka.remote.ReliableDeliverySupervisor
>>>> - Association with remote system
>>>> [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address
>>>> is now gated for [5000] ms. Reason: [Association failed with
>>>> [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by:
>>>> [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>> 2018-09-03 06:34:34,284 WARN akka.remote.transport.netty.NettyTransport
>>>> - Remote connection to [null] failed with java.net.ConnectException:
>>>> Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
>>>> 2018-09-03 06:34:34,284 WARN akka.remote.ReliableDeliverySupervisor
>>>> - Association with remote system
>>>> [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address
>>>> is now gated for [5000] ms. Reason: [Association failed with
>>>> [akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by:
>>>> [Connection refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>> 2018-09-03 06:34:34,540 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RESTARTING to CREATED.
>>>> 2018-09-03 06:34:35,044 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state CREATED to RUNNING.
>>>> 2018-09-03 06:34:35,195 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RUNNING to FAILING.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource,
>>>> _UTF-16LE'10.0.0.0/8'), cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
>>>> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>>>>
>>>> 2018-09-03 06:34:39,248 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - TaskManager container_e28_1535135887442_1381_01_000147 has started.
>>>> 2018-09-03 06:34:45,235 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Container container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
>>>> 2018-09-03 06:34:45,235 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Diagnostics for container container_e28_1535135887442_1381_01_000147 in state
>>>> COMPLETE : exitStatus=-102 diagnostics=Container preempted by scheduler
>>>> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnJobManager
>>>> - Task manager akka.tcp://fl...@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager
>>>> terminated.
>>>> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Total number of failed containers so far: 2
>>>> 2018-09-03 06:34:45,236 INFO org.apache.flink.runtime.instance.InstanceManager
>>>> - Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
>>>> Number of registered task managers 144. Number of available slots 720.
>>>>
>>>> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Received new container: container_e28_1535135887442_1381_01_000202 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:45,236 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Launching TaskManager in container ContainerInLaunch @ 1535956485236: Container:
>>>> [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId:
>>>> hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480, vCores:5>,
>>>> Priority: 0, Token: Token { kind: ContainerToken, service: 11.11.34.160:8041 }, ]
>>>> on host hello-world4-31-crz.ops.sfdc.net
>>>> 2018-09-03 06:34:45,241 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Received new container: container_e28_1535135887442_1381_01_000203 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:45,241 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Returning excess container container_e28_1535135887442_1381_01_000203
>>>>
>>>> Notice there is no "TaskManager container_e28_1535135887442_1381_01_000202
>>>> has started" message. Instead I see:
>>>> 2018-09-03 06:34:56,894 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph
>>>> - Job streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state FAILING to FAILED.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch), _UTF-16LE'OLYMPUS'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>>>>
>>>> 2018-09-03 06:34:57,005 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Shutting down cluster with status FAILED : The monitored job with ID
>>>> 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
>>>> 2018-09-03 06:34:57,007 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>> - Unregistering application from the YARN Resource Manager
>>>>
>>>> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <ssur...@salesforce.com> wrote:
>>>>
>>>>> Thanks Till,
>>>>> I do not see any Akka-related messages in that TaskManager after the
>>>>> initial startup; it seemed like all was well. So after the RemoteWatcher
>>>>> detects it as unreachable and the TaskManager is unregistered, I do not
>>>>> see any other activity in the JobManager with regards to reallocation etc.
>>>>> - Does the quarantining of the TaskManager not happen until
>>>>> exit-on-fatal-akka-error is turned on?
>>>>> - Does the JobManager or the TaskManager try to reconnect to each
>>>>> other again? Is there a different setting for it?
>>>>> - Does the JobManager not reallocate a TaskManager despite it being
>>>>> unregistered, until the TaskManager exits? I think it should, especially
>>>>> if it is not trying to establish a connection again.
>>>>>
>>>>> I will give the flag a try.
>>>>>
>>>>> Sincerely,
>>>>>
>>>>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>
>>>>>> Could you check whether akka.tcp://fl...@blahabc.sfdc.net:123/user/taskmanager
>>>>>> is still running, e.g. whether it
>>>>>> tries to reconnect to the JobManager? If this is the case, then the
>>>>>> container is still running and the YarnFlinkResourceManager thinks that
>>>>>> everything is alright. You can make a TaskManager kill itself if it gets
>>>>>> quarantined by setting `taskmanager.exit-on-fatal-akka-error: true` in
>>>>>> the `flink-conf.yaml`.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
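>>>>>>
>>>>>> (For reference, the flink-conf.yaml line in question:)
>>>>>>
>>>>>>   # Let the TaskManager process exit on fatal Akka errors such as being
>>>>>>   # quarantined, so YARN sees the container die and the
>>>>>>   # YarnFlinkResourceManager can request a replacement.
>>>>>>   taskmanager.exit-on-fatal-akka-error: true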
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <ssur...@salesforce.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Greatly appreciate your reply.
>>>>>>> We use version 1.4.2. I do not see anything unusual in the logs for
>>>>>>> the TM that was lost. Note: I have looked at many such failures and see
>>>>>>> the same pattern below.
>>>>>>>
>>>>>>> The JM logs above had most of what I had, but the below is what I have
>>>>>>> when I search for flink.yarn (we have huge logs otherwise, given the
>>>>>>> amount of SQL queries we run). The gist is: Akka detects the TM as
>>>>>>> unreachable, the TM is marked lost and unregistered by the JM, operators
>>>>>>> start failing with NoResourceAvailableException since there is one less
>>>>>>> TM, and 5 retry attempts later the job goes down.
>>>>>>>
>>>>>>> ………….
>>>>>>>
>>>>>>> 2018-08-29 23:02:41,216 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>>> - TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>>>>> 2018-08-29 23:02:50,095 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>>> - TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>>>>> 2018-08-29 23:02:50,409 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Submitting job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>> 2018-08-29 23:02:50,429 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Using restart strategy FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>> 2018-08-29 23:02:50,486 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Running initialization on master for job streaming-searches-prod
>>>>>>> (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>>>> 2018-08-29 23:02:50,487 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Successfully ran initialization on master in 0 ms.
>>>>>>> 2018-08-29 23:02:50,684 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Using application-defined state backend for checkpoint/savepoint metadata:
>>>>>>> File State Backend @ hdfs://security-temp/savedSearches/checkpoint.
>>>>>>> 2018-08-29 23:02:50,920 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Scheduling job ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>> 2018-08-29 23:12:05,240 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Attempting to recover all jobs.
>>>>>>> 2018-08-29 23:12:05,716 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - There are 1 jobs to recover. Starting the job recovery.
>>>>>>> 2018-08-29 23:12:05,806 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Attempting to recover job ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>> 2018-08-29 23:12:07,308 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Ignoring job recovery for ab7e96a1659fbb4bfb3c6cd9bccb0335, because it
>>>>>>> is already submitted.
>>>>>>> 2018-08-29 23:13:57,364 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Task manager akka.tcp://flink@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>> 2018-08-29 23:13:58,645 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Received message for non-existing checkpoint 1
>>>>>>> 2018-08-29 23:15:26,126 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED.
>>>>>>> Shutting down session
>>>>>>> 2018-08-29 23:15:26,128 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Stopping JobManager with final application status FAILED and diagnostics:
>>>>>>> The monitored job with ID ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>> 2018-08-29 23:15:26,129 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>>> - Shutting down cluster with status FAILED : The monitored job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>> 2018-08-29 23:15:26,146 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>>>>>>> - Unregistering application from the YARN Resource Manager
>>>>>>> 2018-08-29 23:15:31,363 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Deleting yarn application files under
>>>>>>> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>>>>> 2018-08-29 23:15:31,370 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Stopping JobManager akka.tcp://flink@blahxyz.sfdc.net:1235/user/jobmanager.
>>>>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>>>> - Actor system shut down timed out.
>>>>>>> 2018-08-29 23:15:41,226 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>> - Shutdown completed. Stopping JVM.
>>>>>>>
>>>>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <trohrm...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hi Subramanya,
>>>>>>>>
>>>>>>>> in order to help you I need a little bit of context. Which version
>>>>>>>> of Flink are you running? The configuration yarn.reallocate-failed is
>>>>>>>> deprecated since Flink 1.1 and no longer has an effect.
>>>>>>>>
>>>>>>>> What would be helpful is to get the full jobmanager log from you.
>>>>>>>> If the YarnFlinkResourceManager gets notified that a container has
>>>>>>>> failed, it should restart this container (it will do this 145 times).
>>>>>>>> So if the YarnFlinkResourceManager does not get notified about a
>>>>>>>> completed container, then this might indicate that the container is
>>>>>>>> still running. So it would be good to check what the logs of
>>>>>>>> container_e27_1535135887442_0906_01_000039 say.
>>>>>>>>
>>>>>>>> Moreover, do you see the same problem occur when using the latest
>>>>>>>> release, Flink 1.6.0?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <ssur...@salesforce.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>>>>> then never re-allocated, and subsequently operators fail with
>>>>>>>>> NoResourceAvailableException; after 5 restarts (we have FixedDelay
>>>>>>>>> restarts of 5) the application goes down.
>>>>>>>>>
>>>>>>>>> - We have explicitly set *yarn.reallocate-failed:* true and have not
>>>>>>>>> specified yarn.maximum-failed-containers (and see
>>>>>>>>> "org.apache.flink.yarn.YarnApplicationMasterRunner - YARN application
>>>>>>>>> tolerates 145 failed TaskManager containers before giving up" in the
>>>>>>>>> logs); see the config sketch after this list.
>>>>>>>>> - After the initial startup, where all 145 TaskManagers are requested,
>>>>>>>>> I never see any logs saying "Requesting new TaskManager container" to
>>>>>>>>> reallocate a failed container.
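>>>>>>>>>
>>>>>>>>> (For reference, a minimal flink-conf.yaml sketch of the settings in
>>>>>>>>> play here; the fixed-delay values mirror the
>>>>>>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>>>>>> delayBetweenRestartAttempts=10000) from the JM logs, and 145 appears
>>>>>>>>> to be the default for yarn.maximum-failed-containers, i.e. the number
>>>>>>>>> of initially requested TaskManagers:)
>>>>>>>>>
>>>>>>>>>   # yarn.reallocate-failed is deprecated since Flink 1.1 and ignored
>>>>>>>>>   # (see the reply above); the effective knobs are:
>>>>>>>>>   yarn.maximum-failed-containers: 145
>>>>>>>>>   restart-strategy: fixed-delay
>>>>>>>>>   restart-strategy.fixed-delay.attempts: 5
>>>>>>>>>   restart-strategy.fixed-delay.delay: 10 s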
>>>>>>>>>
>>>>>>>>> 2018-08-29 23:13:56,655 WARN akka.remote.RemoteWatcher
>>>>>>>>> - Detected unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]
>>>>>>>>> 2018-08-29 23:13:57,364 INFO org.apache.flink.yarn.YarnJobManager
>>>>>>>>> - Task manager akka.tcp://fl...@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net (dataPort=124)
>>>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>>>> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>>> at akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net:42414 (dataPort=124)
>>>>>>>>> at org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>>>> at org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>> at org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>>>> at org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>>>> at org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>> at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>> at org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>> at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>> at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>> at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>>>> at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)