Hi,
It looks like this class is only minimally implemented. What exactly does the State Processor API support? Looking at that class, it seems to support only prioritizedOperatorSubtaskState.
Are there any plans to support changelogs in the future? In view of this, what are my options? I want to debug the keyed state one of my operators writes, to understand whether there is a state leak and why this state grows so large.

Thanks
Sachin

On Wed, Feb 26, 2025 at 5:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> The State Processor API does not currently support changelog. Please see
> here [1].
>
> [1]
> https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127
>
> G
>
> On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hello Folks,
>> I am trying to read Flink checkpointed state and I am getting this
>> error:
>>
>> Caused by: java.io.IOException: Failed to restore state backend
>>   at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
>>   at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
>>   at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>   at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
>>   at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>   at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
>> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>>   at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
>>   ... 6 more
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1)
>> from any of the 1 provided restore options.
>>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>>   ... 7 more
>> Caused by: java.lang.NullPointerException
>>   at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
>>   at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>>   at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
>>   at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
>>   at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>   at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>
>> My code to read the state is like:
>>
>> SavepointReader savepoint =
>>     SavepointReader.read(
>>         env,
>>         "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
>>         new EmbeddedRocksDBStateBackend(true));
>>
>> DataStream<State> keyedState =
>>     savepoint.readKeyedState(
>>         OperatorIdentifier.forUid("Proc"),
>>         new StateReaderFunction(),
>>         Types.LONG,
>>         Types.POJO(State.class));
>>
>> keyedState.print();
>>
>> env.execute("Analysis");
>>
>> Any idea as to what could be going wrong?
>> Also note this is my checkpoint config:
>>
>> state.backend.type: rocksdb
>> state.checkpoints.dir: s3://{bucket}/flink-checkpoints
>> state.backend.incremental: 'true'
>> state.backend.local-recovery: 'true'
>> state.backend.changelog.enabled: 'true'
>> state.backend.changelog.storage: filesystem
>> dstl.dfs.base-path: s3://{bucket}/changelog
>> dstl.dfs.compression.enabled: 'true'
>>
>> Thanks
>> Sachin
>>
>> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>
>>> The UID must match in the Flink app, `events.uid("my-uid")`, and in the
>>> reader app, `forUid("my-uid")`.
>>> In general it's good practice to set a uid in the Flink app for each and
>>> every operator; otherwise Flink generates an almost-random number for it.
>>>
>>> When you don't know the generated uid, no worries: there are nifty
>>> tricks to find it out.
>>> BR,
>>> G
>>>
>>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>>
>>>> Hi,
>>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>>>> as that's to be released in 2.1.0.
>>>> If I directly try to use the flink-state-processor-api, my very first
>>>> question is: how do I know the uid for each keyed state?
>>>> Is it the step name, as in:
>>>>
>>>> events
>>>>     .keyBy(new MyKeySelector())
>>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>     .name("Proc");
>>>>
>>>> If I now want to access keyed states of MyStatsProcessor, do I
>>>> access them using:
>>>>
>>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
>>>> ReaderFunction());
>>>>
>>>> Is this the right way to access them?
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> FYI, in case it helps: FLIP-496: SQL connector for keyed savepoint
>>>>> data [1]
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>>
>>>>> --
>>>>> Best!
>>>>> Xuyang
>>>>>
>>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>> So I have a Flink application which stores state (RocksDB state
>>>>> backend) in S3 with the following directory structure:
>>>>>
>>>>> s3://{bucket}/flink-checkpoints/{job-id}/
>>>>>     +-- shared/
>>>>>     +-- taskowned/
>>>>>     +-- chk-<num>/
>>>>>
>>>>> I have my job pipeline defined like:
>>>>>
>>>>> final DataStream<Data> events = env.fromSource(..., "Src");
>>>>>
>>>>> SingleOutputStreamOperator<StatsData> statsData =
>>>>>     events
>>>>>         .keyBy(new MyKeySelector())
>>>>>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>>         .name("Proc");
>>>>>
>>>>> statsData
>>>>>     .addSink(new MySink<>(...))
>>>>>     .name("Sink");
>>>>>
>>>>> env.execute("Exec");
>>>>>
>>>>> The MyStatsProcessor has keyed states defined as:
>>>>>
>>>>> state1 =
>>>>>     getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
>>>>>         Types.POJO(StateOne.class)));
>>>>> state2 =
>>>>>     getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
>>>>>         Types.POJO(StateTwo.class)));
>>>>>
>>>>> So my question is: how can I read any checkpointed state? I see this
>>>>> API, flink-state-processor-api.
>>>>> Can I use the same here? If so, how do I instantiate it:
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> SavepointReader savepoint =
>>>>>     SavepointReader.read(env,
>>>>>         "s3://{bucket}/flink-checkpoints/{job-id}",
>>>>>         <which state backend to use>);
>>>>>
>>>>> Is this the correct way to use this API for reading a checkpoint?
>>>>>
>>>>> Please note I have also enabled:
>>>>>
>>>>> state.backend.incremental: 'true'
>>>>> state.backend.local-recovery: 'true'
>>>>> state.backend.changelog.enabled: 'true'
>>>>> state.backend.changelog.storage: filesystem
>>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>>> dstl.dfs.compression.enabled: 'true'
>>>>>
>>>>> Now, say I am able to create the reader; how do I inspect a
>>>>> particular keyed state?
>>>>> I see a function called readKeyedState, but I am unsure what uid I
>>>>> need to pass to read a particular state.
>>>>> Would something like this work:
>>>>>
>>>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>>>>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>>
>>>>> And now in my KeyedState class, I can access state1 and state2.
>>>>>
>>>>> Would this work?
>>>>>
>>>>> Please let me know if I am on the right track, or whether it is not
>>>>> possible to read checkpointed state from an external application for
>>>>> debugging.
>>>>>
>>>>> Thanks
>>>>> Sachin
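Pulling the fragments of this thread together, a reader application for the two value states in MyStatsProcessor might look like the sketch below. This is illustrative only, not a verified program: it assumes the flink-state-processor-api 1.19 dependency is on the classpath, that the operator was given an explicit uid("Proc") in the job (the pipeline above only sets name("Proc"); name() and uid() are different things), and that the snapshot being read was not written by the changelog state backend, which the State Processor API cannot restore per the SavepointTaskStateManager link above. The KeyedState container class and the savepoint path are hypothetical.

```java
// Sketch only: requires Apache Flink 1.19 with flink-state-processor-api
// on the classpath; it will not compile standalone.
// State names and types must match MyStatsProcessor's descriptors exactly.
public class StateDebugJob {

    // Hypothetical container emitted once per key.
    public static class KeyedState {
        public Long key;
        public StateOne state1;
        public StateTwo state2;
    }

    public static class StateReaderFunction
            extends KeyedStateReaderFunction<Long, KeyedState> {

        private transient ValueState<StateOne> state1;
        private transient ValueState<StateTwo> state2;

        @Override
        public void open(Configuration parameters) {
            // Same names and types as in MyStatsProcessor, or the read fails.
            state1 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
            state2 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
        }

        @Override
        public void readKey(Long key, Context ctx, Collector<KeyedState> out)
                throws Exception {
            KeyedState ks = new KeyedState();
            ks.key = key;
            ks.state1 = state1.value();
            ks.state2 = state2.value();
            out.collect(ks);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // The backend passed here should match the one the snapshot was
        // written with (RocksDB in this thread). Path is hypothetical.
        SavepointReader savepoint = SavepointReader.read(
            env,
            "s3://{bucket}/savepoints/savepoint-xxxx",
            new EmbeddedRocksDBStateBackend(true));

        savepoint
            .readKeyedState(OperatorIdentifier.forUid("Proc"),
                new StateReaderFunction())
            .print();

        env.execute("State debug");
    }
}
```

Because the checkpoints in this thread were written with state.backend.changelog.enabled: 'true', they trigger the NullPointerException shown above when restored this way. One possible workaround, unverified here, is to first take a canonical savepoint of the running job (for example `flink savepoint <jobId> s3://{bucket}/savepoints`, or `flink stop` with `--savepointPath`) and point SavepointReader at that savepoint instead of a chk-<num> directory.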