Another thought on the container failure:

in 1.1, the user code is loaded dynamically whenever a Task is started.
That means that on every task restart the code is reloaded. For that to
work proper, class unloading needs to happen, or the permgen will
eventually overflow.

It can happen that class unloading is prevented if the user functions do
leave references around as "GC roots", which may be threads, or references
in registries, etc.

In Flink 1.2, YARN will put the user code into the application classpath,
so code needs not be reloaded on every restart. That should solve that
issue.
To "simulate" that behavior in Flink 1.1, put your application code jars
into the "lib" folder

Best,
Stephan


On Thu, Jan 5, 2017 at 1:15 PM, Yury Ruchin <yuri.ruc...@gmail.com> wrote:

> Hi,
>
> I've faced a similar issue recently. Hope sharing my findings will help.
> The problem can be split into 2 parts:
>
> *Source of container failures*
> The logs you provided indicate that YARN kills its containers for
> exceeding memory limits. Important point here is that memory limit = JVM
> heap memory + off-heap memory. So if off-heap memory usage is high, YARN
> may kill containers despite JVM heap consumption is fine. To solve this
> issue, Flink reserves a share of container memory for off-heap memory. How
> much will be reserved is controlled by yarn.heap-cutoff-ratio and
> yarn.heap-cutoff-min configuration. By default 25% of the requested
> container memory will be reserved for off-heap. This is seems to be a good
> start, but one should experiment and tune to meet their job specifics.
>
> It's also worthwhile to figure out who consumes off-heap memory. Is it
> Flink managed memory moved off heap (taskmanager.memory.off-heap = true)?
> Is it some external library allocating something off heap? Is it your own
> code?
>
> *How Flink handles task manager failures*
> Whenever a task manager fails, the Flink jobmanager decides whether it
> should:
> - reallocate failed task manager container
> - fail application entirely
> These decisions can be guided by certain configuration (
> https://ci.apache.org/projects/flink/flink-docs-release-1.
> 1/setup/yarn_setup.html#recovery-behavior-of-flink-on-yarn). With default
> settings, job manager does reallocate task manager containers up to the
> point when N failures have been observed, where N is the number of
> requested task managers. After that the application is stopped.
>
> According to the logs, you have a finite number in yarn.maximum-failed-
> containers (11, as I can see from the logs - this may be set by Flink if
> not provided explicitly). On 12th container restart, jobmanager gives up
> and the application stops. I'm not sure why it keeps reporting not enough
> slots after that point. In my experience this may happen when job eats up
> all the available slots, so that after container failure its tasks cannot
> be restarted in other (live) containers. But I believe once the decision to
> stop the application is made, there should not be any further attempts to
> restart the job, hence no logs like those. Hopefully, someone else will
> explain this to us :)
>
> In my case I made jobmanager restart containers infinitely by setting 
> yarn.maximum-failed-containers
> = -1, so that taskmanager failure never results in application
> death. Note this is unlikely a good choice for a batch job.
>
> Regards,
> Yury
>
> 2017-01-05 3:21 GMT+03:00 Shannon Carey <sca...@expedia.com>:
>
>> In Flink 1.1.3 on emr-5.2.0, I've experienced a particular problem twice
>> and I'm wondering if anyone has some insight about it.
>>
>> In both cases, we deployed a job that fails very frequently (within
>> 15s-1m of launch). Eventually, the Flink cluster dies.
>>
>> The sequence of events looks something like this:
>>
>>    - bad job is launched
>>    - bad job fails & is restarted many times (I didn't have the
>>    "failure-rate" restart strategy configuration right)
>>    - Task manager logs: org.apache.flink.yarn.YarnTaskManagerRunner
>>    (SIGTERM handler): RECEIVED SIGNAL 15: SIGTERM. Shutting down as 
>> requested.
>>    - At this point, the YARN resource manager also logs the container
>>    failure
>>    - More logs: Container ResourceID{resourceId='contain
>>    er_1481658997383_0003_01_000013'} failed. Exit status: Pmem limit
>>    exceeded (-104)
>>    - Diagnostics for container ResourceID{resourceId='contain
>>    er_1481658997383_0003_01_000013'} in state COMPLETE : exitStatus=Pmem
>>    limit exceeded (-104) diagnostics=Container [pid=21246,containerID=contain
>>    er_1481658997383_0003_01_000013] is running beyond physical memory
>>    limits. Current usage: 5.6 GB of 5.6 GB physical memory used; 9.6 GB of
>>    28.1 GB virtual memory used. Killing container.
>>    Container killed on request. Exit code is 143
>>    Container exited with a non-zero exit code 143
>>    Total number of failed containers so far: 12
>>    Stopping YARN session because the number of failed containers (12)
>>    exceeded the maximum failed containers (11). This number is controlled by
>>    the 'yarn.maximum-failed-containers' configuration setting. By
>>    default its the number of requested containers.
>>    - From here onward, the logs repeatedly show that jobs fail to
>>    restart due to "org.apache.flink.runtime.jobm
>>    anager.scheduler.NoResourceAvailableException: Not enough free slots
>>    available to run the job. You can decrease the operator parallelism or
>>    increase the number of slots per TaskManager in the configuration. Task to
>>    schedule: < Attempt #68 (Source: …) @ (unassigned) - [SCHEDULED] > with
>>    groupID < 73191c171abfff61fb5102c161274145 > in sharing group <
>>    SlotSharingGroup [73191c171abfff61fb5102c161274145,
>>    19596f7834805c8409c419f0edab1f1b] >. Resources available to
>>    scheduler: Number of instances=0, total number of slots=0, available
>>    slots=0"
>>    - Eventually, Flink stops for some reason (with another SIGTERM
>>    message), presumably because of YARN
>>
>> Does anyone have an idea why a bad job repeatedly failing would
>> eventually result in the Flink cluster dying?
>>
>> Any idea why I'd get "Pmem limit exceeded" or "Not enough free slots
>> available to run the job"? The JVM heap usage and the free memory on the
>> machines both look reasonable in my monitoring dashboards. Could it
>> possibly be a memory leak due to classloading or something?
>>
>> Thanks for any help or suggestions you can provide! I am hoping that the
>> "failure-rate" restart strategy will help avoid this issue in the future,
>> but I'd also like to understand what's making the cluster die so that I can
>> prevent it.
>>
>> -Shannon
>>
>
>

Reply via email to