Correction: I've found a place where Beam might be creating a new operator state per checkpoint:
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L91-L105
https://github.com/apache/beam/blob/4fc924a8193bb9495c6b7ba755ced576bb8a35d5/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/stableinput/BufferingDoFnRunner.java#L141-L149
This gives me something I can investigate locally at least.

On Thu, Apr 16, 2020 at 9:03 AM Stephen Patel <merli...@gmail.com> wrote:

> I can't say that I ever call that directly. The beam library that I'm
> using does call it in a couple places:
> https://github.com/apache/beam/blob/v2.14.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L422-L429
>
> But it seems to be the same descriptor every time. Is that limit per
> operator? That is, can each operator host up to 32767 operator/broadcast
> states? I assume that's by name?
>
> On Wed, Apr 15, 2020 at 10:46 PM Yun Tang <myas...@live.com> wrote:
>
>> Hi Stephen
>>
>> This is not related with RocksDB but with default on-heap operator state
>> backend. From your exception stack trace, you have created too many
>> operator states (more than 32767).
>> How do you call context.getOperatorStateStore().getListState or
>> context.getOperatorStateStore().getBroadcastState ? Did you pass a
>> different operator state descriptor each time?
>>
>> Best
>> Yun Tang
>> ------------------------------
>> *From:* Stephen Patel <merli...@gmail.com>
>> *Sent:* Thursday, April 16, 2020 2:09
>> *To:* user@flink.apache.org <user@flink.apache.org>
>> *Subject:* Streaming Job eventually begins failing during checkpointing
>>
>> I've got a flink (1.8.0, emr-5.26) streaming job running on yarn. It's
>> configured to use rocksdb, and checkpoint once a minute to hdfs. This job
>> operates just fine for around 20 days, and then begins failing with this
>> exception (it fails, restarts, and fails again, repeatedly):
>>
>> 2020-04-15 13:15:02,920 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32701 @ 1586956502911 for job 9953424f21e240112dd23ab4f8320b60.
>> 2020-04-15 13:15:05,762 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed checkpoint 32701 for job 9953424f21e240112dd23ab4f8320b60 (795385496 bytes in 2667 ms).
>> 2020-04-15 13:16:02,919 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering checkpoint 32702 @ 1586956562911 for job 9953424f21e240112dd23ab4f8320b60.
>> 2020-04-15 13:16:03,147 INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <operator_name> (1/2) (f4737add01961f8b42b8eb4e791b83ba) switched from RUNNING to FAILED.
>> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).}
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointExceptionHandler.tryHandleCheckpointException(StreamTask.java:1153)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:947)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:884)
>>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>     at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.Exception: Could not materialize checkpoint 32702 for operator <operator_name> (1/2).
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.handleExecutionException(StreamTask.java:942)
>>     ... 6 more
>> Caused by: java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException
>>     at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>>     at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:394)
>>     at org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer.<init>(OperatorSnapshotFinalizer.java:53)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:853)
>>     ... 5 more
>> Caused by: java.lang.IllegalArgumentException
>>     at org.apache.flink.util.Preconditions.checkArgument(Preconditions.java:123)
>>     at org.apache.flink.runtime.state.OperatorBackendSerializationProxy.<init>(OperatorBackendSerializationProxy.java:68)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:138)
>>     at org.apache.flink.runtime.state.DefaultOperatorStateBackendSnapshotStrategy$1.callInternal(DefaultOperatorStateBackendSnapshotStrategy.java:108)
>>     at org.apache.flink.runtime.state.AsyncSnapshotCallable.call(AsyncSnapshotCallable.java:75)
>>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>>     at org.apache.flink.runtime.concurrent.FutureUtils.runIfNotDoneAndGet(FutureUtils.java:391)
>>     ... 7 more
>>
>> This application configured to retain external checkpoints. When I
>> attempt to restart from the last successful checkpoint, it will fail with
>> the same error on the first checkpoint that happens after the restart.
>>
>> I haven't been able to find out why this might be. The source code
>> doesn't seem particularly informative to my eyes:
>> https://github.com/apache/flink/blob/release-1.8.0/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorBackendSerializationProxy.java#L68
>>
>> Has anyone else seen anything like this?
>>
>
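
For anyone following along, here is a minimal, Flink-free sketch of the failure mode suspected in this thread. It is not the Beam or Flink code: `FakeOperatorStateStore`, `firstFailingCheckpoint`, and the `"buffer-" + id` naming scheme are hypothetical stand-ins. The only part taken from the discussion above is the limit itself (more than 32767 operator states makes the snapshot throw `IllegalArgumentException`). It compares registering a state under a fresh name on every checkpoint with reusing one name.

```java
import java.util.HashMap;
import java.util.Map;

public class OperatorStateLimitSketch {

    /** Hypothetical stand-in for an operator state store, keyed by state name. */
    static final class FakeOperatorStateStore {
        private final Map<String, Integer> registeredStates = new HashMap<>();

        /** Get-or-create by name: a name seen before does not add a new entry. */
        void getListState(String name) {
            registeredStates.putIfAbsent(name, 0);
        }

        /** Mimics the size check discussed above: more than 32767 states is rejected. */
        void snapshot() {
            if (registeredStates.size() > Short.MAX_VALUE) {
                throw new IllegalArgumentException();
            }
        }
    }

    /**
     * Simulates checkpoints 1..maxCheckpoints and returns the id of the first
     * checkpoint whose snapshot fails, or -1 if none fails.
     */
    static long firstFailingCheckpoint(boolean newNamePerCheckpoint, long maxCheckpoints) {
        FakeOperatorStateStore store = new FakeOperatorStateStore();
        for (long id = 1; id <= maxCheckpoints; id++) {
            // Assumption: the leaking variant derives the state name from the checkpoint id.
            String name = newNamePerCheckpoint ? "buffer-" + id : "buffer";
            store.getListState(name);
            try {
                store.snapshot();
            } catch (IllegalArgumentException e) {
                return id;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        System.out.println("new name per checkpoint: " + firstFailingCheckpoint(true, 40_000));
        System.out.println("same name every time:    " + firstFailingCheckpoint(false, 40_000));
    }
}
```

In this toy model the per-checkpoint variant fails at checkpoint 32768, while the reused name never fails. A real job would hit the limit somewhat earlier or later depending on how many other operator states the operator already holds. Because the registered states would be part of the checkpoint, a leak of this kind would also be consistent with a restore failing again on its first checkpoint.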