Hello Folks,
I am trying to read the flink checkpointed state and I am getting this
error:

Caused by: java.io.IOException: Failed to restore state backend
at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(
StreamOperatorContextBuilder.java:140)
at org.apache.flink.state.api.input.KeyedStateInputFormat.open(
KeyedStateInputFormat.java:176)
at org.apache.flink.state.api.input.KeyedStateInputFormat.open(
KeyedStateInputFormat.java:66)
at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction
.run(InputFormatSourceFunction.java:92)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource
.java:181)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource
.java:78)
at org.apache.flink.streaming.runtime.tasks.
SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
Caused by: java.lang.Exception: Exception while creating
StreamOperatorStateContext.
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(
StreamOperatorContextBuilder.java:129)
... 6 more
Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
state backend for 31
ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1) from
any of the 1 provided restore options.
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:165)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
... 7 more
Caused by: java.lang.NullPointerException
at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation
.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation
.restore(ChangelogBackendRestoreOperation.java:78)
at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend
.restore(DeactivatedChangelogStateBackend.java:70)
at org.apache.flink.state.changelog.AbstractChangelogStateBackend
.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl
.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
at org.apache.flink.streaming.api.operators.BackendRestorerProcedure
.createAndRestore(BackendRestorerProcedure.java:137)

My code to read the state is like:

SavepointReader savepoint =
    SavepointReader.read(env,
"s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata", new
EmbeddedRocksDBStateBackend(true));

DataStream<State> keyedState =
    savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
StateReaderFunction(), Types.LONG, Types.POJO(State.class));

keyedState.print();

env.execute("Analysis");


Any idea as to what could be going wrong.
Also note this is my checkpoint config:
state.backend.type: rocksdb
state.checkpoints.dir: s3://{bucket}/flink-checkpoints
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3:/{bucket}/changelog
dstl.dfs.compression.enabled: 'true'

Thanks
Sachin





On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> The UID must match in the Flink app `events.uid("my-uid")` and in the
> reader app `forUid("my-uid")`.
> In general it's a good practice to set uid in the Flink app for each and
> every operator, otherwise Flink generates an almost random number for it.
>
> When you don't know the generated uid then no worry there are nifty tricks
> to find it out.
>
> BR,
> G
>
>
> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector
>> as that's to be released for 2.1.0
>> If I directly try to use the flink-state-processor-api, my very first
>> question is how do I know the uuid for each keyed state ?
>> Is it the step name?
>> as in
>>
>> events
>> .keyBy(new MyKeySelector())
>> .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>> .name("Proc");
>>
>> If now I want to access keyed states for MyStatsProcessor then I need to
>> access it using:
>>
>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
>> ReaderFunction());
>>
>> Is this the right way to access it ?
>>
>> Thanks
>> Sachin
>>
>>
>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi,
>>>
>>> FYI that hopes helpful: FLIP-496: SQL connector for keyed savepoint
>>> data[1]
>>>
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>
>>>
>>> --
>>>     Best!
>>>     Xuyang
>>>
>>>
>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>
>>> Hi,
>>> So I have a flink application which stores state (RocksDB state backend)
>>> in S3 with the following directory structure:
>>>
>>> s3://{bucket}/flink-checkpoints/    /{job-id}        |        + --shared/   
>>>      + --taskowned/        + --chk-<num>/
>>>
>>>
>>> I have my job pipeline defined like:
>>>
>>>     final DataStream<Data> events = env.fromSource(..., "Src");
>>>
>>>     SingleOutputStreamOperator<StatsData> statsData =
>>>         events
>>>             .keyBy(new MyKeySelector())
>>>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>             .name("Proc");
>>>
>>>     statsData
>>>         .addSink(new MySink<>(...))
>>>         .name("Sink");
>>>
>>>     env.execute("Exec");
>>>
>>>
>>> The MyStatsProcessor has keyed states defined as:
>>>
>>>     state1 =
>>>         getRuntimeContext().getState(new ValueStateDescriptor<>("state1", 
>>> Types.POJO(StateOne.class)));
>>>     state2 =
>>>         getRuntimeContext().getState(new ValueStateDescriptor<>("state2", 
>>> Types.POJO(StateTwo.class)));
>>>
>>>
>>> So my question is how can I read any checkpoint state. I see this API 
>>> flink-state-processor-api.
>>> Can I use the same here, if so how do I instantiate it:
>>>
>>> StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();SavepointReader 
>>> savepoint = SavepointReader.read(env, 
>>> "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>
>>> Is this the correct way to use this API for reading a checkpoint ?
>>>
>>> Please note I have also enabled:
>>> state.backend.incremental: 'true'
>>> state.backend.local-recovery: 'true'
>>> state.backend.changelog.enabled: 'true'
>>> state.backend.changelog.storage: filesystem
>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>> dstl.dfs.compression.enabled: 'true'
>>>
>>> Now after say I am able to create the reader how do I inspect a
>>> particular keyed state.
>>> I see a function called readKeyedState but I am unsure as to what uuid I
>>> need to pass to read a particular state?
>>> Would something like this work:
>>>
>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>> OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>> And now in my KeyedState class, I can access state1 and state2.
>>>
>>> Would this work?
>>>
>>> Please let me know if I am on the right track or this is something not
>>> possible to read checkpointed states via any external application for
>>> debugging.
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>

Reply via email to