Sorry for not writing that immediately: yes, you should just throw an
exception. Flink will then try to close everything and restart the job, if
the job is configured for restarts. [1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/
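
To make that concrete, here is a minimal sketch, not your actual PGSink. The
class name PgConnector and the method connectOrThrow are hypothetical names
chosen for illustration. The idea is that the sink's open() calls something
like this and lets the exception propagate, instead of catching it and calling
Runtime.getRuntime().exit(1). It uses only the JDK so that it stands alone.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

/** Hypothetical helper for a sink's open(): fail by throwing, never by exiting the JVM. */
public class PgConnector {

    /**
     * Opens a JDBC connection or throws.
     * When called from open(), the thrown exception fails the task, which lets
     * Flink clean up and apply the job's restart strategy (if one is configured).
     */
    public static Connection connectOrThrow(String jdbcUrl, String user, String password)
            throws SQLException {
        try {
            return DriverManager.getConnection(jdbcUrl, user, password);
        } catch (SQLException e) {
            // Add context and rethrow. Do NOT call Runtime.getRuntime().exit(1) here:
            // that would kill the whole TaskManager process.
            throw new SQLException("Could not open Postgres connection", e);
        }
    }
}
```

In the sink, open() would then contain a call along the lines of
connection = PgConnector.connectOrThrow(url, user, password); with no
surrounding catch block. RichSinkFunction's open(Configuration) is declared
to throw Exception, so the SQLException can simply propagate.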

On Mon, Sep 6, 2021 at 6:59 PM Jordan Hurwich <jhurw...@pulsasensors.com>
wrote:

> Thanks very much for the quick response, Arvid.
>
> I'm removing our calls to Runtime.getRuntime().exit(1) now. If something
> goes wrong when setting up the task, what's the preferred way to shut down
> gracefully so the TaskManager can restart the job successfully? Should I just
> allow an Exception to propagate up, or is there another method I should
> call instead of exit()?
>
> On Mon, Sep 6, 2021 at 9:41 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Jordan,
>>
>> Please never call Runtime.getRuntime().exit(1). This will rob Flink of
>> any way of graceful shutdown and recovery. Since you mentioned EC2 (and not
>> a managed service such as ECS or K8S), I'm also assuming that the task
>> manager process is never restarted and that's why you always end up with
>> too few resources.
>>
>> If you have any other way of restarting the task manager, please let me
>> know and I would look more closely into the logs.
>>
>>
>>
>> On Sun, Sep 5, 2021 at 8:02 PM Jordan Hurwich <jhurw...@pulsasensors.com>
>> wrote:
>>
>>> Hi Flink Users,
>>> We're having an issue where Flink jobs get stuck in the "RESTARTING"
>>> state reliably the first night after our deployments with logs from the
>>> Jobmanager reporting "Could not fulfill resource requirements of job
>>> <job-id>. Free slots: 0". This is surprising because we expect the
>>> restarting job to reuse its existing slot, and because even after
>>> increasing from our minimum required 2 slots to 6 this issue appears to
>>> still occur though $ flink list indicates only 2 jobs.
>>>
>>> Manually stopping all jobs and the jobmanager, and restarting each,
>>> reliably addresses the issue and it does not appear to occur again. We
>>> suspect some currently unknown root cause, but are particularly interested
>>> in ensuring the Flink jobs can automatically recover from this state
>>> without manual intervention.
>>>
>>> Details below with logs and flink-conf.yaml attached. Your support and
>>> feedback are greatly appreciated,
>>> Jordan, Pulsa Inc
>>>
>>>    - $ flink --version: Version: 1.13.2, Commit ID: 5f007ff
>>>
>>>    - What we observe
>>>       - In this example we observe the issue beginning at about 2021-09-05
>>>       03:20:34; logs for "standalonesession" and "taskexecutor" for the
>>>       relevant time period are attached, more available on request.
>>>       - The earliest unique indication of the issue (at 2021-09-05
>>>       03:20:34) appears to be a failure of one of our two jobs ("Measure")
>>>       to open a Postgres connection; the other job, "Signal", does not
>>>       rely on Postgres. Logs attached in taskexecutor.out.
>>>          - This occurs in the open() of our custom RichSinkFunction
>>>          "PGSink", where the exception is caught and we call
>>>          Runtime.getRuntime().exit(1)
>>>       - Moments before the issue starts (at 2021-09-05 03:19:19) we
>>>       observe the TaskManager canceling the previous instance of this job,
>>>       as indicated in taskexecutor.log
>>>          - This corresponds with logs in standalonesession.log of "Trying
>>>          to recover from a global failure...FlinkRuntimeException: Exceeded
>>>          checkpoint tolerable failure threshold"
>>>          - standalonesession.log indicates this exception happens
>>>          multiple times before this event, but in the earlier cases
>>>          automatic recovery seems to work correctly.
>>>       - Moments after the issue starts (at 2021-09-05 03:20:37) in
>>>       standalonesession.log we see another occurrence of "Exceeded
>>>       checkpoint tolerable failure threshold", followed by considerably
>>>       different logs than the previous occurrences.
>>>          - At about this time and afterwards we observe logs in
>>>          standalonesession.log repeatedly reporting
>>>          "akka...NettyTransport...Remote connection to [null] failed with
>>>          java.net.ConnectException: Connection refused:
>>>          /172.30.172.30:40763"
>>>       - In standalonesession.log we then see one occurrence of
>>>       "...TimeoutException: Heartbeat of TaskManager with id
>>>       172.30.172.30:40763-75a3ae timed out" (at 2021-09-05 03:21:19);
>>>       this exception never appears to be repeated.
>>>       - Immediately following that exception the repeating failure loop
>>>       seems to begin:
>>>          - The first log of the loop, in standalonesession.log, appears
>>>          to be "Discarding the results produced by task execution
>>>          905e1..."; 905e1... corresponds with the "TimeoutException:
>>>          Heartbeat..." exception.
>>>          - The last log before the loop repeats is "...CompletionException:
>>>          ...NoResourceAvailableException: Could not acquire the minimum
>>>          required resources", and this line is followed by "Discarding the
>>>          results..." with the hash ID matching this exception's.
>>>          - This loop then repeats indefinitely.
>>>       - During the issue, $ flink list on each machine reports the
>>>       following:
>>>       ------------------ Running/Restarting Jobs -------------------
>>>       04.09.2021 16:03:39 : 3143a6c3ab0a5f58130384c13b2eca45 :
>>>       FlinkSignalProcessor (RESTARTING)
>>>       04.09.2021 16:45:08 : 4d531101945f0726432f73f37a38ee48 :
>>>       FlinkMeasureProcessor (RESTARTING)
>>>
>>>
>>>    - When we observe the issue
>>>       - On each deployment we use Terraform to deploy a cluster of
>>>       "funnel" machines on AWS EC2, each running two Flink jobs,
>>>       "...Measure..." and "...Signal...", that pull from some upstream
>>>       RabbitMQ queue and output into our Postgres db and Redis cache
>>>       respectively.
>>>       - After deployment we always observe these jobs are functioning
>>>       properly in the RUNNING state on each machine, but some time the night
>>>       after deployment we observe that the jobs become nonfunctional,
>>>       reporting RESTARTING indefinitely.
>>>       - This appears to occur to both jobs on every machine in the
>>>       cluster simultaneously, implying some currently unknown root cause.
>>>       - Stopping the jobs and jobmanager on each machine, and
>>>       restarting them, reliably fixes the issue and it does not occur again
>>>       after that first night following deployment.
>>>
>>>
