Hi Yaroslav,

AFAIK there is no official GCS FileSystem support in Flink. Did you implement the GCS FileSystem yourself? Would you like to share the full JobManager log?
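Judging from the com.google.cloud.hadoop.repackaged.gcs... lines in your stack traces, it looks like the Hadoop gcs-connector is being used underneath Flink's Hadoop FileSystem support. If so, the wiring usually looks roughly like the following (just a sketch; the bucket name and paths are placeholders, not taken from your setup):

    # flink-conf.yaml
    state.backend: rocksdb
    state.checkpoints.dir: gs://<your-bucket>/flink-checkpoints
    state.savepoints.dir: gs://<your-bucket>/flink-savepoints

    # core-site.xml, picked up via HADOOP_CONF_DIR / HADOOP_CLASSPATH
    <property>
      <name>fs.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem</value>
    </property>
    <property>
      <name>fs.AbstractFileSystem.gs.impl</name>
      <value>com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS</value>
    </property>
    <property>
      <name>google.cloud.auth.service.account.enable</name>
      <value>true</value>
    </property>

plus the shaded gcs-connector jar on the classpath. Knowing which variant and connector version you use would help.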
BTW: from the following log I think the implementation already has some retry mechanism:

>>> Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d

Best,
Guowei

On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <yaroslav.tkache...@shopify.com> wrote:

> Hi everyone,
>
> I'm wondering if people have experienced issues with Taskmanager failure
> recovery when dealing with a lot of state.
>
> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
> and checkpoints. ~150 task managers with 4 slots each.
>
> When I run a pipeline without much state and kill one of the
> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
> eventually when a new replacement taskmanager is registered with the
> jobmanager things go back to healthy.
>
> But when I run a pipeline with a lot of state (1TB+) and kill one of the
> taskmanagers, the pipeline never recovers, even after the replacement
> taskmanager has joined. It just enters an infinite loop of restarts and
> failures.
>
> On the jobmanager, I see an endless loop of state transitions: RUNNING
> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
> It stays in RUNNING for a few seconds, but then transitions into FAILED
> with a message like this:
>
> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
> org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED>
> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> readAddress(..) failed: Connection reset by peer (connection to
> '10.30.10.53/10.30.10.53:45789')
>     at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     ...
> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> readAddress(..) failed: Connection reset by peer
>
> Which, I guess, means a failed Taskmanager. And since there are not enough
> task slots to run, it goes into this endless loop again. It's never the
> same Taskmanager that fails.
>
> On the Taskmanager side, things look more interesting. I see a variety of
> exceptions:
>
> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7
> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
> attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>
> also
>
> WARNING: Failed read retry #1/10 for
> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
> Sleeping...
> java.nio.channels.ClosedByInterruptException
>     at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
>     at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
>     at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
>     at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
>     at java.base/java.io.DataInputStream.read(Unknown Source)
>     at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>     at java.base/java.io.InputStream.read(Unknown Source)
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
>     ...
>
> and
>
> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for
> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
> java.nio.channels.ClosedChannelException: null
>     at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
>     at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
>     at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
>     at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
>     at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>
> also
>
> 20:52:46.895 [<REDACTED> (141/624)#7] WARN
> org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
> Exception while restoring keyed state backend for
> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
> alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     ...
>
> and a few of
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download
> data for state handles.
>     at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>     ...
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: no bytes written
>
> Has anyone seen behaviour like this?
>
> My current theory: because it needs to download a lot of state from GCS,
> the pipeline probably experiences some sort of GCS back-off issue (150
> taskmanagers x 4 slots, also 4
> state.backend.rocksdb.checkpoint.transfer.thread.num), probably too many
> read requests to the same GCS prefix? And I guess it doesn't finish in the
> time that's expected and randomly fails. Maybe there is some kind of
> timeout value I can tweak, so downloading from GCS can take the time that's
> necessary without failing prematurely?
>
> Any help is very appreciated!
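PS: the settings referenced in the quoted theory above are ordinary flink-conf.yaml options. For reference, a sketch with illustrative values only; the timeout-related keys are guesses at what one might try to tune, not a confirmed fix:

    # more threads parallelize the RocksDB state download per task (default 1)
    state.backend.rocksdb.checkpoint.transfer.thread.num: 4

    # give busy TaskManagers more headroom before they are considered dead
    heartbeat.timeout: 120000    # ms, default 50000
    akka.ask.timeout: 1 min      # default 10 s

    # keep the job from burning through restart attempts too quickly
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    restart-strategy.fixed-delay.delay: 30 s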