Hello Hivemind,

We are running a stateful streaming job. Each task manager instance hosts
around 100GB of data. While restarting the task managers we encountered the
following errors, which prevent the job from restarting.
Initially we thought it might be due to failing status checks on the attached
EBS volumes or burst balance exhaustion, but the AWS console does not indicate
any issue with the EBS volumes. Is there anything else that we need to
look at that could cause this exception? It is also quite unclear
what exactly is causing the exception, so any help on that would be much
appreciated.

Flink version: 1.12.2_scala_2.11
Environment: Kubernetes on AWS
Volume Type: EBS, gp2 300GiB

ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder [] - Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
    at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
    at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
    at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
    at java.lang.Thread.run(Thread.java:830) [?:?]
2021-03-19 15:26:10,385 WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure [] - Exception while restoring keyed state backend for KeyedCoProcessOperator_55a6c4a5d36b0124ad78cbf6bd864bba_(2/8) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:362) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:587) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.contrib.streaming.state.RocksDBStateBackend.createKeyedStateBackend(RocksDBStateBackend.java:93) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$1(StreamTaskStateInitializerImpl.java:328) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:168) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:135) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:345) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:163) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:272) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:427) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$2(StreamTask.java:543) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:533) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:573) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:755) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:570) [flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.lang.Thread.run(Thread.java:830) [?:?]
Caused by: java.nio.channels.ClosedChannelException
    at sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150) ~[?:?]
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:266) ~[?:?]
    at java.nio.channels.Channels.writeFullyImpl(Channels.java:74) ~[?:?]
    at java.nio.channels.Channels.writeFully(Channels.java:97) ~[?:?]
    at java.nio.channels.Channels$1.write(Channels.java:172) ~[?:?]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:141) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.lambda$createDownloadRunnables$0(RocksDBStateDownloader.java:110) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at org.apache.flink.util.function.ThrowingRunnable.lambda$unchecked$0(ThrowingRunnable.java:49) ~[flink-dist_2.12-1.12.2.jar:1.12.2]
    at java.util.concurrent.CompletableFuture$AsyncRun.run(CompletableFuture.java:1807) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
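
For reference, the innermost frames of the trace (Channels.write ->
FileChannelImpl.write -> ensureOpen) can be reproduced outside Flink. The
sketch below is only an illustration in plain JDK code: the class name
ClosedChannelDemo and the method writeAfterClose are made up for this
example and are not Flink code. It shows that the exception is raised when
the file channel has already been closed by the time the write happens; it
does not tell us what closed the channel in our job.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class ClosedChannelDemo {

    /**
     * Writes to a stream backed by a FileChannel after the channel was closed.
     * Returns true if the second write raised ClosedChannelException.
     */
    static boolean writeAfterClose() throws IOException {
        Path tmp = Files.createTempFile("closed-channel-demo", ".bin");
        try {
            FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE);
            OutputStream out = Channels.newOutputStream(channel);

            // The first write succeeds because the channel is still open.
            out.write(new byte[] {1, 2, 3});

            // Simulate some other code path closing the channel mid-download.
            channel.close();

            try {
                out.write(new byte[] {4, 5, 6});
                return false; // not expected: the write should have failed
            } catch (ClosedChannelException e) {
                return true;
            }
        } finally {
            Files.deleteIfExists(tmp);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("ClosedChannelException raised: " + writeAfterClose());
    }
}
```

If this reading is right, the ClosedChannelException itself may only be a
symptom, and the interesting question is what closed the file during the
state download.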


-
Dhanesh Arole
