Thanks for reporting and analyzing this issue Kelly. I think you are indeed
running into a Flink bug. I think the problem is the following: With Flink
1.12.0 [1] we introduced a throttling mechanism for discarding checkpoints.
The way it is implemented is that once a checkpoint is discarded it can
trigger another action. This is triggering another checkpoint in the
CheckpointCoordinator. The problem is now that we don't properly handle the
case when the CheckpointCoordinator has been stopped in the meantime (e.g.
if the job has reached a terminal state). That's why we see this
RejectedExecutionException which fails the job. This is definitely a bug
and I have created this issue [2] for fixing it. I am also pulling in Roman
who worked on this feature.

[1] https://issues.apache.org/jira/browse/FLINK-17073
[2] https://issues.apache.org/jira/browse/FLINK-20992

Cheers,
Till

On Wed, Jan 13, 2021 at 7:30 PM Kelly Smith <kell...@zillowgroup.com> wrote:

> Hi folks,
>
>
>
> I recently upgraded to Flink 1.12.0 and I’m hitting an issue where my JM
> is crashing while cancelling a job. This is causing Kubernetes readiness
> probes to fail, the JM to be restarted, and then get in a bad state while
> it tries to recover itself using ZK + a checkpoint which no longer exists.
>
>
>
> This is the only information being logged before the process exits:
>
>
>
>
>
>  *method*: uncaughtException
>    *msg*: FATAL: Thread 'cluster-io-thread-4' produced an uncaught
> exception. Stopping the process...
>    *pod*: dev-dsp-flink-canary-test-9fa6d3e7-jm-59884f579-w8r6x
>    *stack*: java.util.concurrent.RejectedExecutionException: Task
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@41554407
> rejected from 
> java.util.concurrent.ScheduledThreadPoolExecutor@5d0ec6f7[Terminated,
> pool size = 0, active threads = 0, queued tasks = 0, completed tasks =
> 25977] at
> java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
> at
> java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
> at
> java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
> at
> org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter.execute(ScheduledExecutorServiceAdapter.java:62)
> at
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.scheduleTriggerRequest(CheckpointCoordinator.java:1152)
> at
> org.apache.flink.runtime.checkpoint.CheckpointsCleaner.lambda$cleanCheckpoint$0(CheckpointsCleaner.java:58)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> at java.lang.Thread.run(Thread.java:748)
>
>
>
>
>
>
> https://github.com/apache/flink/blob/release-1.12.0/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java#L58
>
>
>
>
>
> I’m not sure how to debug this further, but it seems like an internal
> Flink bug?
>
>
>
> More info:
>
>
>    - Checkpoints are stored in S3 and I’m using the S3 connector
>    - Identical code has been running on Flink 1.11.x for months with no
>    issues
>
>
>
>
>
> Thanks,
>
> Kelly
>

Reply via email to