Hi,

the quarantining is not really solvable in 1.4.x without restarting the Flink
component. Thus, I would recommend upgrading to the latest Flink version
(1.6.0; 1.6.1 will be released later this week).

In order to tell which would be the shorter route, I would need to know a bit
more about the problems you are facing.

Cheers,
Till

On Mon, Sep 17, 2018 at 9:24 PM Subramanya Suresh <ssur...@salesforce.com>
wrote:

> Hi Till,
> Update on this: we are still wading through the 1.6.0 setup and run. We are
> running into issues there (see the separate thread) and sense more on the
> horizon before we get it working.
> We are under some tight timelines, so I want to ask how confident you are
> that the above would be fixed in 1.6.0, and that trying to fix it in 1.4.2
> is the longer route.
>
> Sincerely,
>
> On Wed, Sep 12, 2018 at 6:49 PM, Subramanya Suresh <ssur...@salesforce.com
> > wrote:
>
>> Hi Till,
>> *After taskmanager.exit-on-fatal-akka-error: true*
>> I do not see any "unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]"
>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I am
>> not sure if that is a coincidence or a direct impact of this change.
>>
>> Quick update: I can confirm our issue still happens even with the flag
>> set to true. With a proper restart strategy (rate based, which gives it
>> enough time) the job can recover from container failures like the first case
>> below, but it is not able to recover from "detected unreachable" issues like
>> the second case below.
>>
>> We are currently using the configuration below. So I guess the only
>> options left are to increase akka.watch.heartbeat.pause or move to 1.6 as
>> you suggested.
>>
>>
>> akka.client.timeout: 600s
>> akka.ask.timeout: 600s
>> akka.lookup.timeout: 600s
>> akka.watch.heartbeat.pause: 120s
>>
>>
>> ___________________________________________________
>> 8-09-11 16:30:44,861 INFO org.apache.flink.yarn.YarnFlinkResourceManager
>> - Container container_e29_1536261974019_1134_01_000036 failed. Exit status:
>> -100
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager - Diagnostics for container
>> container_e29_1536261974019_1134_01_000036 in state COMPLETE :
>> exitStatus=-100 diagnostics=Container released on a *lost* node
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager - Total number of failed
>> containers so far: 1
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnFlinkResourceManager - Requesting new TaskManager
>> container with 20000 megabytes memory. Pending requests: 1
>> 2018-09-11 16:30:44,862 INFO
>> org.apache.flink.yarn.YarnJobManager - Task manager
>> akka.tcp://flink@hellow-world2-13:41157/user/taskmanager terminated.
>> at
>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> 2018-09-11 16:30:44,868 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - Job
>> streaming-searches-prod (1af29a616cba4bd76f920f7c80189535) switched from
>> state RUNNING to FAILING.
>> at
>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> 2018-09-11 16:30:49,700 WARN
>> akka.remote.ReliableDeliverySupervisor - Association with remote system
>> [akka.tcp://flink@hello-world2-13:41157] has failed, address is now
>> gated for [5000] ms. Reason: [Disassociated].
>> This container, 2-13, has just received:
>> 2018-09-11 16:30:47,195 INFO
>> org.apache.flink.yarn.YarnTaskManagerRunnerFactory - RECEIVED SIGNAL 15:
>> SIGTERM. Shutting down as requested.
>> ___________________________________________________
>> But then it failed with the same old:
>> 018-09-11 16:42:58,395 WARN akka.remote.RemoteWatcher - Detected
>> unreachable: [akka.tcp://flink@hello-world3-3:44607]
>> 2018-09-11 16:42:58,409 INFO
>> org.apache.flink.yarn.YarnJobManager - Task manager akka.tcp://flink@
>> hello-world3-3/user/taskmanager terminated.
>> at
>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>> ___________________________________________________
>>
>> On Thu, Sep 6, 2018 at 9:08 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Subramanya,
>>>
>>> if the container is still running and the TM simply cannot connect to
>>> the JobManager, then the ResourceManager does not see a problem. The RM
>>> thinks in terms of containers, and as long as n containers are running, it
>>> won't start new ones. That's the reason why the TM should exit in order to
>>> terminate the container.
>>>
>>> Have you tried using a newer Flink version? Lately we have reworked a
>>> good part of Flink's distributed architecture and added resource elasticity
>>> (starting with Flink 1.5).
>>>
>>> Cheers,
>>> Till
>>>
>>> On Thu, Sep 6, 2018 at 4:58 AM Subramanya Suresh <ssur...@salesforce.com>
>>> wrote:
>>>
>>>> Hi,
>>>> With some additional research,
>>>>
>>>> *Before the flag*
>>>> I realized that for failed containers (those that exited for a specific
>>>> reason) we were still requesting a new TM container and launching a TM.
>>>> But for the "Detected unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]"
>>>> issue I do not see the container marked as failed or a subsequent request
>>>> for a TM.
>>>>
>>>> *After taskmanager.exit-on-fatal-akka-error: true*
>>>> I do not see any "unreachable: [akka.tcp://fl...@blahabc.sfdc.net:123]"
>>>> messages since we turned on taskmanager.exit-on-fatal-akka-error: true. I
>>>> am not sure if that is a coincidence or a direct impact of this change.
>>>>
>>>> *Our Issue:*
>>>> I realized we are still exiting the application, i.e. failing when the
>>>> containers are lost. The reason is that before
>>>> org.apache.flink.yarn.YarnFlinkResourceManager is able to acquire a new
>>>> container, launch the TM, and have it reported as started, the
>>>> org.apache.flink.runtime.jobmanager.scheduler
>>>> throws a NoResourceAvailableException that causes a failure. In our case we
>>>> had a fixed restart strategy with 5 attempts, and we are running out of
>>>> them because of this. I am looking to solve this with a
>>>> FailureRateRestartStrategy over a 2 minute interval (10 second restart
>>>> delay, >12 failures), which lets the TM come back (takes about 50 seconds).
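>>>>
>>>> A rough sketch of how such a failure-rate setup might be expressed in
>>>> flink-conf.yaml. The key names are the failure-rate restart strategy
>>>> options from the Flink configuration documentation; the values only mirror
>>>> the numbers mentioned above, and reading ">12 failures" as a limit of 12
>>>> restarts per interval is an assumption, not a tested recommendation:
>>>>
>>>> ```yaml
>>>> # Assumed values, taken from the numbers in this message.
>>>> restart-strategy: failure-rate
>>>> # Job fails permanently once more than this many restarts occur per interval.
>>>> restart-strategy.failure-rate.max-failures-per-interval: 12
>>>> # Window over which failures are counted.
>>>> restart-strategy.failure-rate.failure-rate-interval: 2 min
>>>> # Pause between two consecutive restart attempts.
>>>> restart-strategy.failure-rate.delay: 10 s
>>>> ```
>>>>
>>>> With a 10 second delay and a TM that needs about 50 seconds to come back,
>>>> roughly five restart attempts can fail before the slots return, which is
>>>> why a fixed limit of 5 is used up while 12 per 2 minutes leaves headroom.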
>>>>
>>>> *Flink Bug*
>>>> But I cannot help but wonder why there is no interaction between the
>>>> ResourceManager and JobManager, i.e. why does the JobManager continue with
>>>> the processing despite not having the required TMs?
>>>>
>>>> Logs to substantiate what I said above (only relevant).
>>>>
>>>> 018-09-03 06:17:13,932 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
>>>> container_e28_1535135887442_1381_01_000097 completed successfully with
>>>> diagnostics: Container released by application
>>>>
>>>>
>>>> 2018-09-03 06:34:19,214 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     - Association with remote system [akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>> now gated for [5000] ms. Reason: [Disassociated]
>>>> 2018-09-03 06:34:19,214 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
>>>> container_e27_1535135887442_1381_01_000102 failed. Exit status: -102
>>>> 2018-09-03 06:34:19,215 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
>>>> for container container_e27_1535135887442_1381_01_000102 in state COMPLETE
>>>> : exitStatus=-102 diagnostics=Container preempted by scheduler
>>>> 2018-09-03 06:34:19,215 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Total
>>>> number of failed containers so far: 1
>>>> 2018-09-03 06:34:19,215 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Requesting
>>>> new TaskManager container with 20000 megabytes memory. Pending requests: 1
>>>> 2018-09-03 06:34:19,216 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                     - Task manager akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:46219/user/taskmanager
>>>> terminated.
>>>> 2018-09-03 06:34:19,218 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RUNNING to FAILING.
>>>> java.lang.Exception: TaskManager was lost/killed:
>>>> container_e27_1535135887442_1381_01_000102 @
>>>> hello-world9-27-crz.ops.sfdc.net (dataPort=40423)
>>>> 2018-09-03 06:34:19,466 INFO
>>>> org.apache.flink.runtime.instance.InstanceManager             -
>>>> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
>>>> Number of registered task managers 144. Number of available slots 720
>>>>
>>>> 2018-09-03 06:34:24,717 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>>>> new container: container_e28_1535135887442_1381_01_000147 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:24,717 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>>>> TaskManager in container ContainerInLaunch @ 1535956464717: Container:
>>>> [ContainerId: container_e28_1535135887442_1381_01_000147, NodeId:
>>>> hello-world9-27-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>> hello-world9-27-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>>> 11.11.35.220:8041 }, ] on host hello-world9-27-crz.ops.sfdc.net
>>>> 2018-09-03 06:34:29,256 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     - Association with remote system [akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.transport.netty.NettyTransport
>>>>                   - Remote connection to [null] failed with
>>>> java.net.ConnectException: Connection refused:
>>>> hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219
>>>> 2018-09-03 06:34:34,284 WARN  akka.remote.ReliableDeliverySupervisor
>>>>                     - Association with remote system [akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:46219] has failed, address is
>>>> now gated for [5000] ms. Reason: [Association failed with [akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:46219]] Caused by: [Connection
>>>> refused: hello-world9-27-crz.ops.sfdc.net/11.11.35.220:46219]
>>>> 2018-09-03 06:34:34,540 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RESTARTING to CREATED.
>>>> 2018-09-03 06:34:35,044 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state CREATED to RUNNING.
>>>> 2018-09-03 06:34:35,195 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state RUNNING to FAILING.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #3 (Source: Custom Source ->
>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'OSQUERY'), =(UPPER(RuleMatch),
>>>> _UTF-16LE'ILLUMIO')), OR(cidrMatch(IPSource, _UTF-16LE'10.0.0.0/8'),
>>>> cidrMatch(IPSource, _UTF-16LE'192.168.0.0/16')), IS
>>>> NULL(csvLookup(_UTF-16LE'a353A000000jGZb_whitelist.csv', _UTF
>>>>
>>>>
>>>>
>>>>
>>>> 2018-09-03 06:34:39,248 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - TaskManager
>>>> container_e28_1535135887442_1381_01_000147 has started.
>>>> 2018-09-03 06:34:45,235 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Container
>>>> container_e28_1535135887442_1381_01_000147 failed. Exit status: -102
>>>> 2018-09-03 06:34:45,235 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Diagnostics
>>>> for container container_e28_1535135887442_1381_01_000147 in state COMPLETE
>>>> : exitStatus=-102 diagnostics=Container preempted by scheduler
>>>> 2018-09-03 06:34:45,236 INFO  org.apache.flink.yarn.YarnJobManager
>>>>                     - Task manager akka.tcp://
>>>> fl...@hello-world9-27-crz.ops.sfdc.net:41966/user/taskmanager
>>>> terminated.
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Total
>>>> number of failed containers so far: 2
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.runtime.instance.InstanceManager             -
>>>> Unregistered task manager hello-world9-27-crz.ops.sfdc.net/11.11.35.220.
>>>> Number of registered task managers 144. Number of available slots 720.
>>>>
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>>>> new container: container_e28_1535135887442_1381_01_000202 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:45,236 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Launching
>>>> TaskManager in container ContainerInLaunch @ 1535956485236: Container:
>>>> [ContainerId: container_e28_1535135887442_1381_01_000202, NodeId:
>>>> hello-world4-31-crz.ops.sfdc.net:8041, NodeHttpAddress:
>>>> hello-world4-31-crz.ops.sfdc.net:8042, Resource: <memory:20480,
>>>> vCores:5>, Priority: 0, Token: Token { kind: ContainerToken, service:
>>>> 11.11.34.160:8041 }, ] on host hello-world4-31-crz.ops.sfdc.net
>>>> 2018-09-03 06:34:45,241 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Received
>>>> new container: container_e28_1535135887442_1381_01_000203 - Remaining
>>>> pending container requests: 0
>>>> 2018-09-03 06:34:45,241 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Returning
>>>> excess container container_e28_1535135887442_1381_01_000203
>>>>
>>>> Notice there is no "TaskManager
>>>> container_e28_1535135887442_1381_01_000202 has started." message.
>>>> Instead I see:
>>>> 2018-09-03 06:34:56,894 INFO
>>>> org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job
>>>> streaming-searches-prod (96d3b4f60a80a898f44f87c5b06f6981) switched from
>>>> state FAILING to FAILED.
>>>> org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException:
>>>> Not enough free slots available to run the job. You can decrease the
>>>> operator parallelism or increase the number of slots per TaskManager in the
>>>> configuration. Task to schedule: < Attempt #5 (Source: Custom Source ->
>>>> (Map -> where: (AND(OR(=(UPPER(RuleMatch), _UTF-16LE'BRO'),
>>>> =(UPPER(RuleMatch), _UTF-16LE'PAN'), =(UPPER(RuleMatch),
>>>> _UTF-16LE'OLYMPUS'), =(UPPER(RuleMatch), _UTF-16LE'BIT9'), =(UPPER(R
>>>>
>>>>
>>>> 2018-09-03 06:34:57,005 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                - Shutting
>>>> down cluster with status FAILED : The monitored job with ID
>>>> 96d3b4f60a80a898f44f87c5b06f6981 has failed to complete.
>>>> 2018-09-03 06:34:57,007 INFO
>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>> Unregistering application from the YARN Resource Manager
>>>>
>>>>
>>>>
>>>>
>>>> On Fri, Aug 31, 2018 at 4:00 PM, Subramanya Suresh <
>>>> ssur...@salesforce.com> wrote:
>>>>
>>>>> Thanks Till,
>>>>> I do not see any Akka-related messages in that TaskManager after the
>>>>> initial startup. It seemed like all was well. So after the RemoteWatcher
>>>>> detects it as unreachable and the TaskManager is unregistered, I do not
>>>>> see any other activity in the JobManager with regard to reallocation etc.
>>>>> - Does the quarantining of the TaskManager not happen until
>>>>> exit-on-fatal-akka-error is turned on?
>>>>> - Do the JobManager and the TaskManager try to reconnect to each
>>>>> other again? Is there a different setting for it?
>>>>> - Does the JobManager not reallocate a TaskManager despite it being
>>>>> unregistered, until the TaskManager exits? I think it should, especially
>>>>> if it is not trying to establish a connection again.
>>>>>
>>>>> I will give the flag a try.
>>>>>
>>>>> Sincerely,
>>>>>
>>>>>
>>>>> On Fri, Aug 31, 2018 at 2:53 AM, Till Rohrmann <trohrm...@apache.org>
>>>>> wrote:
>>>>>
>>>>>> Could you check whether
>>>>>> akka.tcp://fl...@blahabc.sfdc.net:123/user/taskmanager is still running,
>>>>>> e.g. whether it tries to reconnect to the JobManager? If this is the
>>>>>> case, then the container is still running and the
>>>>>> YarnFlinkResourceManager thinks that everything is alright. You can make
>>>>>> a TaskManager kill itself if it gets quarantined by setting
>>>>>> `taskmanager.exit-on-fatal-akka-error: true` in the `flink-conf.yaml`.
>>>>>>
>>>>>> Cheers,
>>>>>> Till
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 10:43 AM Subramanya Suresh <
>>>>>> ssur...@salesforce.com> wrote:
>>>>>>
>>>>>>> Hi Till,
>>>>>>> Greatly appreciate your reply.
>>>>>>> We use version 1.4.2. I do not see anything unusual in the logs for
>>>>>>> the TM that was lost. Note: I have looked at many such failures and see
>>>>>>> the same pattern below.
>>>>>>>
>>>>>>> The JM logs above had most of what I had, but below is what I
>>>>>>> have when I search for flink.yarn (we have huge logs otherwise, given
>>>>>>> the number of SQL queries we run). The gist is: Akka detects the TM as
>>>>>>> unreachable, the TM is marked lost and unregistered by the JM, operators
>>>>>>> start failing with NoResourceAvailableException since there is one TM
>>>>>>> fewer, and 5 retry attempts later the job goes down.
>>>>>>>
>>>>>>> ………….
>>>>>>>
>>>>>>> 2018-08-29 23:02:41,216 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> TaskManager container_e27_1535135887442_0906_01_000124 has started.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,095 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> TaskManager container_e27_1535135887442_0906_01_000159 has started.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,409 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Submitting job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,429 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Using restart strategy
>>>>>>> FixedDelayRestartStrategy(maxNumberRestartAttempts=5,
>>>>>>> delayBetweenRestartAttempts=10000) for ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,486 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Running initialization on master for job
>>>>>>> streaming-searches-prod (ab7e96a1659fbb4bfb3c6cd9bccb0335).
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,487 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Successfully ran initialization on master
>>>>>>> in 0 ms.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,684 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Using application-defined state backend
>>>>>>> for checkpoint/savepoint metadata: File State Backend @
>>>>>>> hdfs://security-temp/savedSearches/checkpoint.
>>>>>>>
>>>>>>> 2018-08-29 23:02:50,920 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Scheduling job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 (streaming-searches-prod).
>>>>>>>
>>>>>>> 2018-08-29 23:12:05,240 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Attempting to recover all jobs.
>>>>>>>
>>>>>>> 2018-08-29 23:12:05,716 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - There are 1 jobs to recover. Starting the
>>>>>>> job recovery.
>>>>>>>
>>>>>>> 2018-08-29 23:12:05,806 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Attempting to recover job
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335.
>>>>>>>
>>>>>>> 2018-08-29 23:12:07,308 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Ignoring job recovery for
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335, because it is already submitted.
>>>>>>>
>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Task manager akka.tcp://flink@
>>>>>>> blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>
>>>>>>> at
>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>
>>>>>>> 2018-08-29 23:13:58,645 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Received message for non-existing
>>>>>>> checkpoint 1
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,126 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 is in terminal state FAILED. Shutting
>>>>>>> down session
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,128 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Stopping JobManager with final
>>>>>>> application status FAILED and diagnostics: The monitored job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,129 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> Shutting down cluster with status FAILED : The monitored job with ID
>>>>>>> ab7e96a1659fbb4bfb3c6cd9bccb0335 has failed to complete.
>>>>>>>
>>>>>>> 2018-08-29 23:15:26,146 INFO
>>>>>>> org.apache.flink.yarn.YarnFlinkResourceManager                -
>>>>>>> Unregistering application from the YARN Resource Manager
>>>>>>>
>>>>>>> 2018-08-29 23:15:31,363 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Deleting yarn application files under
>>>>>>> hdfs://security-temp/user/sec-data-app/.flink/application_1535135887442_0906.
>>>>>>>
>>>>>>> 2018-08-29 23:15:31,370 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Stopping JobManager akka.tcp://flink@
>>>>>>> blahxyz.sfdc.net:1235/user/jobmanager.
>>>>>>>
>>>>>>> 2018-08-29 23:15:41,225 ERROR org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Actor system shut down timed out.
>>>>>>>
>>>>>>> 2018-08-29 23:15:41,226 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>                         - Shutdown completed. Stopping JVM.
>>>>>>>
>>>>>>>
>>>>>>> On Thu, Aug 30, 2018 at 1:55 AM, Till Rohrmann <trohrm...@apache.org
>>>>>>> > wrote:
>>>>>>>
>>>>>>>> Hi Subramanya,
>>>>>>>>
>>>>>>>> in order to help you I need a little bit of context. Which version
>>>>>>>> of Flink are you running? The configuration yarn.reallocate-failed has
>>>>>>>> been deprecated since Flink 1.1 and does not have an effect anymore.
>>>>>>>>
>>>>>>>> What would be helpful is to get the full jobmanager log from you.
>>>>>>>> If the YarnFlinkResourceManager gets notified that a container has
>>>>>>>> failed, it should restart this container (it will do this 145 times).
>>>>>>>> So if the YarnFlinkResourceManager does not get notified about a
>>>>>>>> completed container, then this might indicate that the container is
>>>>>>>> still running. So it would be good to check what the logs of
>>>>>>>> container_e27_1535135887442_0906_01_000039 say.
>>>>>>>>
>>>>>>>> Moreover, do you see the same problem occur when using the latest
>>>>>>>> release Flink 1.6.0?
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Till
>>>>>>>>
>>>>>>>> On Thu, Aug 30, 2018 at 8:48 AM Subramanya Suresh <
>>>>>>>> ssur...@salesforce.com> wrote:
>>>>>>>>
>>>>>>>>> Hi, we are seeing a weird issue where one TaskManager is lost and
>>>>>>>>> then never re-allocated. Subsequently operators fail with
>>>>>>>>> NoResourceAvailableException, and after 5 restarts (we have FixedDelay
>>>>>>>>> restarts of 5) the application goes down.
>>>>>>>>>
>>>>>>>>>    - We have explicitly set *yarn.reallocate-failed: true* and
>>>>>>>>>    have not specified yarn.maximum-failed-containers (and see
>>>>>>>>>    “org.apache.flink.yarn.YarnApplicationMasterRunner             -
>>>>>>>>>    YARN application tolerates 145 failed TaskManager containers
>>>>>>>>>    before giving up” in the logs).
>>>>>>>>>    - After the initial startup, where all 145 TaskManagers are
>>>>>>>>>    requested, I never see any logs saying “Requesting new TaskManager
>>>>>>>>>    container” to reallocate the failed container.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> 2018-08-29 23:13:56,655 WARN  akka.remote.RemoteWatcher
>>>>>>>>>                           - Detected unreachable: [akka.tcp://
>>>>>>>>> fl...@blahabc.sfdc.net:123]
>>>>>>>>>
>>>>>>>>> 2018-08-29 23:13:57,364 INFO  org.apache.flink.yarn.YarnJobManager
>>>>>>>>>                         - Task manager akka.tcp://
>>>>>>>>> fl...@blahabc.sfdc.net:123/user/taskmanager terminated.
>>>>>>>>>
>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>> container_e27_1535135887442_0906_01_000039 @ blahabc.sfdc.net
>>>>>>>>> (dataPort=124)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>>>>>>>>>
>>>>>>>>> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:122)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> akka.actor.dungeon.DeathWatch$class.receivedTerminated(DeathWatch.scala:46)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.receivedTerminated(ActorCell.scala:374)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.autoReceiveMessage(ActorCell.scala:511)
>>>>>>>>>
>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:494)
>>>>>>>>>
>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>>>>>>>>>
>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>>>>>>>>>
>>>>>>>>> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>
>>>>>>>>> java.lang.Exception: TaskManager was lost/killed:
>>>>>>>>> container_e27_1535135887442_0906_01_000039 @
>>>>>>>>> blahabc.sfdc.net:42414 (dataPort=124)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SimpleSlot.releaseSlot(SimpleSlot.java:217)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SlotSharingGroupAssignment.releaseSharedSlot(SlotSharingGroupAssignment.java:523)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.SharedSlot.releaseSlot(SharedSlot.java:192)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.Instance.markDead(Instance.java:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.instance.InstanceManager.unregisterTaskManager(InstanceManager.java:212)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$handleTaskManagerTerminated(JobManager.scala:1198)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:1096)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.clusterframework.ContaineredJobManager$$anonfun$handleContainerMessage$1.applyOrElse(ContaineredJobManager.scala:107)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnShutdown$1.applyOrElse(YarnJobManager.scala:110)
>>>>>>>>>
>>>>>>>>> at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:49)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>>>>>>>>>
>>>>>>>>> at
>>>>>>>>> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>>>>>>>>>
>>>>>>>>>
