Thanks for the help, Arvid. I'm going to make these changes now and expect
this will address the issue we've had.
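
For anyone finding this thread later, here is a minimal sketch of the change,
not our actual code. It is written as plain Java so it compiles without Flink
on the classpath; in the real job the open() logic would live in the
open(Configuration) method of our RichSinkFunction ("PGSink"), which is
declared to throw Exception. The names PgSinkSketch and ConnectionSupplier
are made up for illustration, and the choice of IllegalStateException as the
wrapper is just an assumption; any exception that propagates should do.

```java
import java.sql.Connection;
import java.sql.SQLException;

/** Hypothetical stand-in for whatever code opens the Postgres connection. */
interface ConnectionSupplier {
    Connection get() throws SQLException;
}

/**
 * Plain-Java sketch of the open()/close() logic of a sink like "PGSink".
 * In a real Flink job these would be the open(Configuration) and close()
 * methods of a RichSinkFunction subclass.
 */
class PgSinkSketch {
    private final ConnectionSupplier supplier; // hypothetical helper
    private Connection connection;

    PgSinkSketch(ConnectionSupplier supplier) {
        this.supplier = supplier;
    }

    void open() throws Exception {
        try {
            connection = supplier.get();
        } catch (SQLException e) {
            // Before: Runtime.getRuntime().exit(1);
            // That terminates the whole JVM the task is running in.
            // After: rethrow with context and let the failure propagate,
            // so the framework can clean up and apply its restart strategy.
            throw new IllegalStateException(
                    "PGSink could not open the Postgres connection", e);
        }
    }

    void close() throws Exception {
        // Release the connection if open() got far enough to create one.
        if (connection != null) {
            connection.close();
            connection = null;
        }
    }
}
```

Whether the job actually restarts after the exception still depends on the
restart strategy configured for it, as described in the page Arvid linked.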

On Mon, Sep 6, 2021 at 12:03 PM Arvid Heise <ar...@apache.org> wrote:

> Sorry for not immediately writing that: yes, you should just throw an
> exception. Flink will then try to close everything and restart the job if
> the job is configured for restarts. [1]
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
>
> On Mon, Sep 6, 2021 at 6:59 PM Jordan Hurwich <jhurw...@pulsasensors.com>
> wrote:
>
>> Thanks very much for the quick response, Arvid.
>>
>> I'm removing our calls to Runtime.getRuntime().exit(1) now. If something
>> goes wrong when setting up the task, what's the preferred way to shut down
>> gracefully so the TaskManager can restart the job successfully? Should I
>> just allow an Exception to propagate up, or is there another method I
>> should call instead of exit()?
>>
>> On Mon, Sep 6, 2021 at 9:41 AM Arvid Heise <ar...@apache.org> wrote:
>>
>>> Hi Jordan,
>>>
>>> Please never call Runtime.getRuntime().exit(1). This will rob Flink of
>>> any way of graceful shutdown and recovery. Since you mentioned EC2 (and not
>>> a managed service such as ECS or K8S), I'm also assuming that the task
>>> manager process is never restarted and that's why you always end up with
>>> too few resources.
>>>
>>> If you have any other way of restarting the task manager, please let me
>>> know and I would look more closely into the logs.
>>>
>>>
>>>
>>> On Sun, Sep 5, 2021 at 8:02 PM Jordan Hurwich <jhurw...@pulsasensors.com>
>>> wrote:
>>>
>>>> Hi Flink Users,
>>>> We're having an issue where Flink jobs get stuck in the "RESTARTING"
>>>> state reliably the first night after our deployments with logs from the
>>>> Jobmanager reporting "Could not fulfill resource requirements of job
>>>> <job-id>. Free slots: 0". This is surprising because we expect the
>>>> restarting job to reuse its existing slot, and because even after
>>>> increasing from our minimum required 2 slots to 6 this issue appears to
>>>> still occur though $ flink list indicates only 2 jobs.
>>>>
>>>> Manually stopping all jobs and the jobmanager, and restarting each,
>>>> reliably addresses the issue and it does not appear to occur again. We
>>>> suspect some currently unknown root cause, but are particularly interested
>>>> in ensuring the Flink jobs can automatically recover from this state
>>>> without manual intervention.
>>>>
>>>> Details below with logs and flink-conf.yaml attached. Your support and
>>>> feedback are greatly appreciated,
>>>> Jordan, Pulsa Inc
>>>>
>>>>    - $ flink --version: Version: 1.13.2, Commit ID: 5f007ff
>>>>
>>>>    - What we observe
>>>>       - In this example we observe the issue beginning at about
>>>>       2021-09-05 03:20:34, logs for "standalonesession" and
>>>>       "taskexecutor" for the relevant time period attached, more
>>>>       available on request.
>>>>       - The earliest unique indication of the issue (at 2021-09-05
>>>>       03:20:34) appears to be a failure for one of our two jobs
>>>>       ("Measure") to open a Postgres connection, the other "Signal" job
>>>>       does not rely on Postgres. Logs attached in taskexecutor.out
>>>>          - This occurs in the open() of our custom RichSinkFunction
>>>>          "PGSink", where the exception is caught and we call
>>>>          Runtime.getRuntime().exit(1)
>>>>       - Moments before the issue starts (at 2021-09-05 03:19:19) we
>>>>       observe the TaskManager canceling the previous instance of this
>>>>       job, as indicated in taskexecutor.log
>>>>          - This corresponds with logs in standalonesession.log of
>>>>          "Trying to recover from a global failure...FlinkRuntimeException:
>>>>          Exceeded checkpoint tolerable failure threshold"
>>>>          - standalonesession.log indicates this exception happens
>>>>          multiple times before this event but in the earlier cases
>>>>          automatic recovery seems to work correctly.
>>>>       - Moments after the issue starts (at 2021-09-05 03:20:37) in
>>>>       standalonesession.log we see another occurrence of "Exceeded
>>>>       checkpoint tolerable failure threshold" with considerably
>>>>       different following logs than the previous.
>>>>          - At about this time and following we observe logs in
>>>>          standalonesession.log reporting "akka...NettyTransport...Remote
>>>>          connection to [null] failed with java.net.ConnectException:
>>>>          Connection refused: /172.30.172.30:40763" repeatedly
>>>>       - In standalonesession.log we then see one occurrence of
>>>>       "...TimeoutException: Heartbeat of TaskManager with id
>>>>       172.30.172.30:40763-75a3ae timed out" (at 2021-09-05 03:21:19) -
>>>>       this exception never appears to be repeated.
>>>>       - Immediately following that exception the repeating failure
>>>>       loop seems to begin:
>>>>          - The first log of the loop, in standalonesession.log,
>>>>          appears to be "Discarding the results produced by task
>>>>          execution 905e1...", where 905e1... corresponds with the
>>>>          "TimeoutException: Heartbeat..." exception
>>>>          - The last log before repeating is "...CompletionException:
>>>>          ...NoResourceAvailableException: Could not acquire the minimum
>>>>          required resources", and this line will be followed by
>>>>          "Discarding the results..." with the hash id matching this
>>>>          exception's.
>>>>          - This loop then repeats indefinitely.
>>>>       - During the issue, $ flink list on each machine reports the
>>>>       following:
>>>>       ------------------ Running/Restarting Jobs -------------------
>>>>       04.09.2021 16:03:39 : 3143a6c3ab0a5f58130384c13b2eca45 :
>>>>       FlinkSignalProcessor (RESTARTING)
>>>>       04.09.2021 16:45:08 : 4d531101945f0726432f73f37a38ee48 :
>>>>       FlinkMeasureProcessor (RESTARTING)
>>>>
>>>>
>>>>    - When we observe the issue
>>>>       - On each deployment we use Terraform to deploy a cluster of
>>>>       "funnel" machines on AWS EC2, each running two Flink jobs,
>>>>       "...Measure..." and "...Signal...", that pull from some upstream
>>>>       RabbitMQ queue and output into our Postgres db and Redis cache
>>>>       respectively.
>>>>       - After deployment we always observe these jobs are functioning
>>>>       properly in the RUNNING state on each machine, but some time the
>>>>       night after deployment we observe that the jobs become
>>>>       nonfunctional, reporting RESTARTING indefinitely.
>>>>       - This appears to occur to both jobs on every machine in the
>>>>       cluster simultaneously, implying some currently unknown root cause.
>>>>       - Stopping the jobs and jobmanager on each machine, and
>>>>       restarting them, reliably fixes the issue and it does not occur
>>>>       again after that first night following deployment.
>>>>
>>>>
