Hi Dhanesh,

Thanks for the recommendation! I'll try it out.
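For reference, the change Dhanesh suggests below would look roughly like this in flink-conf.yaml (an untested sketch; 10 minutes is simply the value that worked for his team and likely needs tuning per cluster):

    # give task managers more time to answer RPCs while large state is being restored
    akka.ask.timeout: 10 min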
On Wed, Apr 7, 2021 at 1:59 AM dhanesh arole <davcdhane...@gmail.com> wrote:

> Hi Yaroslav,
>
> We faced similar issues in our large stateful stream processing job. I had
> asked a question
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-StateBuilder-unexpected-exception-td42397.html>
> about it on the user mailing list a few days back. Based on the reply to
> my question, we figured out that this happens when a task manager has just
> come back online and is trying to rebuild / restore its state, but
> meanwhile another task manager gets restarted or killed. In that situation
> the job manager cancels the job, and as a result all task managers also
> start cancelling the tasks they are currently running. As part of the
> cancellation flow, the channel buffer through which the Flink TM writes to
> disk gets closed, but state rebuilding is still happening concurrently on
> that channel buffer. This is what causes the ClosedChannelException.
>
> As a solution to this problem, we increased *akka.ask.timeout* to 10m.
> This gives task managers enough room to wait for RPC responses from other
> task managers during a restart. As a result, a TM becomes more lenient
> about marking another TM as failed and cancelling the job in the first
> place.
>
> -
> Dhanesh Arole
>
>
> On Tue, Apr 6, 2021 at 7:55 PM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hey Yaroslav,
>>
>> GCS is a somewhat popular filesystem that should work fine with Flink.
>>
>> It seems that the initial scale of a bucket is 5000 read requests per
>> second (https://cloud.google.com/storage/docs/request-rate); your job
>> should be at roughly the same rate (depending on how fast your job
>> restarts in the restart loop).
>>
>> You could try to tweak the GCS configuration parameters, such as
>> increasing "fs.gs.http.read-timeout" and "fs.gs.http.max.retry" (see
>> https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
>> for all available options).
>>
>> The "ExecutionGraphException: The execution attempt
>> 6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but
>> it should not cause the restarts.
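A sketch of how those connector settings are typically supplied, assuming the GCS connector picks up a Hadoop core-site.xml from the Flink classpath or HADOOP_CONF_DIR (the values below are purely illustrative; see the linked CONFIGURATION.md for the authoritative names, units, and defaults):

    <!-- core-site.xml (illustrative values only) -->
    <configuration>
      <property>
        <!-- HTTP read timeout for GCS requests, in milliseconds -->
        <name>fs.gs.http.read-timeout</name>
        <value>60000</value>
      </property>
      <property>
        <!-- number of retries for failed low-level HTTP requests -->
        <name>fs.gs.http.max.retry</name>
        <value>20</value>
      </property>
    </configuration>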
>> On Fri, Apr 2, 2021 at 2:15 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Yaroslav
>>>
>>> AFAIK Flink does not retry if downloading the checkpoint from storage
>>> fails. On the other hand, the FileSystem already has its own retry
>>> mechanism, so I think there is no need for Flink to retry.
>>> I am not very sure, but from the log it seems that the GCS retry is
>>> interrupted for some reason. I think we could get more insight if we
>>> could find the first failure cause.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
>>> yaroslav.tkache...@shopify.com> wrote:
>>>
>>>> Hi Guowei,
>>>>
>>>> I thought Flink could support any HDFS-compatible object store, like
>>>> the majority of Big Data frameworks. So we just added the
>>>> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
>>>> dependencies to the classpath; after that, using the "gs://" prefix
>>>> became possible:
>>>>
>>>> state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
>>>> state.savepoints.dir: gs://<REDACTED>/flink-savepoints
>>>>
>>>> And yes, I noticed that retry logging too, but I'm not sure whether
>>>> it's implemented on the Flink side or the GCS connector side; I
>>>> probably need to dive deeper into the source code. And if it's
>>>> implemented on the GCS connector side, will Flink wait for all the
>>>> retries? That's why I asked about a potential timeout on the Flink
>>>> side.
>>>>
>>>> The JM log doesn't have much besides what I already posted. It's hard
>>>> for me to share the whole log, but the RocksDB initialization part may
>>>> be relevant:
>>>>
>>>> 16:03:41.987 [cluster-io-thread-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to configure application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}
>>>> 16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using predefined options: FLASH_SSD_OPTIMIZED.
>>>> 16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using application-defined options factory: DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4, state.backend.rocksdb.block.blocksize=16 kb, state.backend.rocksdb.block.cache-size=64 mb}}.
>>>> 16:03:41.988 [cluster-io-thread-3] INFO org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined state backend: RocksDBStateBackend{checkpointStreamBackend=File State Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints: 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold: 1048576), localRocksDbDirectories=[/rocksdb], enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4, writeBatchSize=2097152}
>>>>
>>>> Thanks!
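For readers reproducing this setup: the backend configuration in that log roughly corresponds to the following flink-conf.yaml keys (a sketch using Flink 1.12 option names; the values are copied from the log above, so treat the mapping as an assumption and double-check it against the actual deployment):

    state.backend: rocksdb
    state.backend.incremental: true
    state.backend.rocksdb.localdir: /rocksdb
    state.backend.rocksdb.predefined-options: FLASH_SSD_OPTIMIZED
    state.backend.rocksdb.checkpoint.transfer.thread.num: 4
    state.backend.rocksdb.thread.num: 4
    state.backend.rocksdb.block.blocksize: 16 kb
    state.backend.rocksdb.block.cache-size: 64 mb
    state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
    state.savepoints.dir: gs://<REDACTED>/flink-savepoints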
>>>>
>>>> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi, Yaroslav
>>>>>
>>>>> AFAIK there is no official GCS FileSystem support in Flink. Is the
>>>>> GCS support implemented by yourself?
>>>>> Would you like to share the whole log of the JM?
>>>>>
>>>>> BTW: from the following log I think the implementation already has
>>>>> some retry mechanism:
>>>>>
>>>>> >>> Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>>>>> yaroslav.tkache...@shopify.com> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I'm wondering if people have experienced issues with taskmanager
>>>>>> failure recovery when dealing with a lot of state.
>>>>>>
>>>>>> I'm using 1.12.0 on Kubernetes, with the RocksDB backend and GCS for
>>>>>> savepoints and checkpoints. ~150 task managers with 4 slots each.
>>>>>>
>>>>>> When I run a pipeline without much state and kill one of the
>>>>>> taskmanagers, it takes a few minutes to recover (I see a few
>>>>>> restarts), but eventually, when a new replacement taskmanager is
>>>>>> registered with the jobmanager, things go back to healthy.
>>>>>>
>>>>>> But when I run a pipeline with a lot of state (1TB+) and kill one of
>>>>>> the taskmanagers, the pipeline never recovers, even after the
>>>>>> replacement taskmanager has joined. It just enters an infinite loop
>>>>>> of restarts and failures.
>>>>>>
>>>>>> On the jobmanager, I see an endless loop of state transitions:
>>>>>> RUNNING -> CANCELING -> CANCELED -> CREATED -> SCHEDULED ->
>>>>>> DEPLOYING -> RUNNING. It stays in RUNNING for a few seconds, but
>>>>>> then transitions into FAILED with a message like this:
>>>>>>
>>>>>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>>>>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.30.10.53/10.30.10.53:45789')
>>>>>> at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>> ...
>>>>>> Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer
>>>>>>
>>>>>> Which, I guess, means a failed taskmanager. And since there are not
>>>>>> enough task slots to run, it goes into this endless loop again. It's
>>>>>> never the same taskmanager that fails.
>>>>>>
>>>>>> On the taskmanager side, things look more interesting. I see a
>>>>>> variety of exceptions:
>>>>>>
>>>>>> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
>>>>>> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>>>>>>
>>>>>> also
>>>>>>
>>>>>> WARNING: Failed read retry #1/10 for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. Sleeping...
>>>>>> java.nio.channels.ClosedByInterruptException
>>>>>> at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
>>>>>> at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
>>>>>> at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
>>>>>> at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
>>>>>> at java.base/java.io.DataInputStream.read(Unknown Source)
>>>>>> at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>>>>>> at java.base/java.io.InputStream.read(Unknown Source)
>>>>>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
>>>>>> ...
>>>>>>
>>>>>> and
>>>>>>
>>>>>> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
>>>>>> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
>>>>>> java.nio.channels.ClosedChannelException: null
>>>>>> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
>>>>>> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
>>>>>> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
>>>>>> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
>>>>>> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
>>>>>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>>
>>>>>> also
>>>>>>
>>>>>> 20:52:46.895 [<REDACTED> (141/624)#7] WARN org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from alternative (1/1), will retry while more alternatives are available.
>>>>>> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>>>>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>> ...
>>>>>>
>>>>>> and a few of
>>>>>>
>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download data for state handles.
>>>>>> at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>> ...
>>>>>> Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: no bytes written
>>>>>>
>>>>>> Has anyone seen behaviour like this?
>>>>>>
>>>>>> My current theory: because it needs to download a lot of state from
>>>>>> GCS, the pipeline probably hits some sort of GCS back-off issue (150
>>>>>> taskmanagers x 4 slots, plus
>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num set to 4),
>>>>>> probably too many read requests to the same GCS prefix? And I guess
>>>>>> the download doesn't finish in the expected time and randomly fails.
>>>>>> Maybe there is some kind of timeout value I can tweak, so that
>>>>>> downloading from GCS can take as much time as necessary without
>>>>>> failing prematurely?
>>>>>>
>>>>>> Any help is much appreciated!
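A sketch of the kind of knobs this thread points at for reducing pressure on GCS during restore: fewer concurrent transfer threads per task manager, plus a slower restart strategy so failed attempts don't hammer the bucket back to back. The values are illustrative flink-conf.yaml entries only; the restart-strategy keys are an assumption here rather than something verified in the thread, and fewer transfer threads will also make each restore slower.

    # fewer parallel state downloads per task manager = fewer concurrent GCS reads
    state.backend.rocksdb.checkpoint.transfer.thread.num: 1
    # space out restart attempts (illustrative; not discussed above)
    restart-strategy: fixed-delay
    restart-strategy.fixed-delay.attempts: 10
    restart-strategy.fixed-delay.delay: 1 min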