The State Processor API currently does not support the changelog state backend. Please see here [1].

[1]
https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127
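If you still need to inspect the state, one possible workaround (a sketch on my side, not a verified recipe) is to read a regular savepoint instead of a changelog-backed checkpoint, since a savepoint materializes the full state:

```
# Standard Flink CLI; <jobId> and the target path are placeholders.
flink savepoint <jobId> s3://{bucket}/savepoints
# or, if the job can be stopped anyway:
flink stop --savepointPath s3://{bucket}/savepoints <jobId>
```

The resulting savepoint path can then be handed to SavepointReader.read(...).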

G


On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hello Folks,
> I am trying to read the flink checkpointed state and I am getting this
> error:
>
> Caused by: java.io.IOException: Failed to restore state backend
>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
>     ... 6 more
> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1) from any of the 1 provided restore options.
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>     ... 7 more
> Caused by: java.lang.NullPointerException
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>     at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>
> My code to read the state is like:
>
> SavepointReader savepoint =
>     SavepointReader.read(
>         env,
>         "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
>         new EmbeddedRocksDBStateBackend(true));
>
> DataStream<State> keyedState =
>     savepoint.readKeyedState(
>         OperatorIdentifier.forUid("Proc"),
>         new StateReaderFunction(),
>         Types.LONG,
>         Types.POJO(State.class));
>
> keyedState.print();
>
> env.execute("Analysis");
>
>
> Any idea as to what could be going wrong?
> Also note this is my checkpoint config:
> state.backend.type: rocksdb
> state.checkpoints.dir: s3://{bucket}/flink-checkpoints
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Thanks
> Sachin
>
>
>
>
>
> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> The UID must match in the Flink app `events.uid("my-uid")` and in the
>> reader app `forUid("my-uid")`.
>> In general it's good practice to set a uid in the Flink app for each and
>> every operator; otherwise Flink generates an effectively random identifier for it.
>>
>> When you don't know the generated uid, no worries, there are nifty
>> tricks to find it out.
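Concretely, that match might look like the following. This is a minimal Java sketch, not self-contained code: the uid string "stats-proc" is a placeholder I introduced, and the surrounding job/reader classes are the ones from this thread.

```
// In the streaming job: pin an explicit, stable uid on the stateful operator.
// Without .uid(...), Flink derives an id from the graph layout, which can change.
events
    .keyBy(new MyKeySelector())
    .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .name("Proc")        // display name only; shown in the web UI
    .uid("stats-proc");  // this is what the reader must reference

// In the reader job: reference exactly the same uid.
DataStream<KeyedState> state =
    savepoint.readKeyedState(
        OperatorIdentifier.forUid("stats-proc"),  // must equal the uid above
        new ReaderFunction());
```

Note that .name(...) and .uid(...) are independent: the name is cosmetic, while the uid is what is stored with the operator state.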
>>
>> BR,
>> G
>>
>>
>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>>> as that's to be released in 2.1.0.
>>> If I directly try to use the flink-state-processor-api, my very first
>>> question is: how do I know the uid for each keyed state?
>>> Is it the step name?
>>> as in
>>>
>>> events
>>>     .keyBy(new MyKeySelector())
>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>     .name("Proc");
>>>
>>> Now if I want to access keyed states for MyStatsProcessor, then I need
>>> to access them using:
>>>
>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>
>>> Is this the right way to access it?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> FYI, hoping this helps: FLIP-496: SQL connector for keyed savepoint
>>>> data [1]
>>>>
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>
>>>>
>>>> --
>>>>     Best!
>>>>     Xuyang
>>>>
>>>>
>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> So I have a flink application which stores state (RocksDB state
>>>> backend) in S3 with the following directory structure:
>>>>
>>>> s3://{bucket}/flink-checkpoints/
>>>>     /{job-id}
>>>>         |
>>>>         + --shared/
>>>>         + --taskowned/
>>>>         + --chk-<num>/
>>>>
>>>>
>>>> I have my job pipeline defined like:
>>>>
>>>>     final DataStream<Data> events = env.fromSource(..., "Src");
>>>>
>>>>     SingleOutputStreamOperator<StatsData> statsData =
>>>>         events
>>>>             .keyBy(new MyKeySelector())
>>>>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>             .name("Proc");
>>>>
>>>>     statsData
>>>>         .addSink(new MySink<>(...))
>>>>         .name("Sink");
>>>>
>>>>     env.execute("Exec");
>>>>
>>>>
>>>> The MyStatsProcessor has keyed states defined as:
>>>>
>>>>     state1 =
>>>>         getRuntimeContext().getState(
>>>>             new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>>>>     state2 =
>>>>         getRuntimeContext().getState(
>>>>             new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>>>>
>>>>
>>>> So my question is: how can I read any checkpointed state? I see the
>>>> flink-state-processor-api. Can I use it here, and if so, how do I
>>>> instantiate it:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>
>>>> SavepointReader savepoint =
>>>>     SavepointReader.read(
>>>>         env,
>>>>         "s3://{bucket}/flink-checkpoints/{job-id}",
>>>>         <which state backend to use>);
>>>>
>>>> Is this the correct way to use this API for reading a checkpoint?
>>>>
>>>> Please note I have also enabled:
>>>> state.backend.incremental: 'true'
>>>> state.backend.local-recovery: 'true'
>>>> state.backend.changelog.enabled: 'true'
>>>> state.backend.changelog.storage: filesystem
>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>> dstl.dfs.compression.enabled: 'true'
>>>>
>>>> Now, assuming I am able to create the reader, how do I inspect a
>>>> particular keyed state?
>>>> I see a function called readKeyedState, but I am unsure what uid
>>>> I need to pass to read a particular state.
>>>> Would something like this work:
>>>>
>>>> DataStream<KeyedState> keyedState =
>>>>     savepoint.readKeyedState(
>>>>         OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>
>>>> And now in my KeyedState class, I can access state1 and state2.
>>>>
>>>> Would this work?
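For the ReaderFunction part, here is a sketch of what such a reader could look like, assuming Flink's KeyedStateReaderFunction API. It is not self-contained: the KeyedState output POJO and its constructor are placeholders, and StateOne/StateTwo are the classes from this thread.

```
// Sketch: re-declare the same state descriptors in open(), then read them per key.
public class ReaderFunction extends KeyedStateReaderFunction<Long, KeyedState> {

    private transient ValueState<StateOne> state1;
    private transient ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        // Descriptor names and types must match those used in MyStatsProcessor.
        state1 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(Long key, Context ctx, Collector<KeyedState> out) throws Exception {
        // Emit one record per key, carrying both state values.
        out.collect(new KeyedState(key, state1.value(), state2.value()));
    }
}
```

readKey is invoked once for every key present in the operator's state, so the resulting DataStream contains one KeyedState record per key.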
>>>>
>>>> Please let me know if I am on the right track, or if reading
>>>> checkpointed states from an external application for debugging is
>>>> not possible.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
