Hi everyone, I'm wondering if people have experienced issues with TaskManager failure recovery when dealing with a lot of state.
I'm using Flink 1.12.0 on Kubernetes, with the RocksDB state backend and GCS for savepoints and checkpoints, running ~150 TaskManagers with 4 slots each. When I run a pipeline without much state and kill one of the TaskManagers, it takes a few minutes to recover (I see a few restarts), but once a replacement TaskManager registers with the JobManager things go back to healthy. When I run a pipeline with a lot of state (1 TB+) and kill one of the TaskManagers, however, the pipeline never recovers, even after the replacement TaskManager has joined. It just enters an infinite loop of restarts and failures.

On the JobManager I see an endless loop of state transitions: RUNNING -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING. It stays in RUNNING for a few seconds, but then transitions into FAILED with a message like this:

22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.30.10.53/10.30.10.53:45789')
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer

Which, I guess, means a TaskManager failed. And since there are then not enough task slots to run the job, it goes into this endless loop again. It's never the same TaskManager that fails.

On the TaskManager side, things look more interesting. I see a variety of exceptions:

org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.

also

WARNING: Failed read retry #1/10 for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. Sleeping...
java.nio.channels.ClosedByInterruptException
    at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
    at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
    at java.base/java.io.DataInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
    at java.base/java.io.InputStream.read(Unknown Source)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
    ...

and

SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
20:52:46.894 [<REDACTED> (141/624)#7] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
    at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
    at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
    at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
    at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
    at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]

also

20:52:46.895 [<REDACTED> (141/624)#7] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ...

and a few of

Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download data for state handles.
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ...
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: no bytes written

Has anyone seen behaviour like this? My current theory: because it needs to download a lot of state from GCS, the pipeline probably runs into some sort of GCS back-off or throttling (150 TaskManagers x 4 slots, with state.backend.rocksdb.checkpoint.transfer.thread.num set to 4), i.e. too many concurrent read requests against the same GCS prefix. The download then doesn't finish within whatever time is expected and fails seemingly at random. Is there some kind of timeout value I can tweak, so that downloading state from GCS can take as long as it needs without failing prematurely?

Any help is very appreciated!
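PS: in case it's useful, this is roughly the flink-conf.yaml section I'd start experimenting with. I'm only guessing which of these options (if any) actually affect the restore/download path; the values shown are what I believe are the defaults, except the transfer thread count, which I've already raised to 4:

    # Parallel threads per task for up/downloading RocksDB state to/from GCS (already set to 4 in my setup)
    state.backend.rocksdb.checkpoint.transfer.thread.num: 4

    # Heartbeat timeout between JobManager and TaskManagers (ms); maybe relevant if a TM
    # looks unresponsive while it's busy downloading state?
    heartbeat.timeout: 50000

    # Default Akka RPC timeout
    akka.ask.timeout: 10 s

    # Time a task cancellation may take before it's treated as a fatal TaskManager error (ms)
    task.cancellation.timeout: 180000

    # Maximum backoff for partition requests between TaskManagers (ms)
    taskmanager.network.request-backoff.max: 10000

Would bumping any of these (or some GCS connector setting I'm not aware of) be the right lever here?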