Hi everyone,

I'm wondering if people have experienced issues with TaskManager failure
recovery when dealing with a lot of state.

I'm running Flink 1.12.0 on Kubernetes, with the RocksDB state backend and
GCS for savepoints and checkpoints, on ~150 TaskManagers with 4 slots each.
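For context, the relevant bits of my flink-conf.yaml look roughly like this
(bucket redacted; the savepoint path and the incremental setting are from
memory, everything else should be at its default):

# RocksDB state backend with incremental checkpoints on GCS
state.backend: rocksdb
state.backend.incremental: true
state.checkpoints.dir: gs://<REDACTED>/flink-checkpoints
state.savepoints.dir: gs://<REDACTED>/flink-savepoints
# 150 TaskManagers x 4 slots
taskmanager.numberOfTaskSlots: 4
# parallel state download/upload threads per task
state.backend.rocksdb.checkpoint.transfer.thread.num: 4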

When I run a pipeline without much state and kill one of the TaskManagers,
it takes a few minutes to recover (I see a few restarts), but eventually,
once a replacement TaskManager registers with the JobManager, things go
back to healthy.

But when I run a pipeline with a lot of state (1 TB+) and kill one of the
TaskManagers, the pipeline never recovers, even after the replacement
TaskManager has joined. It just enters an infinite loop of restarts and
failures.

On the JobManager, I see an endless loop of state transitions: RUNNING
-> CANCELING -> CANCELED -> CREATED -> SCHEDULED -> DEPLOYING -> RUNNING.
It stays in RUNNING for a few seconds, but then transitions to FAILED
with a message like this:


22:28:07.338 [flink-akka.actor.default-dispatcher-239] INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph - <REDACTED> (569/624) (11cb45392108bb07d65fdd0fdc6b6741) switched from RUNNING to FAILED on 10.30.10.212:6122-ac6bba @ 10.30.10.212 (dataPort=43357).
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException: readAddress(..) failed: Connection reset by peer (connection to '10.30.10.53/10.30.10.53:45789')
    at org.apache.flink.runtime.io.network.netty.CreditBasedPartitionRequestClientHandler.exceptionCaught(CreditBasedPartitionRequestClientHandler.java:173) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ...
Caused by: org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors$NativeIoException: readAddress(..) failed: Connection reset by peer


Which, I guess, means another failed TaskManager. And since there are then
not enough task slots to run the job, it goes into this endless loop again.
It's never the same TaskManager that fails.
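For what it's worth, I don't think we set an explicit restart strategy, so
I assume the default just keeps retrying indefinitely. While debugging I'm
considering bounding the loop with something along these lines (just an
example, not what's configured today):

# hypothetical cap on restarts, only to stop the endless loop while investigating
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 10
restart-strategy.fixed-delay.delay: 30 s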



On the TaskManager side, things look more interesting. I see a variety of
exceptions:


org.apache.flink.runtime.taskmanager.Task - <REDACTED> (141/624)#7 (6f3651a49344754a1e7d1fb20cf2cba3) switched from RUNNING to FAILED.
org.apache.flink.runtime.jobmaster.ExecutionGraphException: The execution attempt 6f3651a49344754a1e7d1fb20cf2cba3 was not found.


also


WARNING: Failed read retry #1/10 for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'. Sleeping...
java.nio.channels.ClosedByInterruptException
    at java.base/java.nio.channels.spi.AbstractInterruptibleChannel.end(Unknown Source)
    at java.base/java.nio.channels.Channels$ReadableByteChannelImpl.read(Unknown Source)
    at com.google.cloud.hadoop.repackaged.gcs.com.google.cloud.hadoop.gcsio.GoogleCloudStorageReadChannel.read(GoogleCloudStorageReadChannel.java:313)
    at com.google.cloud.hadoop.fs.gcs.GoogleHadoopFSInputStream.read(GoogleHadoopFSInputStream.java:118)
    at java.base/java.io.DataInputStream.read(Unknown Source)
    at org.apache.flink.runtime.fs.hdfs.HadoopDataInputStream.read(HadoopDataInputStream.java:94)
    at java.base/java.io.InputStream.read(Unknown Source)
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:135)
    ...


and


SEVERE: Interrupted while sleeping before retry. Giving up after 1/10 retries for 'gs://<REDACTED>/flink-checkpoints/150a406a50d20e1ee77422d25ef28d52/shared/3e64cd74-4280-4c31-916a-fe981bf2306c'
20:52:46.894 [<REDACTED> (141/624)#7] ERROR org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder - Caught unexpected exception.
java.nio.channels.ClosedChannelException: null
    at sun.nio.ch.FileChannelImpl.ensureOpen(Unknown Source) ~[?:?]
    at sun.nio.ch.FileChannelImpl.write(Unknown Source) ~[?:?]
    at java.nio.channels.Channels.writeFullyImpl(Unknown Source) ~[?:?]
    at java.nio.channels.Channels.writeFully(Unknown Source) ~[?:?]
    at java.nio.channels.Channels$1.write(Unknown Source) ~[?:?]
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForStateHandle(RocksDBStateDownloader.java:140) ~[flink-dist_2.12-1.12.0.jar:1.12.0]


also


20:52:46.895 [<REDACTED> (141/624)#7] WARN  org.apache.flink.streaming.api.operators.BackendRestorerProcedure - Exception while restoring keyed state backend for KeyedProcessOperator_ff97494a101b44a4b7a2913028a50243_(141/624) from alternative (1/1), will retry while more alternatives are available.
org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
    at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:328) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ...


and a few of


Caused by: org.apache.flink.util.FlinkRuntimeException: Failed to download data for state handles.
    at org.apache.flink.contrib.streaming.state.RocksDBStateDownloader.downloadDataForAllStateHandles(RocksDBStateDownloader.java:92) ~[flink-dist_2.12-1.12.0.jar:1.12.0]
    ...
Caused by: java.util.concurrent.ExecutionException: java.lang.RuntimeException: no bytes written



Has anyone seen behaviour like this?


My current theory: because each restore has to download a lot of state from
GCS, the pipeline probably runs into some sort of GCS back-off issue
(150 TaskManagers x 4 slots, plus
state.backend.rocksdb.checkpoint.transfer.thread.num set to 4), possibly
too many concurrent read requests against the same GCS prefix. I guess the
download doesn't finish within the expected time and the restore randomly
fails. Is there some kind of timeout value I can tweak, so that downloading
state from GCS can take as long as it needs without failing prematurely?
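If it is a timeout, these are the knobs I'm currently eyeing in
flink-conf.yaml; the values below are just guesses on my part, not tested
recommendations:

# give heartbeats more slack while slots are busy downloading state (default 50000 ms)
heartbeat.timeout: 300000
# fewer parallel downloads per task, to ease the load on the shared GCS prefix
state.backend.rocksdb.checkpoint.transfer.thread.num: 2
# let partition requests back off longer while upstream tasks are still restoring (default 10000 ms)
taskmanager.network.request-backoff.max: 30000
# more generous RPC timeout during recovery (default 10 s)
akka.ask.timeout: 120 s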

Any help is much appreciated!
