Hello folks, I am trying to read Flink checkpointed state and I am getting this error:
Caused by: java.io.IOException: Failed to restore state backend
    at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
    at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
    at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
    ... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1) from any of the 1 provided restore options.
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
    ... 7 more
Caused by: java.lang.NullPointerException
    at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
    at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
    at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
    at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
    at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
    at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)

My code to read the state is like:

SavepointReader savepoint =
    SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
        new EmbeddedRocksDBStateBackend(true));

DataStream<State> keyedState =
    savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"),
        new StateReaderFunction(),
        Types.LONG,
        Types.POJO(State.class));

keyedState.print();
env.execute("Analysis");

Any idea as to what could be going wrong? Also note this is my checkpoint config:

state.backend.type: rocksdb
state.checkpoints.dir: s3://{bucket}/flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'

Thanks
Sachin

On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> The UID must match in the Flink app `events.uid("my-uid")` and in the
> reader app `forUid("my-uid")`.
> In general it's good practice to set a uid in the Flink app for each and
> every operator; otherwise Flink generates an almost random identifier for it.
>
> When you don't know the generated uid, no worries, there are nifty tricks
> to find it out.
>
> BR,
> G
>
> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>> as that's to be released for 2.1.0.
>> If I directly try to use the flink-state-processor-api, my very first
>> question is: how do I know the uid for each keyed state?
>> Is it the step name? As in:
>>
>> events
>>     .keyBy(new MyKeySelector())
>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>     .name("Proc");
>>
>> If I now want to access keyed state for MyStatsProcessor, do I need to
>> access it using:
>>
>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>
>> Is this the right way to access it?
>>
>> Thanks
>> Sachin
>>
>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi,
>>>
>>> FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint
>>> data [1]
>>>
>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>
>>> Hi,
>>> So I have a Flink application which stores state (RocksDB state backend)
>>> in S3 with the following directory structure:
>>>
>>> s3://{bucket}/flink-checkpoints/{job-id}/
>>>     +-- shared/
>>>     +-- taskowned/
>>>     +-- chk-<num>/
>>>
>>> I have my job pipeline defined like:
>>>
>>> final DataStream<Data> events = env.fromSource(..., "Src");
>>>
>>> SingleOutputStreamOperator<StatsData> statsData =
>>>     events
>>>         .keyBy(new MyKeySelector())
>>>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>         .name("Proc");
>>>
>>> statsData
>>>     .addSink(new MySink<>(...))
>>>     .name("Sink");
>>>
>>> env.execute("Exec");
>>>
>>> The MyStatsProcessor has keyed states defined as:
>>>
>>> state1 = getRuntimeContext().getState(
>>>     new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>>> state2 = getRuntimeContext().getState(
>>>     new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>>>
>>> So my question is: how can I read any checkpointed state? I see there is an
>>> API for this, flink-state-processor-api.
>>> Can I use it here, and if so, how do I instantiate it:
>>>
>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> SavepointReader savepoint = SavepointReader.read(env,
>>>     "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>
>>> Is this the correct way to use this API for reading a checkpoint?
>>>
>>> Please note I have also enabled:
>>>
>>> state.backend.incremental: 'true'
>>> state.backend.local-recovery: 'true'
>>> state.backend.changelog.enabled: 'true'
>>> state.backend.changelog.storage: filesystem
>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>> dstl.dfs.compression.enabled: 'true'
>>>
>>> Now, say I am able to create the reader: how do I inspect a
>>> particular keyed state?
>>> I see a function called readKeyedState, but I am unsure what uid I
>>> need to pass to read a particular state.
>>> Would something like this work:
>>>
>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>
>>> And then in my KeyedState class I can access state1 and state2.
>>>
>>> Would this work?
>>>
>>> Please let me know if I am on the right track, or whether it is not
>>> possible to read checkpointed state from an external application for
>>> debugging.
>>>
>>> Thanks
>>> Sachin
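
[Editor's note: to make Gabor's uid advice concrete, here is a minimal sketch combining the pipeline and reader code from this thread. The explicit `.uid("Proc")` call is the one addition not present in the original job; class names such as MyStatsProcessor and StateReaderFunction are taken from the emails above, and the surrounding class/imports are omitted as in the thread.]

```java
// In the Flink job: set an explicit uid on the stateful operator.
// .name(...) only labels the operator in the UI and logs; .uid(...) is the
// stable identifier that the State Processor API matches on.
SingleOutputStreamOperator<StatsData> statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .uid("Proc")   // must match forUid(...) in the reader app
        .name("Proc");

// In the reader app: reference the same uid when reading the state back.
SavepointReader savepoint =
    SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
        new EmbeddedRocksDBStateBackend(true));

DataStream<State> keyedState =
    savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"),
        new StateReaderFunction(),
        Types.LONG,
        Types.POJO(State.class));
```

If the job was deployed without an explicit uid, `OperatorIdentifier.forUidHash(...)` can be used instead with the generated operator hash (visible in the checkpoint metadata), which is one of the "nifty tricks" Gabor alludes to.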