Hi Gabor,

What kind of development work do you mean - is it the Disaggregated State Storage and Management FLIP (https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=293046855) that you are referring to?
Thanks for the suggestion. I am not familiar with the State Processor API but I will have a look.

My state is not even that large (10-50 GB). It seems like people are using RocksDB with TBs of state, but I am a bit puzzled as to how they handle restarts from savepoints when restoring takes so long. Do you know? Are there some Flink configurations that I might be able to try out to alleviate the slow restart that might be causing the exception?

Best regards,
Bjarke

On Thu, Aug 8, 2024 at 11:16 AM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> Hi Bjarke,
>
> It's normal to see longer recovery times as the state size grows. There
> are developments in progress to mitigate this problem.
>
> > Any ideas as to what could be causing this?
>
> I think there is no easy way to tell; I can only give some pointers.
>
> First I would take a look at the state files with the State Processor
> API [1]. I would be really surprised though, since ADLS is consistent;
> only the rename-with-overwrite operation is not atomic, and we don't do
> that in Flink.
>
> If the checkpoint files are not corrupt, then I would add some debug
> printouts around the exception catch area and try to find out the exact
> root cause (since several GB of state can fit into memory, a local repro
> with a debugger may be possible).
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/libs/state_processor_api/
>
> BR,
> G
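For reference, here is a minimal sketch of what checking the savepoint with the State Processor API [1] could look like. It needs the flink-state-processor-api dependency, and the savepoint path, operator uid ("my-keyed-coprocess-uid"), key type, and state descriptor ("my-value-state") below are placeholders that have to match the actual job. Because reading forces every entry to be deserialized, a corrupt or truncated key group should fail here with the same EOFException, outside the streaming job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class SavepointInspection {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Point the reader at the savepoint that fails to restore (placeholder path).
        SavepointReader savepoint = SavepointReader.read(
                env,
                "abfss://<container>@<account>.dfs.core.windows.net/savepoints/savepoint-xxxx",
                new EmbeddedRocksDBStateBackend());

        // Reading iterates every key of the operator, deserializing each state entry.
        DataStream<String> entries =
                savepoint.readKeyedState("my-keyed-coprocess-uid", new MyStateReader());

        entries.print();
        env.execute("savepoint-inspection");
    }

    /** Reads one ValueState per key; adapt key type and descriptors to the real job. */
    static class MyStateReader extends KeyedStateReaderFunction<String, String> {

        private ValueState<Long> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("my-value-state", Types.LONG));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            out.collect(key + " -> " + state.value());
        }
    }
}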
> On Wed, Aug 7, 2024 at 9:46 PM Bjarke Tornager <bjarketorna...@gmail.com>
> wrote:
>
>> Hi,
>>
>> I am doing stateful streaming (keyed coprocessing) using RocksDB. When my
>> job restarts to adjust parallelism and restores from a savepoint, I quite
>> often get the following error:
>>
>> 2024-08-07 19:43:09
>> java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:330)
>>     at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:275)
>>     at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
>>     at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
>>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
>>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
>>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
>>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>>     at java.base/java.lang.Thread.run(Unknown Source)
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for KeyedCoProcessOperator_30465cf1c06f2daf053ed603d38be4b6_(1/2) from any of the 1 provided restore options.
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:457)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:203)
>>     ... 12 more
>> Caused by: org.apache.flink.runtime.state.BackendBuildingException: Caught unexpected exception.
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:459)
>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:533)
>>     at org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend.createKeyedStateBackend(EmbeddedRocksDBStateBackend.java:96)
>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:446)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>     ... 14 more
>> Caused by: java.io.EOFException
>>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>     at java.base/java.io.DataInputStream.readFully(Unknown Source)
>>     at org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer.deserialize(BytePrimitiveArraySerializer.java:82)
>>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:298)
>>     at org.apache.flink.runtime.state.restore.FullSnapshotRestoreOperation$KeyGroupEntriesIterator.next(FullSnapshotRestoreOperation.java:273)
>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restoreKVStateData(RocksDBFullRestoreOperation.java:148)
>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.applyRestoreResult(RocksDBFullRestoreOperation.java:128)
>>     at org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation.restore(RocksDBFullRestoreOperation.java:102)
>>     at org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder.build(RocksDBKeyedStateBackendBuilder.java:376)
>>     ... 19 more
>>
>> However, most often the job is able to start up after several retries. It
>> seems to happen more often with higher parallelism (and higher CPU) and,
>> as such, with more pods.
>> Additionally, as the state grows into the GB range it takes longer for
>> the job to start up, and as such my job has longer downtime due to the
>> many retries before it successfully restores state.
>>
>> Any ideas as to what could be causing this?
>>
>> I am using Apache Flink Operator 1.8, Flink 1.20 with the official
>> Apache Flink image, Java 17, and ADLS Gen2 for durable storage.
>>
>> Best regards,
>> Bjarke
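As a side note on the slow restores: the FullSnapshotRestoreOperation frames in the trace indicate the restore goes through the canonical full-snapshot path, where every key/value pair is deserialized and written back into RocksDB one by one. A rough sketch of recovery-related settings that are sometimes tried for this follows (whether they help in a given setup would need testing; the actual pipeline is omitted, and the exact option constants may differ slightly between Flink versions):

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RecoveryTuningSketch {

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        // Incremental RocksDB checkpoints: recovery from a retained checkpoint reuses
        // the uploaded SST files instead of re-inserting every key/value pair the way
        // the canonical full-snapshot restore does.
        conf.set(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, true);

        // Local recovery keeps an on-disk copy of the latest checkpoint on each
        // TaskManager, so restarts that keep the same pods do not have to download
        // everything from ADLS again.
        conf.set(CheckpointingOptions.LOCAL_RECOVERY, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        env.setStateBackend(new EmbeddedRocksDBStateBackend(true));
        env.enableCheckpointing(60_000);

        // ... keyed coprocess pipeline and env.execute(...) would go here ...
    }
}

Incremental checkpoints and local recovery mainly speed up recovery from retained checkpoints; when a savepoint is needed for rescaling, triggering it in NATIVE format (available since Flink 1.15) lets RocksDB restore from its own SST files instead of going through the canonical format seen in the trace.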