Sorry for not writing that right away: yes, you should just throw an exception. Flink will then try to close everything and restart the job if the job is configured for restarts. [1]
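For example, roughly like this (the class name, element type, and connection details are placeholders, not your actual PGSink):

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Placeholder sink; the only point here is the error handling in open().
public class PGSink extends RichSinkFunction<String> {

    private transient Connection connection;

    @Override
    public void open(Configuration parameters) throws Exception {
        try {
            // Placeholder connection details.
            connection = DriverManager.getConnection(
                    "jdbc:postgresql://localhost:5432/db", "user", "password");
        } catch (SQLException e) {
            // Instead of Runtime.getRuntime().exit(1): rethrow, so Flink fails
            // the task, cleans up, and applies the configured restart strategy.
            throw new IOException("Could not open Postgres connection", e);
        }
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        // ... write value to Postgres ...
    }

    @Override
    public void close() throws Exception {
        if (connection != null) {
            connection.close();
        }
    }
}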
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/execution/task_failure_recovery/

On Mon, Sep 6, 2021 at 6:59 PM Jordan Hurwich <jhurw...@pulsasensors.com> wrote:

> Thanks very much for the quick response, Arvid.
>
> I'm removing our calls to Runtime.getRuntime().exit(1) now. If something goes wrong when setting up the task, what's the preferred way to gracefully shut down so the Taskmanager can restart the job successfully? Should I just allow an Exception to propagate up, or is there another method I should call instead of exit()?
>
> On Mon, Sep 6, 2021 at 9:41 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Jordan,
>>
>> Please never call Runtime.getRuntime().exit(1). This will rob Flink of any way of graceful shutdown and recovery. Since you mentioned EC2 (and not a managed service such as ECS or K8S), I'm also assuming that the task manager process is never restarted, and that's why you always end up with too few resources.
>>
>> If you have any other way of restarting the task manager, please let me know and I would look more closely into the logs.
>>
>> On Sun, Sep 5, 2021 at 8:02 PM Jordan Hurwich <jhurw...@pulsasensors.com> wrote:
>>
>>> Hi Flink Users,
>>> We're having an issue where Flink jobs get stuck in the "RESTARTING" state reliably the first night after our deployments, with logs from the Jobmanager reporting "Could not fulfill resource requirements of job <job-id>. Free slots: 0". This is surprising because we expect the restarting job to reuse its existing slot, and because the issue still appears to occur even after increasing from our minimum required 2 slots to 6, though $ flink list indicates only 2 jobs.
>>>
>>> Manually stopping all jobs and the jobmanager, and restarting each, reliably addresses the issue, and it does not appear to occur again. We suspect some currently unknown root cause, but are particularly interested in ensuring the Flink jobs can automatically recover from this state without manual intervention.
>>>
>>> Details below with logs and flink-conf.yaml attached. Your support and feedback is greatly appreciated,
>>> Jordan, Pulsa Inc
>>>
>>> - $ flink --version: Version: 1.13.2, Commit ID: 5f007ff
>>>
>>> - What we observe
>>>   - In this example we observe the issue beginning at about 2021-09-05 03:20:34; logs for "standalonesession" and "taskexecutor" for the relevant time period are attached, more available on request.
>>>   - The earliest unique indication of the issue (at 2021-09-05 03:20:34) appears to be a failure of one of our two jobs ("Measure") to open a Postgres connection; the other job, "Signal", does not rely on Postgres. Logs attached in taskexecutor.out.
>>>     - This occurs in the open() of our custom RichSinkFunction "PGSink", where the exception is caught and we call Runtime.getRuntime().exit(1).
>>>   - Moments before the issue starts (at 2021-09-05 03:19:19) we observe the TaskManager canceling the previous instance of this job, as indicated in taskexecutor.log.
>>>     - This corresponds with logs in standalonesession.log of "Trying to recover from a global failure...FlinkRuntimeException: Exceeded checkpoint tolerable failure threshold".
>>>     - standalonesession.log indicates this exception happens multiple times before this event, but in the earlier cases automatic recovery seems to work correctly.
>>>   - Moments after the issue starts (at 2021-09-05 03:20:37), in standalonesession.log we see another occurrence of "Exceeded checkpoint tolerable failure threshold" with considerably different following logs than the previous one.
>>>   - At about this time and afterwards we repeatedly observe logs in standalonesession.log reporting "akka...NettyTransport...Remote connection to [null] failed with java.net.ConnectException: Connection refused: /172.30.172.30:40763".
>>>   - In standalonesession.log we then see one occurrence of "...TimeoutException: Heartbeat of TaskManager with id 172.30.172.30:40763-75a3ae timed out" (at 2021-09-05 03:21:19) - this exception never appears to be repeated.
>>>   - Immediately following that exception the repeating failure loop seems to begin:
>>>     - The first log of the loop, in standalonesession.log, appears to be "Discarding the results produced by task execution 905e1..."; 905e1... corresponds with the "TimeoutException: Heartbeat..." exception.
>>>     - The last log before repeating is "...CompletionException: ...NoResourceAvailableException: Could not acquire the minimum required resources", and this line is followed by "Discarding the results..." with the hash id matching this exception's.
>>>     - This loop then repeats indefinitely.
>>>   - During the issue, $ flink list on each machine reports the following:
>>>     ------------------ Running/Restarting Jobs -------------------
>>>     04.09.2021 16:03:39 : 3143a6c3ab0a5f58130384c13b2eca45 : FlinkSignalProcessor (RESTARTING)
>>>     04.09.2021 16:45:08 : 4d531101945f0726432f73f37a38ee48 : FlinkMeasureProcessor (RESTARTING)
>>>
>>> - When we observe the issue
>>>   - On each deployment we use Terraform to deploy a cluster of "funnel" machines on AWS EC2, each running two Flink jobs, "...Measure..." and "...Signal...", that pull from an upstream RabbitMQ queue and output into our Postgres db and Redis cache respectively.
>>>   - After deployment we always observe these jobs functioning properly in the RUNNING state on each machine, but some time the night after deployment the jobs become nonfunctional, reporting RESTARTING indefinitely.
>>>   - This appears to occur for both jobs on every machine in the cluster simultaneously, implying some currently unknown root cause.
>>>   - Stopping the jobs and jobmanager on each machine, and restarting them, reliably fixes the issue, and it does not occur again after that first night following deployment.
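PS: since the quoted logs mention "Exceeded checkpoint tolerable failure threshold" and [1] covers restart strategies: both the restart strategy and the checkpoint failure tolerance can also be set programmatically on the execution environment. A rough sketch with example values only, not a recommendation for your setup:

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JobSetup {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Retry the failed job a fixed number of times with a delay between
        // attempts (example values only).
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(10, Time.of(10, TimeUnit.SECONDS)));

        // Checkpoint every minute and tolerate a few checkpoint failures before
        // the job is failed with "Exceeded checkpoint tolerable failure threshold".
        env.enableCheckpointing(60_000);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);

        // ... build the pipeline and call env.execute(...) ...
    }
}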