Hi, Yaroslav

AFAIK Flink does not retry if the download checkpoint from the storage
fails. On the other hand the FileSystem already has this retry mechanism
already. So I think there is no need for flink to retry.
I am not very sure but from the log it seems that the gfs's retry is
interrupted by some reason. So I think we could get more insight if we
could find the first fail cause.

Best,
Guowei


On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
yaroslav.tkache...@shopify.com> wrote:

> Hi Guowei,
>
> I thought Flink can support any HDFS-compatible object store like the
> majority of Big Data frameworks. So we just added
> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
> dependencies to the classpath, after that using "gs" prefix seems to be
> possible:
>
> state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
> state.savepoints.dir: gs://<REDACTED>/flink-savepoints
>
> And yes, I noticed that retries logging too, but I'm not sure if it's
> implemented on the Flink side or the GCS connector side? Probably need to
> dive deeper into the source code. And if it's implemented on the GCS
> connector side, will Flink wait for all the retries? That's why I asked
> about the potential timeout on the Flink side.
>
> The JM log doesn't have much besides from what I already posted. It's hard
> for me to share the whole log, but the RocksDB initialization part can be
> relevant:
>
> 16:03:41.987 [cluster-io-thread-3] INFO
>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
> configure application-defined state backend:
> RocksDBStateBackend{checkpointStreamBackend=File State Backend
> (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints:
> 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold:
> 1048576), localRocksDbDirectories=[/rocksdb],
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
> writeBatchSize=2097152}
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
> predefined options: FLASH_SSD_OPTIMIZED.
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
> application-defined options factory:
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
> state.backend.rocksdb.block.blocksize=16 kb,
> state.backend.rocksdb.block.cache-size=64 mb}}.
> 16:03:41.988 [cluster-io-thread-3] INFO
>  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
> state backend: RocksDBStateBackend{checkpointStreamBackend=File State
> Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints:
> 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold:
> 1048576), localRocksDbDirectories=[/rocksdb],
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
> writeBatchSize=2097152}
>
> Thanks!
>
> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma <guowei....@gmail.com> wrote:
>
>> Hi, Yaroslav
>>
>> AFAIK there is no official GCS FileSystem support in FLINK.  Does the GCS
>> is implemented by yourself?
>> Would you like to share the whole log of jm?
>>
>> BTW: From the following log I think the implementation has already some
>> retry mechanism.
>> >>> Interrupted while sleeping before retry. Giving up after 1/10 retries
>> for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>
>> Best,
>> Guowei
>>
>>
>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>> yaroslav.tkache...@shopify.com> wrote:
>>
>>> Hi everyone,
>>>
>>> I'm wondering if people have experienced issues with Taskmanager failure
>>> recovery when dealing with a lot of state.
>>>
>>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for savepoints
>>> and checkpoints. ~150 task managers with 4 slots each.
>>>
>>> When I run a pipeline without much state and kill one of the
>>> taskmanagers, it takes a few minutes to recover (I see a few restarts), but
>>> eventually when a new replacement taskmanager is registered with the
>>> jobmanager things go back to healthy.
>>>
>>> But when I run a pipeline with a lot of state (1TB+) and kill one of the
>>> taskmanagers, the pipeline never recovers, even after the replacement
>>> taskmanager has joined. It just enters an infinite loop of restarts and
>>> failures.
>>>
>>> On the jobmanager, I see an endless loop of state transitions: RUNNING
>>> -> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
>>> It stays in RUNNING for a few seconds, but then transitions into FAILED
>>> with a message like this:
>>>
>>>
>>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED>
>>> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
>>> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>> readAddress(..) failed: Connection reset by peer (connection to '
>>> 10.30.10.53/10.30.10.53:45789')
>>> at
>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>> ...
>>> Caused by:
>>> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>>> readAddress(..) failed: Connection reset by peer
>>>
>>>
>>> Which, I guess, means a failed Taskmanager. And since there are not
>>> enough task slots to run it goes into this endless loop again. It's never
>>> the same Taskmanager that fails.
>>>
>>>
>>>
>>> On the Taskmanager side, things look more interesting. I see a variety
>>> of exceptions:
>>>
>>>
>>> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7
>>> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
>>> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The
>>> execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>>>
>>>
>>> also
>>>
>>>
>>> WARNING: Failed read retry #1/10 for
>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
>>> Sleeping...
>>> java.nio.channels.ClosedByInterruptException
>>> at
>>> java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
>>> Source)
>>> at
>>> java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
>>> Source)
>>> at
>>> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
>>> at
>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
>>> at java.base/java.io.DataInputStream.read(Unknown Source)
>>> at
>>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>>> at java.base/java.io.InputStream.read(Unknown Source)
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
>>> ...
>>>
>>>
>>> and
>>>
>>>
>>> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
>>> retries for
>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
>>> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
>>> Caught unexpected exception.
>>> java.nio.channels.ClosedChannelException: null
>>> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
>>> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
>>> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
>>> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
>>> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>
>>>
>>> also
>>>
>>>
>>> 20:52:46.895 [<REDACTED> (141/624)#7] WARN
>>>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
>>> Exception while restoring keyed state backend for
>>> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
>>> alternative (1/1), will retry while more alternatives are available.
>>> org.apache.flink.runtime.state.BackendBuildingException: Caught
>>> unexpected exception.
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328)
>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>> ...
>>>
>>>
>>> and a few of
>>>
>>>
>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
>>> download data for state handles.
>>> at
>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92)
>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>> ...
>>> Caused by: java.util.concurrent.ExecutionException:
>>> java.lang.RuntimeException: no bytes written
>>>
>>>
>>>
>>> Has anyone seen behaviour like this?
>>>
>>>
>>> My current theory: because it needs to download a lot of state from GCS
>>> the pipeline probably experiences some sort of GCS back-off issue (150
>>> taskmanager x 4 slots, also 4
>>> state.backend.rocksdb.checkpoint.transfer.thread.num), probably too many
>>> read requests to the same GCS prefix? And I guess it doesn't finish in the
>>> time that's expected and randomly fails. Maybe there is some kind of
>>> timeout value I can tweak? So downloading from GCS can take time that's
>>> necessary without failing prematurely.
>>>
>>> Any help is very appreciated!
>>>
>>>
>>>
>>>

Reply via email to