Hi, Yaroslav

AFAIK there is no official GCS FileSystem support in Flink. Did you implement
the GCS FileSystem yourself?
Could you share the whole JobManager log?

BTW: from the following log, it looks like the implementation already has some
retry mechanism.
>>> Interrupted while sleeping before retry. Giving up after 1/10 retries
for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
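
If the implementation is based on the open-source GCS Hadoop connector
(gcs-connector), its retry and timeout behaviour can usually be tuned through
the Hadoop configuration (core-site.xml). A rough sketch below; the property
names and values are assumptions from memory and may differ in the connector
version you ship, so please double-check against your version:

<!-- core-site.xml (sketch; property names and values are assumptions) -->
<configuration>
  <!-- number of HTTP-level retries the connector performs per request -->
  <property>
    <name>fs.gs.http.max.retry</name>
    <value>20</value>
  </property>
  <!-- connect/read timeouts (milliseconds) for GCS requests -->
  <property>
    <name>fs.gs.http.connect-timeout</name>
    <value>60000</value>
  </property>
  <property>
    <name>fs.gs.http.read-timeout</name>
    <value>60000</value>
  </property>
</configuration>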

Best,
Guowei


On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi everyone,
>
> I'm wondering if people have experienced issues with TaskManager failure
> recovery when dealing with a lot of state.
>
> I'm using Flink 1.12.0 on Kubernetes, with the RocksDB backend and GCS for
> savepoints and checkpoints. ~150 TaskManagers with 4 slots each.
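>
> For reference, the relevant parts of my flink-conf.yaml look roughly like
> this (the paths and the incremental flag are approximations, not a verbatim
> copy of my config):
>
> # flink-conf.yaml (sketch)
> state.backend: rocksdb
> # the shared/ entries in the checkpoint paths come from incremental checkpoints
> state.backend.incremental: true
> state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
> state.savepoints.dir: gs://<REDACTED>/flink-savepoints
> taskmanager.numberOfTaskSlots: 4
> state.backend.rocksdb.checkpoint.transfer.thread.num: 4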
>
> When I run a pipeline without much state and kill one of the
> TaskManagers, it takes a few minutes to recover (I see a few restarts), but
> eventually, once a replacement TaskManager is registered with the
> JobManager, things go back to healthy.
>
> But when I run a pipeline with a lot of state (1TB+) and kill one of the
> TaskManagers, the pipeline never recovers, even after the replacement
> TaskManager has joined. It just enters an infinite loop of restarts and
> failures.
>
> On the JobManager, I see an endless loop of state transitions: RUNNING
> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
> It stays in RUNNING for a few seconds, but then transitions into FAILED
> with a message like this:
>
>
> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>  org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED>
> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
> readAddress(..) failed: Connection reset by peer (connection to '
> 10.30.10.53/10.30.10.53:45789')
> at
> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ...
> Caused by:
> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
> readAddress(..) failed: Connection reset by peer
>
>
> Which, I guess, means a failed TaskManager. And since there are then not
> enough task slots to run the job, it goes into this endless loop again. It's
> never the same TaskManager that fails.
>
>
>
> On the TaskManager side, things look more interesting. I see a variety of
> exceptions:
>
>
> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7
> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution
> attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>
>
> also
>
>
> WARNING: Failed read retry #1/10 for
> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
> Sleeping...
> java.nio.channels.ClosedByInterruptException
> at
> java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
> Source)
> at
> java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
> Source)
> at
> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
> at
> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
> at java.base/java.io.DataInputStream.read(Unknown Source)
> at
> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
> at java.base/java.io.InputStream.read(Unknown Source)
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
> ...
>
>
> and
>
>
> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
> retries for
> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
> Caught unexpected exception.
> java.nio.channels.ClosedChannelException: null
> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>
>
> also
>
>
> 20:52:46.895 [<REDACTED> (141/624)#7] WARN
>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
> Exception while restoring keyed state backend for
> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
> alternative (1/1), will retry while more alternatives are available.
> org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected
> exception.
> at
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ...
>
>
> and a few of
>
>
> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download
> data for state handles.
> at
> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92)
> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
> ...
> Caused by: java.util.concurrent.ExecutionException:
> java.lang.RuntimeException: no bytes written
>
>
>
> Has anyone seen behaviour like this?
>
>
> My current theory: because it needs to download a lot of state from GCS,
> the pipeline probably runs into some sort of GCS back-off issue (150
> TaskManagers x 4 slots, with
> state.backend.rocksdb.checkpoint.transfer.thread.num set to 4), probably too
> many read requests against the same GCS prefix? I guess the download doesn't
> finish in the expected time and randomly fails. Maybe there is some kind of
> timeout value I can tweak, so that downloading from GCS can take as long as
> it needs without failing prematurely?
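>
> If back-off or timeouts are indeed the problem, these are the knobs I'm
> thinking of experimenting with. I'm not sure they are the right ones, and
> the values below are just guesses:
>
> # flink-conf.yaml (sketch; values are guesses)
> # fewer parallel downloads per task -> fewer concurrent reads on the same GCS prefix
> state.backend.rocksdb.checkpoint.transfer.thread.num: 2
> # give slow restores more time before heartbeats expire (default is 50000 ms)
> heartbeat.timeout: 120000
> # back off longer between restart attempts so GCS can recover
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 10
> restart-strategy.fixed-delay.delay: 60 s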
>
> Any help is very appreciated!
