Hi Dhanesh,

Thanks for the recommendation! I'll try it out.

On Wed, Apr 7, 2021 at 1:59 AM dhanesh arole <davcdhane...@gmail.com> wrote:

> Hi Yaroslav,
>
> We faced similar issues in our large stateful stream processing job. I
> asked a question about it
> <http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/RocksDB-StateBuilder-unexpected-exception-td42397.html>
> on the user mailing list a few days back. Based on the reply to my question,
> we figured that this happens when the task manager has just come back
> online and is trying to rebuild / restore its state, but meanwhile another
> task manager gets restarted or killed. In this situation the job manager
> cancels the job, and as a result all task managers start cancelling the
> tasks they are currently running. As part of the cancellation flow, the
> channel buffer through which the Flink TM writes to disk gets closed. But
> state rebuilding is already happening concurrently using that channel
> buffer, which causes the ClosedChannelException.
>
> As a solution to this problem, we increased akka.ask.timeout to 10m.
> This gives task managers enough room to wait for RPC responses from
> other task managers during a restart. As a result, a TM becomes more lenient
> about marking other TMs as failed and cancelling the job in the first place.
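>
> A minimal sketch of that setting in flink-conf.yaml, assuming the standard
> config file is used. The "10 min" spelling is an assumption about the
> accepted duration syntax; the text above only says 10m.
>
> ```yaml
> # Raised from the default so RPC calls between components get more time
> # during a large restore. The unit spelling "min" is an assumption.
> akka.ask.timeout: 10 min
> ```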
>
> -
> Dhanesh Arole
>
>
>
> On Tue, Apr 6, 2021 at 7:55 PM Robert Metzger <rmetz...@apache.org> wrote:
>
>> Hey Yaroslav,
>>
>> GCS is a somewhat popular filesystem that should work fine with Flink.
>>
>> It seems that the initial scale of a bucket is 5000 read requests per
>> second (https://cloud.google.com/storage/docs/request-rate), your job
>> should be at roughly the same rate (depending on how fast your job restarts
>> in the restart loop).
>>
>> You could try to tweak the GCS configuration parameters, such as
>> increasing "fs.gs.http.read-timeout" and "fs.gs.http.max.retry". (see
>> https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md
>> for all available options)
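>>
>> A rough sketch of how these two connector options might be passed,
>> assuming your Flink version forwards keys with the "flink.hadoop." prefix
>> from flink-conf.yaml into the Hadoop configuration (otherwise the same
>> keys, without the prefix, would go into core-site.xml). The values are
>> illustrative placeholders, not tuned recommendations.
>>
>> ```yaml
>> # Assumption: "flink.hadoop."-prefixed keys are forwarded to the Hadoop config.
>> # fs.gs.http.read-timeout is given in milliseconds; both values are placeholders.
>> flink.hadoop.fs.gs.http.read-timeout: 60000
>> flink.hadoop.fs.gs.http.max.retry: 20
>> ```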
>>
>>
>> The "ExecutionGraphException: The execution attempt
>> 6f3651a49344754a1e7d1fb20cf2cba3 was not found." error looks weird, but it
>> should not cause the restarts.
>>
>>
>> On Fri, Apr 2, 2021 at 2:15 AM Guowei Ma <guowei....@gmail.com> wrote:
>>
>>> Hi, Yaroslav
>>>
>>> AFAIK Flink does not retry if downloading the checkpoint from storage
>>> fails. On the other hand, the FileSystem already has its own retry
>>> mechanism, so I think there is no need for Flink to retry.
>>> I am not sure, but from the log it seems that the GCS connector's retry is
>>> interrupted for some reason. So I think we could get more insight if we
>>> could find the first failure cause.
>>>
>>> Best,
>>> Guowei
>>>
>>>
>>> On Fri, Apr 2, 2021 at 12:07 AM Yaroslav Tkachenko <
>>> yaroslav.tkache...@shopify.com> wrote:
>>>
>>>> Hi Guowei,
>>>>
>>>> I thought Flink could support any HDFS-compatible object store, like the
>>>> majority of Big Data frameworks. So we just added the
>>>> "flink-shaded-hadoop-2-uber" and "gcs-connector-latest-hadoop2"
>>>> dependencies to the classpath; after that, using the "gs" prefix seems to
>>>> be possible:
>>>>
>>>> state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
>>>> state.savepoints.dir: gs://<REDACTED>/flink-savepoints
>>>>
>>>> And yes, I noticed that retry logging too, but I'm not sure whether it's
>>>> implemented on the Flink side or the GCS connector side. I probably need to
>>>> dive deeper into the source code. And if it's implemented on the GCS
>>>> connector side, will Flink wait for all the retries? That's why I asked
>>>> about the potential timeout on the Flink side.
>>>>
>>>> The JM log doesn't have much besides what I already posted. It's
>>>> hard for me to share the whole log, but the RocksDB initialization part may
>>>> be relevant:
>>>>
>>>> 16:03:41.987 [cluster-io-thread-3] INFO
>>>>  org.apache.flink.runtime.jobmaster.JobMaster - Using job/cluster config to
>>>> configure application-defined state backend:
>>>> RocksDBStateBackend{checkpointStreamBackend=File State Backend
>>>> (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints:
>>>> 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>>>> 1048576), localRocksDbDirectories=[/rocksdb],
>>>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>>>> writeBatchSize=2097152}
>>>> 16:03:41.988 [cluster-io-thread-3] INFO
>>>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>>>> predefined options: FLASH_SSD_OPTIMIZED.
>>>> 16:03:41.988 [cluster-io-thread-3] INFO
>>>>  org.apache.flink.contrib.streaming.state.RocksDBStateBackend - Using
>>>> application-defined options factory:
>>>> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=4,
>>>> state.backend.rocksdb.block.blocksize=16 kb,
>>>> state.backend.rocksdb.block.cache-size=64 mb}}.
>>>> 16:03:41.988 [cluster-io-thread-3] INFO
>>>>  org.apache.flink.runtime.jobmaster.JobMaster - Using application-defined
>>>> state backend: RocksDBStateBackend{checkpointStreamBackend=File State
>>>> Backend (checkpoints: 'gs://<REDACTED>/flink-checkpoints', savepoints:
>>>> 'gs://<REDACTED>/flink-savepoints', asynchronous: TRUE, fileStateThreshold:
>>>> 1048576), localRocksDbDirectories=[/rocksdb],
>>>> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=4,
>>>> writeBatchSize=2097152}
>>>>
>>>> Thanks!
>>>>
>>>> On Thu, Apr 1, 2021 at 2:30 AM Guowei Ma <guowei....@gmail.com> wrote:
>>>>
>>>>> Hi, Yaroslav
>>>>>
>>>>> AFAIK there is no official GCS FileSystem support in Flink. Did you
>>>>> implement the GCS file system yourself?
>>>>> Would you mind sharing the whole JM log?
>>>>>
>>>>> BTW: From the following log, I think the implementation already has
>>>>> some retry mechanism.
>>>>> >>> Interrupted while sleeping before retry. Giving up after 1/10
>>>>> retries for
>>>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d
>>>>>
>>>>> Best,
>>>>> Guowei
>>>>>
>>>>>
>>>>> On Thu, Apr 1, 2021 at 12:50 PM Yaroslav Tkachenko <
>>>>> yaroslav.tkache...@shopify.com> wrote:
>>>>>
>>>>>> Hi everyone,
>>>>>>
>>>>>> I'm wondering if people have experienced issues with Taskmanager
>>>>>> failure recovery when dealing with a lot of state.
>>>>>>
>>>>>> I'm using 1.12.0 on Kubernetes, RocksDB backend with GCS for
>>>>>> savepoints and checkpoints. ~150 task managers with 4 slots each.
>>>>>>
>>>>>> When I run a pipeline without much state and kill one of the
>>>>>> taskmanagers, it takes a few minutes to recover (I see a few restarts),
>>>>>> but eventually, when a new replacement taskmanager is registered with the
>>>>>> jobmanager, things go back to healthy.
>>>>>>
>>>>>> But when I run a pipeline with a lot of state (1TB+) and kill one of
>>>>>> the taskmanagers, the pipeline never recovers, even after the replacement
>>>>>> taskmanager has joined. It just enters an infinite loop of restarts and
>>>>>> failures.
>>>>>>
>>>>>> On the jobmanager, I see an endless loop of state
>>>>>> transitions: RUNNING -> CANCELING -> CANCELED -> CREATED -> SCHEDULED
>>>>>> -> DEPLOYING -> RUNNING. It stays in RUNNING for a few seconds, but then
>>>>>> transitions into FAILED with a message like this:
>>>>>>
>>>>>>
>>>>>> 22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO
>>>>>>  org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED>
>>>>>> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to
>>>>>> FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
>>>>>> org.apache.flink.runtime.io.network.netty.exception.LocalTransportException:
>>>>>> readAddress(..) failed: Connection reset by peer (connection to '
>>>>>> 10.30.10.53/10.30.10.53:45789')
>>>>>> at
>>>>>> org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173)
>>>>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>> ...
>>>>>> Caused by:
>>>>>> org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException:
>>>>>> readAddress(..) failed: Connection reset by peer
>>>>>>
>>>>>>
>>>>>> Which, I guess, means a failed Taskmanager. And since there are not
>>>>>> enough task slots to run, it goes into this endless loop again. It's never
>>>>>> the same Taskmanager that fails.
>>>>>>
>>>>>>
>>>>>>
>>>>>> On the Taskmanager side, things look more interesting. I see a
>>>>>> variety of exceptions:
>>>>>>
>>>>>>
>>>>>> org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7
>>>>>> (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
>>>>>> org.apache.flink.runtime.jobmaster.ExecutionGraphException: The
>>>>>> execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.
>>>>>>
>>>>>>
>>>>>> also
>>>>>>
>>>>>>
>>>>>> WARNING: Failed read retry #1/10 for
>>>>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'.
>>>>>> Sleeping...
>>>>>> java.nio.channels.ClosedByInterruptException
>>>>>> at
>>>>>> java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown
>>>>>> Source)
>>>>>> at
>>>>>> java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown
>>>>>> Source)
>>>>>> at
>>>>>> com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
>>>>>> at
>>>>>> com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
>>>>>> at java.base/java.io.DataInputStream.read(Unknown Source)
>>>>>> at
>>>>>> org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
>>>>>> at java.base/java.io.InputStream.read(Unknown Source)
>>>>>> at
>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> and
>>>>>>
>>>>>>
>>>>>> SEVERE: Interrupted while sleeping before retry. Giving up after 1/10
>>>>>> retries for
>>>>>> 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
>>>>>> 20:52:46.894 [<REDACTED> (141/624)#7] ERROR
>>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder -
>>>>>> Caught unexpected exception.
>>>>>> java.nio.channels.ClosedChannelException: null
>>>>>> at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
>>>>>> at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
>>>>>> at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
>>>>>> at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
>>>>>> at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
>>>>>> at
>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140)
>>>>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>>
>>>>>>
>>>>>> also
>>>>>>
>>>>>>
>>>>>> 20:52:46.895 [<REDACTED> (141/624)#7] WARN
>>>>>>  org.apache.flink.streaming.api.operators.BackendRestorerProcedure -
>>>>>> Exception while restoring keyed state backend for
>>>>>> KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from
>>>>>> alternative (1/1), will retry while more alternatives are available.
>>>>>> org.apache.flink.runtime.state.BackendBuildingException: Caught
>>>>>> unexpected exception.
>>>>>> at
>>>>>> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328)
>>>>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>> ...
>>>>>>
>>>>>>
>>>>>> and a few of
>>>>>>
>>>>>>
>>>>>> Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to
>>>>>> download data for state handles.
>>>>>> at
>>>>>> org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92)
>>>>>> ~[flink-dist_2.12-1.12.0.jar:1.12.0]
>>>>>> ...
>>>>>> Caused by: java.util.concurrent.ExecutionException:
>>>>>> java.lang.RuntimeException: no bytes written
>>>>>>
>>>>>>
>>>>>>
>>>>>> Has anyone seen behaviour like this?
>>>>>>
>>>>>>
>>>>>> My current theory: because it needs to download a lot of state from
>>>>>> GCS, the pipeline probably experiences some sort of GCS back-off issue
>>>>>> (150 taskmanagers x 4 slots, also 4
>>>>>> state.backend.rocksdb.checkpoint.transfer.thread.num), probably too many
>>>>>> read requests to the same GCS prefix? And I guess it doesn't finish in
>>>>>> the time that's expected and randomly fails. Maybe there is some kind of
>>>>>> timeout value I can tweak, so that downloading from GCS can take as long
>>>>>> as necessary without failing prematurely?
>>>>>>
>>>>>> Any help is very appreciated!
>>>>>>
>>>>>>
>>>>>>
>>>>>>
