Hey Yaroslav, GCS is a somewhat popular filesystem that should work fine with Flink.
It seems that the initial scale of a bucket is 5000 read requests per second (https://cloud.google.com/storage/docs/request-rate), your job should be at roughly the same rate (depending on how fast your job restarts in the restart loop). You could try to tweak the GCS configuration parameters, such as increasing "fs.gs.http.read-timeout" and "fs.gs.http.max.retry". (see https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md for all available options) The "ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it should not cause the restarts. On Fri, Apr 2, 2021 at 2:15 AM Guowei Ma <guowei....@gmail.com> wrote: > Hi, Yaroslav > > AFAIK Flink does not retry if the download checkpoint from the storage > fails. On the other hand the FileSystem already has this retry mechanism > already. So I think there is no need for flink to retry. > I am not very sure but from the log it seems that the gfs's retry is > interrupted by some reason. So I think we could get more insight if we > could find the first fail cause. > > Best, > Guowei > > > On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko < > yaroslav.tkache...@shopify.com> wrote: > >> Hi Guowei, >> >> I thought Flink can support any HDFS-compatible object store like the >> majority of Big Data frameworks. So we just added >> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2" >> dependencies to the classpath, after that using "gs" prefix seems to be >> possible: >> >> state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints >> state.savepoints.dir: gs://<REDACTED>/flink-savepoints >> >> And yes, I noticed that retries logging too, but I'm not sure if it's >> implemented on the Flink side or the GCS connector side? Probably need to >> dive deeper into the source code. And if it's implemented on the GCS >> connector side, will Flink wait for all the retries? That's why I asked >> about the potential timeout on the Flink side. >> >> The JM log doesn't have much besides from what I already posted. It's >> hard for me to share the whole log, but the RocksDB initialization part can >> be relevant: >> >> 16:03:41.987 [cluster-io-thread-3] INFO >> org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to >> configure application-defined state backend: >> RocksDBStateBackend{checkpointStreamBackend=File State Backend >> (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: >> 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: >> 1048576), localRocksDbDirectories=[/rocksdb], >> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, >> writeBatchSize=2097152} >> 16:03:41.988 [cluster-io-thread-3] INFO >> org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using >> predefined options: FLASH_SSD_OPTIMIZED. >> 16:03:41.988 [cluster-io-thread-3] INFO >> org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using >> application-defined options factory: >> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4, >> state.backend.rocksdb.block.blocksize=16 kb, >> state.backend.rocksdb.block.cache-size=64 mb}}. >> 16:03:41.988 [cluster-io-thread-3] INFO >> org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined >> state backend: RocksDBStateBackend{checkpointStreamBackend=File State >> Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: >> 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: >> 1048576), localRocksDbDirectories=[/rocksdb], >> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, >> writeBatchSize=2097152} >> >> Thanks! >> >> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma <guowei....@gmail.com> wrote: >> >>> Hi, Yaroslav >>> >>> AFAIK there is no official GCS FileSystem support in FLINK. Does the >>> GCS is implemented by yourself? >>> Would you like to share the whole log of jm? >>> >>> BTW: From the following log I think the implementation has already some >>> retry mechanism. >>> >>> Interrupted while sleeping before retry. Giving up after 1/10 >>> retries for >>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d >>> >>> Best, >>> Guowei >>> >>> >>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko < >>> yaroslav.tkache...@shopify.com> wrote: >>> >>>> Hi everyone, >>>> >>>> I'm wondering if people have experienced issues with Taskmanager >>>> failure recovery when dealing with a lot of state. >>>> >>>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints >>>> and checkpoints. ~150 task managers with 4 slots each. >>>> >>>> When I run a pipeline without much state and kill one of the >>>> taskmanagers, it takes a few minutes to recover (I see a few restarts), but >>>> eventually when a new replacement taskmanager is registered with the >>>> jobmanager things go back to healthy. >>>> >>>> But when I run a pipeline with a lot of state (1TB+) and kill one of >>>> the taskmanagers, the pipeline never recovers, even after the replacement >>>> taskmanager has joined. It just enters an infinite loop of restarts and >>>> failures. >>>> >>>> On the jobmanager, I see an endless loop of state transitions: RUNNING >>>> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING. >>>> It stays in RUNNING for a few seconds, but then transitions into FAILED >>>> with a message like this: >>>> >>>> >>>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO >>>> org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> >>>> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to >>>> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357). >>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: >>>> readAddress(..) failed: Connection reset by peer (connection to ' >>>> 10.30.10.53/10.30.10.53:45789') >>>> at >>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) >>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0] >>>> ... >>>> Caused by: >>>> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: >>>> readAddress(..) failed: Connection reset by peer >>>> >>>> >>>> Which, I guess, means a failed Taskmanager. And since there are not >>>> enough task slots to run it goes into this endless loop again. It's never >>>> the same Taskmanager that fails. >>>> >>>> >>>> >>>> On the Taskmanager side, things look more interesting. I see a variety >>>> of exceptions: >>>> >>>> >>>> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 >>>> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED. >>>> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The >>>> execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found. >>>> >>>> >>>> also >>>> >>>> >>>> WARNING: Failed read retry #1/10 for >>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. >>>> Sleeping... >>>> java.nio.channels.ClosedByInterruptException >>>> at >>>> java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown >>>> Source) >>>> at >>>> java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown >>>> Source) >>>> at >>>> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313) >>>> at >>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118) >>>> at java.base/java.io.DataInputStream.read(Unknown Source) >>>> at >>>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94) >>>> at java.base/java.io.InputStream.read(Unknown Source) >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135) >>>> ... >>>> >>>> >>>> and >>>> >>>> >>>> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 >>>> retries for >>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c' >>>> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - >>>> Caught unexpected exception. >>>> java.nio.channels.ClosedChannelException: null >>>> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?] >>>> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?] >>>> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?] >>>> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?] >>>> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?] >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) >>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0] >>>> >>>> >>>> also >>>> >>>> >>>> 20:52:46.895 [<REDACTED> (141/624)#7] WARN >>>> org.apache.flink.streaming.api.operators.BackendRestorerProcedure - >>>> Exception while restoring keyed state backend for >>>> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from >>>> alternative (1/1), will retry while more alternatives are available. >>>> org.apache.flink.runtime.state.BackendBuildingException: Caught >>>> unexpected exception. >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) >>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0] >>>> ... >>>> >>>> >>>> and a few of >>>> >>>> >>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to >>>> download data for state handles. >>>> at >>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) >>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0] >>>> ... >>>> Caused by: java.util.concurrent.ExecutionException: >>>> java.lang.RuntimeException: no bytes written >>>> >>>> >>>> >>>> Has anyone seen behaviour like this? >>>> >>>> >>>> My current theory: because it needs to download a lot of state from GCS >>>> the pipeline probably experiences some sort of GCS back-off issue (150 >>>> taskmanager x 4 slots, also 4 >>>> state.backend.rocksdb.checkpoint.transfer.thread.num), probably too many >>>> read requests to the same GCS prefix? And I guess it doesn't finish in the >>>> time that's expected and randomly fails. Maybe there is some kind of >>>> timeout value I can tweak? So downloading from GCS can take time that's >>>> necessary without failing prematurely. >>>> >>>> Any help is very appreciated! >>>> >>>> >>>> >>>>