The State Processor API does not currently support the changelog state backend. Please see here [1].

[1] https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127
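One possible workaround, as a sketch that has not been verified on Flink 1.19.1: since the changelog is the part the reader cannot handle, switch it off in the job's configuration, restart the job from its latest checkpoint, and point the reader at a checkpoint or savepoint taken after that restart. The key comes from the checkpoint config quoted below. Whether a later snapshot is then completely free of changelog state handles is an assumption that should be confirmed against the changelog state backend documentation.

```yaml
# Assumption: disabling the changelog on restore is allowed for this job and Flink version.
# All other settings from the quoted config stay as they are.
state.backend.changelog.enabled: 'false'
```

The reader application itself needs no change for this; only the path passed to SavepointReader.read would point at the newer snapshot.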
G

On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hello Folks,
> I am trying to read the flink checkpointed state and I am getting this
> error:
>
> Caused by: java.io.IOException: Failed to restore state backend
>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
>     ... 6 more
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>     ... 7 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>     at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>
> My code to read the state is like:
>
> SavepointReader savepoint =
>     SavepointReader.read(
>         env,
>         "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
>         new EmbeddedRocksDBStateBackend(true));
>
> DataStream<State> keyedState =
>     savepoint.readKeyedState(
>         OperatorIdentifier.forUid("Proc"),
>         new StateReaderFunction(),
>         Types.LONG,
>         Types.POJO(State.class));
>
> keyedState.print();
>
> env.execute("Analysis");
>
>
> Any idea as to what could be going wrong.
> Also note this is my checkpoint config:
> state.backend.type: rocksdb
> state.checkpoints.dir: s3://{bucket}/flink-checkpoints
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3:/{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Thanks
> Sachin
>
> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> The UID must match in the Flink app `events.uid("my-uid")` and in the
>> reader app `forUid("my-uid")`.
>> In general it's good practice to set a uid in the Flink app for each and
>> every operator; otherwise Flink generates an almost random number for it.
>>
>> If you don't know the generated uid, no worries: there are nifty
>> tricks to find it out.
>>
>> BR,
>> G
>>
>>
>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector
>>> as that's to be released for 2.1.0
>>> If I directly try to use the flink-state-processor-api, my very first
>>> question is how do I know the uuid for each keyed state?
>>> Is it the step name?
>>> as in
>>>
>>> events
>>>     .keyBy(new MyKeySelector())
>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>     .name("Proc");
>>>
>>> If now I want to access keyed states for MyStatsProcessor then I need
>>> to access it using:
>>>
>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
>>> ReaderFunction());
>>>
>>> Is this the right way to access it?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint
>>>> data[1]
>>>>
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>
>>>>
>>>> --
>>>> Best!
>>>> Xuyang
>>>>
>>>>
>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> So I have a flink application which stores state (RocksDB state
>>>> backend) in S3 with the following directory structure:
>>>>
>>>> s3://{bucket}/flink-checkpoints/
>>>>     /{job-id}
>>>>         |
>>>>         + --shared/
>>>>         + --taskowned/
>>>>         + --chk-<num>/
>>>>
>>>>
>>>> I have my job pipeline defined like:
>>>>
>>>> final DataStream<Data> events = env.fromSource(..., "Src");
>>>>
>>>> SingleOutputStreamOperator<StatsData> statsData =
>>>>     events
>>>>         .keyBy(new MyKeySelector())
>>>>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>         .name("Proc");
>>>>
>>>> statsData
>>>>     .addSink(new MySink<>(...))
>>>>     .name("Sink");
>>>>
>>>> env.execute("Exec");
>>>>
>>>>
>>>> The MyStatsProcessor has keyed states defined as:
>>>>
>>>> state1 =
>>>>     getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
>>>>         Types.POJO(StateOne.class)));
>>>> state2 =
>>>>     getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
>>>>         Types.POJO(StateTwo.class)));
>>>>
>>>>
>>>> So my question is how can I read any checkpoint state. I see this API
>>>> flink-state-processor-api.
>>>> Can I use the same here, if so how do I instantiate it:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>     "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>>
>>>> Is this the correct way to use this API for reading a checkpoint ?
>>>>
>>>> Please note I have also enabled:
>>>> state.backend.incremental: 'true'
>>>> state.backend.local-recovery: 'true'
>>>> state.backend.changelog.enabled: 'true'
>>>> state.backend.changelog.storage: filesystem
>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>> dstl.dfs.compression.enabled: 'true'
>>>>
>>>> Now after say I am able to create the reader how do I inspect a
>>>> particular keyed state.
>>>> I see a function called readKeyedState but I am unsure as to what uuid
>>>> I need to pass to read a particular state?
>>>> Would something like this work:
>>>>
>>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>>>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>
>>>> And now in my KeyedState class, I can access state1 and state2.
>>>>
>>>> Would this work?
>>>>
>>>> Please let me know if I am on the right track or this is something not
>>>> possible to read checkpointed states via any external application for
>>>> debugging.
>>>>
>>>> Thanks
>>>> Sachin