Hi, Yaroslav

AFAIK Flink itself does not retry if downloading a checkpoint from the storage fails. On the other hand, the FileSystem already has its own retry mechanism, so I think there is no need for Flink to retry on top of it. I am not entirely sure, but from the log it seems that the GCS connector's retry is interrupted for some reason. So I think we could get more insight if we could find the first failure cause.
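Just to illustrate the failure mode I suspect (this is a minimal, hypothetical sketch of a generic retry-with-back-off read, not the actual GCS connector code): if the task thread gets interrupted while the loop is sleeping between attempts, for example because Flink is cancelling the task after a failure somewhere else, the loop has to give up immediately, which would produce a "Giving up after 1/10 retries" style message even though more retries were configured.

    import java.io.IOException;
    import java.io.InputStream;
    import java.util.function.Supplier;

    public final class RetryReadSketch {

        // Reads all bytes from a stream produced by 'open', retrying failed
        // reads with a fixed back-off. Hypothetical helper, illustration only.
        public static byte[] readWithRetries(Supplier<InputStream> open,
                                             int maxRetries,
                                             long backoffMillis) throws IOException {
            IOException lastFailure = null;
            for (int attempt = 1; attempt <= maxRetries; attempt++) {
                try (InputStream in = open.get()) {
                    return in.readAllBytes();
                } catch (IOException e) {
                    lastFailure = e; // keep the real cause for diagnosis
                    if (attempt == maxRetries) {
                        break; // retries exhausted
                    }
                    try {
                        Thread.sleep(backoffMillis); // back off before the next attempt
                    } catch (InterruptedException interrupted) {
                        // The task is being cancelled: restore the interrupt flag
                        // and give up now, even though more retries were allowed.
                        Thread.currentThread().interrupt();
                        throw new IOException(
                            "Interrupted while sleeping before retry, giving up after "
                                + attempt + "/" + maxRetries + " retries", e);
                    }
                }
            }
            throw lastFailure != null
                ? lastFailure
                : new IOException("no read attempts were made");
        }

        private RetryReadSketch() {}
    }

If that is what happens here, the interrupt is only a symptom of an earlier failure (e.g. the task being cancelled because another subtask already failed), which is why the first failure cause in the logs is the interesting one.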
Best,
Guowei

On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <yaroslav.tkache...@shopify.com> wrote:

> Hi Guowei,
>
> I thought Flink can support any HDFS-compatible object store like the majority of Big Data frameworks. So we just added "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2" dependencies to the classpath; after that, using the "gs" prefix seems to be possible:
>
> state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
> state.savepoints.dir: gs://<REDACTED>/flink-savepoints
>
> And yes, I noticed that retry logging too, but I'm not sure if it's implemented on the Flink side or the GCS connector side? Probably need to dive deeper into the source code. And if it's implemented on the GCS connector side, will Flink wait for all the retries? That's why I asked about the potential timeout on the Flink side.
>
> The JM log doesn't have much besides what I already posted. It's hard for me to share the whole log, but the RocksDB initialization part can be relevant:
>
> 16:03:41.987 [cluster-io-thread-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to configure application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}
> 16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: FLASH_SSD_OPTIMIZED.
> 16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4, state.backend.rocksdb.block.blocksize=16 kb, state.backend.rocksdb.block.cache-size=64 mb}}.
> 16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}
>
> Thanks!
>
> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Yaroslav
>>
>> AFAIK there is no official GCS FileSystem support in Flink. Is the GCS FileSystem implemented by yourself?
>> Would you like to share the whole JM log?
>>
>> BTW: From the following log I think the implementation already has some retry mechanism.
>>
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>
>> Best,
>> Guowei
>>
>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <yaroslav.tkache...@shopify.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I'm wondering if people have experienced issues with Taskmanager failure recovery when dealing with a lot of state.
>>>
>>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints and checkpoints. ~150 task managers with 4 slots each.
>>>
>>> When I run a pipeline without much state and kill one of the taskmanagers, it takes a few minutes to recover (I see a few restarts), but eventually, when a new replacement taskmanager is registered with the jobmanager, things go back to healthy.
>>>
>>> But when I run a pipeline with a lot of state (1TB+) and kill one of the taskmanagers, the pipeline never recovers, even after the replacement taskmanager has joined. It just enters an infinite loop of restarts and failures.
>>>
>>> On the jobmanager, I see an endless loop of state transitions: RUNNING -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING. It stays in RUNNING for a few seconds, but then transitions into FAILED with a message like this:
>>>
>>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.30.10.53/10.30.10.53:45789')
>>> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>> ...
>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
>>>
>>> Which, I guess, means a failed Taskmanager. And since there are not enough task slots to run, it goes into this endless loop again. It's never the same Taskmanager that fails.
>>>
>>> On the Taskmanager side, things look more interesting. I see a variety of exceptions:
>>>
>>> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
>>> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>>>
>>> also
>>>
>>> WARNING: Failed read retry #1/10 for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. Sleeping...
>>> java.nio.channels.ClosedByInterruptException
>>> at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
>>> at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
>>> at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
>>> at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
>>> at java.base/java.io.DataInputStream.read(Unknown Source)
>>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>>> at java.base/java.io.InputStream.read(Unknown Source)
>>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
>>> ...
>>>
>>> and
>>>
>>> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
>>> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
>>> java.nio.channels.ClosedChannelException: null
>>> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
>>> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
>>> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
>>> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
>>> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
>>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>
>>> also
>>>
>>> 20:52:46.895 [<REDACTED> (141/624)#7] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from alternative (1/1), will retry while more alternatives are available.
>>> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>> ...
>>>
>>> and a few of
>>>
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download data for state handles.
>>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>> ...
>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: no bytes written
>>>
>>> Has anyone seen behaviour like this?
>>>
>>> My current theory: because it needs to download a lot of state from GCS, the pipeline probably experiences some sort of GCS back-off issue (150 taskmanagers x 4 slots, also 4 state.backend.rocksdb.checkpoint.transfer.thread.num), probably too many read requests to the same GCS prefix? And I guess it doesn't finish in the expected time and randomly fails. Maybe there is some kind of timeout value I can tweak, so downloading from GCS can take the time it needs without failing prematurely?
>>>
>>> Any help is very appreciated!
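For completeness, the setup Yaroslav describes above ("flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2" on the classpath, gs:// checkpoint paths) roughly corresponds to a configuration like the sketch below. The bucket name and key file path are placeholders, and the fs.gs.* / auth keys come from the GCS connector documentation rather than from his actual setup, so treat it only as a starting point:

    # flink-conf.yaml (sketch)
    state.backend: rocksdb
    state.checkpoints.dir: gs://<your-bucket>/flink-checkpoints
    state.savepoints.dir: gs://<your-bucket>/flink-savepoints

    # Hadoop configuration for the GCS connector (e.g. core-site.xml), sketch:
    fs.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
    fs.AbstractFileSystem.gs.impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
    google.cloud.auth.service.account.enable: true
    google.cloud.auth.service.account.json.keyfile: /path/to/service-account.json

With 1.12 the Hadoop properties are usually picked up from a core-site.xml on the classpath (or whatever Hadoop configuration mechanism your deployment uses).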
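Regarding the back-off theory at the end: if concurrent reads against the same GCS prefix are indeed the problem, one low-risk experiment is to lower the download parallelism per task and give each restore attempt more breathing room before the next restart kicks in. The option names below are standard Flink 1.12 options; the values are only illustrative, and whether this actually avoids the GCS back-off is an assumption that would need to be verified:

    # flink-conf.yaml (sketch) -- fewer parallel reads against the same GCS prefix
    state.backend.rocksdb.checkpoint.transfer.thread.num: 1

    # Give each restore attempt more time before the next restart
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    restart-strategy.fixed-delay.delay: 60 s

That would at least tell us whether the restores fail because of contention on GCS or because of something else entirely.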