Hi Gabor,

What kind of development work - is it the Disaggregated State Storage and
Management FLIP (
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855)
that you are referring to?

Thanks for the suggestion. I am not familiar with the State Processor API,
but I will have a look.
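
From a quick look at the docs, something like the sketch below is what I
would try - untested, and the operator uid, key type, state name, and
savepoint path are placeholders for my actual job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointSanityCheck {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Placeholder path; for me this would be the abfss:// savepoint dir.
        SavepointReader savepoint = SavepointReader.read(
                env, "/path/to/savepoint-xxxx", new EmbeddedRocksDBStateBackend());

        // Reading every entry forces full deserialization, so a corrupt or
        // truncated state file should fail right here with an exception.
        savepoint.readKeyedState(
                OperatorIdentifier.forUid("my-coprocess-uid"),
                new KeyedStateReaderFunction<String, String>() {
                    private ValueState<byte[]> state;

                    @Override
                    public void open(Configuration parameters) {
                        state = getRuntimeContext().getState(
                                new ValueStateDescriptor<>("my-state", byte[].class));
                    }

                    @Override
                    public void readKey(String key, Context ctx, Collector<String> out)
                            throws Exception {
                        state.value();
                        out.collect(key);
                    }
                })
                .print();

        env.execute("savepoint-sanity-check");
    }
}

If iterating over all entries like that completes cleanly, I assume that
would rule out corrupt savepoint files?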

My state is not even that large (10-50 GB). It seems like people are using
RocksDB with TBs of state, but I am a bit puzzled as to how they handle
restarts from savepoints when restoring takes so long. Do you know?

Are there any Flink configuration options I could try out to alleviate the
slow restarts that might be causing the exception?
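
For context, the only knobs I have spotted in the docs so far are along
these lines - not sure whether they are the right ones for this problem:

# more threads for downloading state files when restoring from a snapshot
state.backend.rocksdb.checkpoint.transfer.thread.num: 8
# keep a local copy of state so failover restarts can skip the remote download
state.backend.local-recovery: true

Would any of these (or others) help here?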

Best regards,
Bjarke

On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> Hi Bjarke,
>
> It's normal to see longer recovery time as the state size grows. There are
> developments in progress to mitigate this problem.
>
> > Any ideas as to what could be causing this?
> I don't think there is an easy way to tell; I can just give some pointers.
>
> First I would take a look at the state files with the State Processor
> API [1]. I would be really surprised if they were corrupt, though, since
> ADLS is consistent; only the rename-with-overwrite operation is not
> atomic, and Flink does not use it.
>
> If the checkpoint files are not corrupt, then I would add some debug
> printouts around the exception catch site and try to find out the
> exact root cause (since several GB of state can fit into memory, a local
> repro with a debugger may be possible).
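>
> For the local repro, something along these lines could be a starting
> point (a rough sketch; the savepoint path is a placeholder and the
> pipeline wiring is your job's):
>
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> // Point a local environment at a copy of the savepoint so the restore
> // path can be stepped through with a debugger.
> Configuration conf = new Configuration();
> conf.setString("execution.savepoint.path", "/local/copy/of/savepoint-xxxx");
> StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.createLocalEnvironment(conf);
> env.setParallelism(2);
> // ... build the exact same pipeline (same uids) here ...
> env.execute();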
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> BR,
> G
>
>
> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am doing stateful streaming (keyed coprocessing) using rocksDB. When my
>> job restarts to adjust parallelism, and restores from savepoint, I quite
>> often get the following error:
>>
>> 2024-08-07 19:43:09
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>> at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>> at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>> at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>> at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>> at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>> at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>> at java.base/java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>> ... 12 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>> at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>> ... 14 more
>> Caused by: java.io.EOFException
>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>> at java.base/java.io.DataInputStream.readFully(Unknown Source)
>> at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>> at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>> at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>> at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>> ... 19 more
>>
>> However, the job is most often able to start up after several retries. The
>> error seems to happen more often with higher parallelism (and higher CPU),
>> and thus with more pods.
>> Additionally, as the state grows into the GB range, the job takes longer to
>> start up, so the many retries before state is successfully restored mean
>> longer downtime.
>>
>> Any ideas as to what could be causing this?
>>
>> I am using Apache Flink Kubernetes Operator 1.8, Flink 1.20 with the
>> official Apache Flink image, Java 17, and ADLS Gen2 for durable storage.
>>
>> Best regards,
>> Bjarke
>>
>
