I've got a Flink (1.8.0, emr-5.26) streaming job running on YARN. It's configured to use RocksDB and to checkpoint once a minute to HDFS. The job runs fine for around 20 days, then begins failing with this exception (it fails, restarts, and fails again, repeatedly):
2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
    ... 6 more
Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
    at java.util.concurrent.FutureTask.report(FutureTask.java:122)
    at java.util.concurrent.FutureTask.get(FutureTask.java:192)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
    at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
    at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
    ... 5 more
Caused by: java.lang.IllegalArgumentException
    at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
    at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
    at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
    ... 7 more

This application is configured to retain externalized checkpoints. When I restart from the last successful checkpoint, the job fails with the same error on the first checkpoint after the restart.

I haven't been able to find out why this happens. The source code doesn't seem particularly informative to my eyes:
https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68

Has anyone else seen anything like this?
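My reading of the linked release-1.8.0 source (treat this as an assumption, not confirmed behavior) is that the precondition at line 68 bounds the number of registered operator-state and broadcast-state meta-info snapshots by `Short.MAX_VALUE` (32767). The standalone sketch below mimics that kind of check without any Flink dependency. `OperatorStateCountCheck`, `validate`, `states`, and `passes` are hypothetical names for illustration only, not Flink APIs. The local `checkArgument` behaves like `Preconditions.checkArgument(boolean)` in that it throws an `IllegalArgumentException` with no message, which would match the bare exception in the trace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a size bound like the one I believe
// OperatorBackendSerializationProxy enforces; not Flink's actual class.
public class OperatorStateCountCheck {

    // Throws an IllegalArgumentException with no message when the condition is false.
    static void checkArgument(boolean condition) {
        if (!condition) {
            throw new IllegalArgumentException();
        }
    }

    // Assumed constraint: both state lists must have at most Short.MAX_VALUE entries.
    static void validate(List<String> operatorStates, List<String> broadcastStates) {
        checkArgument(operatorStates.size() <= Short.MAX_VALUE
                && broadcastStates.size() <= Short.MAX_VALUE);
    }

    // Builds n distinct, hypothetical state names.
    static List<String> states(int n) {
        List<String> names = new ArrayList<>(n);
        for (int i = 0; i < n; i++) {
            names.add("state-" + i);
        }
        return names;
    }

    // Returns true if validation succeeds with n operator states and no broadcast states.
    static boolean passes(int n) {
        try {
            validate(states(n), new ArrayList<>());
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(passes(Short.MAX_VALUE));     // 32767 states: accepted
        System.out.println(passes(Short.MAX_VALUE + 1)); // 32768 states: rejected
    }
}
```

For scale, one checkpoint per minute is 1,440 checkpoints per day, so checkpoint 32702 corresponds to roughly 22.7 days, which is in the same range as 32767. Whether the number of registered operator states in this job actually grows over time is an assumption the log above does not show.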