Hi Matthias,

Thanks for taking the time to help us with this.
You are right, there were lots of task cancellations: this exception causes the job to restart, which in turn triggers the cancellations.

-
Dhanesh Arole

On Tue, Mar 23, 2021 at 9:27 AM Matthias Pohl <matth...@ververica.com> wrote:

> Hi Dhanesh,
> thanks for reaching out to the Flink community. Checking the code, it
> looks like the OutputStream is added to a CloseableRegistry before writing
> to it [1].
>
> My suspicion, based on the exception cause, is that the CloseableRegistry
> was closed while the state was being restored. I tried to track down the
> source of the CloseableRegistry. It looks like it's handed down from the
> StreamTask [2].
>
> The StreamTask closes the CloseableRegistry either when cancelling is
> triggered or in the class's finalize method. Have you checked the logs to
> see whether there was some task cancellation logged?
>
> Best,
> Matthias
>
> [1] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloader.java#L132
> [2] https://github.com/apache/flink/blob/c6997c97c575d334679915c328792b8a3067cfb5/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java#L269
>
> On Fri, Mar 19, 2021 at 5:07 PM dhanesh arole <davcdhane...@gmail.com> wrote:
>
>> Hello Hivemind,
>>
>> We are running a stateful streaming job. Each task manager instance hosts
>> around 100 GB of data. During a restart of the task managers we encountered
>> the following errors, because of which the job is not able to restart.
>> Initially we thought it might be due to failing status checks of the
>> attached EBS volumes or burst balance exhaustion, but the AWS console does
>> not indicate any issue with the EBS volumes. Is there anything else we need
>> to look at that could cause this exception? Also, it's quite unclear what
>> exactly the cause of the exception is; any help on that would be much
>> appreciated.
>>
>> Flink version: 1.12.2_scala_2.11
>> Environment: Kubernetes on AWS
>> Volume Type: EBS, gp2 300GiB
>>
>> ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
>> java.nio.channels.ClosedChannelException: null
>>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
>>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
>>     at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
>>     at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>>     at java.lang.Thread.run(Thread.java:830) [?:?]
>> 2021-03-19 15:26:10,385 WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from alternative (1/1), will retry while more alternatives are available.
>> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) [flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) [flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at java.lang.Thread.run(Thread.java:830) [?:?]
>> Caused by: java.nio.channels.ClosedChannelException
>>     at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
>>     at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
>>     at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
>>     at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
>>     at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
>>     at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>>
>> -
>> Dhanesh Arole
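
For anyone finding this thread later, the mechanism Matthias describes (a stream registered with a registry, the registry closed on cancellation, and the in-flight write then failing) can be reproduced outside Flink. The snippet below is only a rough sketch under stated assumptions: `SimpleCloseableRegistry` and `writeFailsAfterRegistryClose` are hypothetical names made up for illustration, not Flink's `CloseableRegistry` or any code from `RocksDBStateDownloader`. It uses plain JDK classes to show why a write on a channel-backed stream ends in `java.nio.channels.ClosedChannelException` once something else has closed the stream.

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

final class ClosedRegistryDemo {

    /** Hypothetical stand-in for a closeable registry; NOT Flink's implementation. */
    static final class SimpleCloseableRegistry implements Closeable {
        private final List<Closeable> registered = new ArrayList<>();
        private boolean closed;

        synchronized void register(Closeable closeable) throws IOException {
            if (closed) {
                // A closed registry refuses new resources and closes them right away.
                closeable.close();
                throw new IOException("Registry is already closed");
            }
            registered.add(closeable);
        }

        @Override
        public synchronized void close() throws IOException {
            closed = true;
            // Closing the registry closes every resource registered so far.
            for (Closeable closeable : registered) {
                closeable.close();
            }
            registered.clear();
        }
    }

    /**
     * Registers a channel-backed stream, writes once, closes the registry
     * (simulating a task cancellation), then writes again.
     * Returns true if the second write fails with ClosedChannelException.
     */
    static boolean writeFailsAfterRegistryClose(Path target) throws IOException {
        SimpleCloseableRegistry registry = new SimpleCloseableRegistry();
        FileChannel channel =
                FileChannel.open(target, StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        OutputStream out = Channels.newOutputStream(channel);
        registry.register(out);

        out.write(new byte[] {1, 2, 3}); // succeeds: the channel is still open

        registry.close(); // simulated cancellation: closes the stream and its channel

        try {
            out.write(new byte[] {4, 5, 6}); // the writer does not know about the close
            return false;
        } catch (ClosedChannelException e) {
            return true;
        }
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("restore-demo", ".bin");
        try {
            System.out.println("write failed after registry close: "
                    + writeFailsAfterRegistryClose(tmp));
        } finally {
            Files.deleteIfExists(tmp);
        }
    }
}
```

If this sketch matches what happens in the job, the `ClosedChannelException` during restore would be a symptom of the cancellation rather than its root cause, so the first failure logged before the cancellations is probably the more useful thing to look for.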