Hi Bowen,

Sorry for my late answer. I dug through some of the logs and it seems that
you have the following problems:

   1. Once in a while the Kinesis producer fails with a
   UserRecordFailedException saying “Expired while waiting in HttpClient queue
   Record has reached expiration”. This seems to be a problem on the Kinesis
   side. This will trigger the failure of the task and the cancellation of all
   other tasks as well.
   2. Somehow Flink does not manage to cancel all tasks within the timeout of
   180 seconds. This value is configurable via task.cancellation.timeout
   (unit: ms) in the Flink configuration (see the sketch after this list). It
   looks a bit like you have a lot of logging going on, because the code is
   waiting, for example, in Category.java:204 and other log4j methods. This
   could, however, also mask the true issue. One thing you could try is a
   different logging backend such as logback [1].
   3. The failing cancellation is a fatal error which leads to the termination
   of the TaskManager. The YarnResourceManager is notified of this and
   restarts the container. This goes on until the maximum number of failed
   containers is reached. That number can be configured via
   yarn.maximum-failed-containers (also shown in the sketch after this list);
   by default it is the number of containers you initially requested. If you
   set this value to -1, the cluster will never fail because of this limit and
   will always restart failed containers. Once the maximum is reached, Flink
   terminates the Yarn application.
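
For reference, both settings from points 2 and 3 go into flink-conf.yaml. The
values below are only an illustration of the syntax, not a recommendation:

```
# flink-conf.yaml (example values only)
# Give task cancellation more time before it is treated as fatal (milliseconds).
task.cancellation.timeout: 300000
# Keep restarting failed containers instead of failing the whole application.
yarn.maximum-failed-containers: -1
```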

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/best_practices.html#using-logback-instead-of-log4j
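
In case you give logback a try, a minimal logback.xml along the lines of [1]
could look like the sketch below. The appender and pattern are just one
possible setup, and you would also have to swap the log4j jars in the lib
folder for the logback ones as described in the guide:

```
<configuration>
    <appender name="file" class="ch.qos.logback.core.FileAppender">
        <!-- ${log.file} is the log path property set by Flink's start scripts -->
        <file>${log.file}</file>
        <append>false</append>
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss,SSS} %-5level %logger{60} - %msg%n</pattern>
        </encoder>
    </appender>
    <root level="INFO">
        <appender-ref ref="file"/>
    </root>
</configuration>
```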

In order to further debug the problem, could you tell us which version of
Flink you are using and, if possible, provide us with the TaskManager logs at
DEBUG log level?
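
To enable the DEBUG level logs, assuming you are still using the
log4j.properties that ships in Flink's conf directory (with its default
"file" appender), raising the root logger level should be enough:

```
# conf/log4j.properties (sketch, assuming the default "file" appender)
log4j.rootLogger=DEBUG, file
```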

Cheers,
Till

On Fri, Aug 11, 2017 at 5:37 AM, Bowen Li <bowen...@offerupnow.com> wrote:

> Hi Till,
>     Any idea why it happened? I've tried different configurations for our
> Flink cluster, but the cluster always fails after 4 or 5 hours.
>
>     According to the log, it looks like the total number of slots becomes
> 0 at the end, and YarnClusterClient shuts down the application master as a
> result. Why are the slots not released? Or have they actually crashed and
> are thus no longer available?
>
> I'm trying to deploy the first Flink cluster within our company, and this
> issue is slowing down our effort to prove that Flink actually works for us.
> We'd appreciate your help on it!
>
> Thanks,
> Bowen
>
> On Wed, Aug 9, 2017 at 1:33 PM, Bowen Li <bowen...@offerupnow.com> wrote:
>
>> Hi Till,
>>     Thanks for taking this issue.
>>
>>     We are not comfortable sending logs to an email list that is this
>> open. I'll send the logs to you.
>>
>> Thanks,
>> Bowen
>>
>>
>> On Wed, Aug 9, 2017 at 2:46 AM, Till Rohrmann <trohrm...@apache.org>
>> wrote:
>>
>>> Hi Bowen,
>>>
>>> if I'm not mistaken, then Flink's current Yarn implementation does not
>>> actively release containers. The `YarnFlinkResourceManager` is started
>>> with a fixed number of containers it always tries to acquire. If a
>>> container should die, then it will request a new one.
>>>
>>> In case of a failure, all slots should be freed and then be available for
>>> rescheduling the new tasks. Thus, it is not necessarily the case that 12
>>> new slots will be used unless the old slots are no longer available
>>> (failure of a TM). Therefore, what you are describing sounds like a bug.
>>> Could you share the logs with us?
>>>
>>> Cheers,
>>> Till
>>>
>>> On Wed, Aug 9, 2017 at 9:32 AM, Bowen Li <bowen...@offerupnow.com>
>>> wrote:
>>>
>>>> Hi guys,
>>>>     I was running a Flink job (parallelism of 12) on an EMR cluster with
>>>> 48 YARN slots. When the job starts, I can see from Flink UI that the job
>>>> took 12 slots, and 36 slots were left available.
>>>>
>>>>     I would expect that when the job fails, it would restart from a
>>>> checkpoint by taking another 12 slots and freeing the original 12 slots.
>>>> *Well, I observed that the job took new slots but never freed the
>>>> original slots. The Flink job ended up killed by YARN because there were
>>>> no available slots anymore.*
>>>>
>>>>      Here's the command I used to run the Flink job:
>>>>
>>>>      ```
>>>>      flink run -m yarn-cluster -yn 6 -ys 8 -ytm 40000  xxx.jar
>>>>      ```
>>>>
>>>>      Does anyone know what's going wrong?
>>>>
>>>> Thanks,
>>>> Bowen
>>>>
>>>
>>>
>>
>
