Hi,
It looks like this class is only minimally implemented. What exactly does the
State Processor API support?
Looking at that class, it seems to only
support prioritizedOperatorSubtaskState.

Are there any plans to support changelog in the future?

In view of this, what are my options?

I want to debug the keyed state one of my operators writes to, to
understand whether there is a state leak and why this state grows so large.
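One option I am considering (a sketch only, not verified: it assumes stopping the job with a canonical savepoint materializes the changelog, so the savepoint can be read without changelog support; the savepoint path below is hypothetical):

```java
// Sketch: read a canonical savepoint (taken via `flink stop --savepointPath ...`)
// instead of a changelog-backed retained checkpoint.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

SavepointReader savepoint =
    SavepointReader.read(
        env,
        "s3://{bucket}/savepoints/savepoint-xxxx/_metadata", // hypothetical path
        new EmbeddedRocksDBStateBackend(true));
```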

Thanks
Sachin


On Wed, Feb 26, 2025 at 5:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
wrote:

> The State Processor API does not currently support changelog. Please see here [1].
>
> [1]
> https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127
>
> G
>
>
> On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hello Folks,
>> I am trying to read the flink checkpointed state and I am getting this
>> error:
>>
>> Caused by: java.io.IOException: Failed to restore state backend
>> at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
>> at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>> at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
>> at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
>> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>> at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
>> ... 6 more
>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed
>> state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1)
>> from any of the 1 provided restore options.
>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>> ... 7 more
>> Caused by: java.lang.NullPointerException
>> at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
>> at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>> at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
>> at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
>> at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>> at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>
>> My code to read the state is like:
>>
>> SavepointReader savepoint =
>>     SavepointReader.read(
>>         env,
>>         "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
>>         new EmbeddedRocksDBStateBackend(true));
>>
>> DataStream<State> keyedState =
>>     savepoint.readKeyedState(
>>         OperatorIdentifier.forUid("Proc"),
>>         new StateReaderFunction(),
>>         Types.LONG,
>>         Types.POJO(State.class));
>>
>> keyedState.print();
>>
>> env.execute("Analysis");
>>
>>
>> Any idea as to what could be going wrong?
>> Also note this is my checkpoint config:
>> state.backend.type: rocksdb
>> state.checkpoints.dir: s3://{bucket}/flink-checkpoints
>> state.backend.incremental: 'true'
>> state.backend.local-recovery: 'true'
>> state.backend.changelog.enabled: 'true'
>> state.backend.changelog.storage: filesystem
>> dstl.dfs.base-path: s3://{bucket}/changelog
>> dstl.dfs.compression.enabled: 'true'
>>
>> Thanks
>> Sachin
>>
>>
>>
>>
>>
>> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>> wrote:
>>
>>> The UID must match in the Flink app (`events.uid("my-uid")`) and in the
>>> reader app (`forUid("my-uid")`).
>>> In general it's good practice to set a uid in the Flink app for each and
>>> every operator; otherwise Flink generates an essentially random identifier for it.
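For example (a sketch reusing the operator and class names from this thread; `"my-uid"` is an assumed uid):

```java
// In the Flink app: set an explicit uid on every stateful operator.
events
    .keyBy(new MyKeySelector())
    .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .uid("my-uid")   // used for state mapping; stable across job graph changes
    .name("Proc");   // display name only, not used for state mapping

// In the reader app: reference the same uid.
savepoint.readKeyedState(OperatorIdentifier.forUid("my-uid"), new ReaderFunction());
```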
>>>
>>> When you don't know the generated uid, no worries: there are nifty
>>> tricks to find it out.
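One such trick (an assumption on my part about which trick is meant): address the operator by its generated ID hash instead of its uid, taking the hash from the web UI or from an error message such as the one earlier in this thread:

```java
// If only the generated operator ID hash is known (e.g. from the
// "Could not restore keyed state backend for 31ba2180..." error or the
// web UI), address the operator by hash instead of uid.
savepoint.readKeyedState(
    OperatorIdentifier.forUidHash("31ba21805c9537cdf994f701a2b6f2ee"),
    new ReaderFunction());
```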
>>>
>>> BR,
>>> G
>>>
>>>
>>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com>
>>> wrote:
>>>
>>>> Hi,
>>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>>>> as that's slated for release in 2.1.0.
>>>> If I directly try to use the flink-state-processor-api, my very first
>>>> question is: how do I know the uid for each keyed state?
>>>> Is it the step name?
>>>> as in
>>>>
>>>> events
>>>>     .keyBy(new MyKeySelector())
>>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>     .name("Proc");
>>>>
>>>> If I now want to access the keyed states of MyStatsProcessor, would I
>>>> access them using:
>>>>
>>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
>>>> ReaderFunction());
>>>>
>>>> Is this the right way to access them?
>>>>
>>>> Thanks
>>>> Sachin
>>>>
>>>>
>>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> FYI that hopes helpful: FLIP-496: SQL connector for keyed savepoint
>>>>> data[1]
>>>>>
>>>>>
>>>>> [1]
>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>>
>>>>>
>>>>> --
>>>>>     Best!
>>>>>     Xuyang
>>>>>
>>>>>
>>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>>
>>>>> Hi,
>>>>> So I have a flink application which stores state (RocksDB state
>>>>> backend) in S3 with the following directory structure:
>>>>>
>>>>> s3://{bucket}/flink-checkpoints/{job-id}/
>>>>>     + --shared/
>>>>>     + --taskowned/
>>>>>     + --chk-<num>/
>>>>>
>>>>>
>>>>> I have my job pipeline defined like:
>>>>>
>>>>>     final DataStream<Data> events = env.fromSource(..., "Src");
>>>>>
>>>>>     SingleOutputStreamOperator<StatsData> statsData =
>>>>>         events
>>>>>             .keyBy(new MyKeySelector())
>>>>>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>>             .name("Proc");
>>>>>
>>>>>     statsData
>>>>>         .addSink(new MySink<>(...))
>>>>>         .name("Sink");
>>>>>
>>>>>     env.execute("Exec");
>>>>>
>>>>>
>>>>> The MyStatsProcessor has keyed states defined as:
>>>>>
>>>>> state1 =
>>>>>     getRuntimeContext().getState(
>>>>>         new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>>>>> state2 =
>>>>>     getRuntimeContext().getState(
>>>>>         new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
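A reader function matching these descriptors might look like the following (a sketch; `KeyedState` as an output holder, the `Long` key type, and the constructor are assumptions on my part, and the descriptor names must match the ones registered above exactly):

```java
// Sketch: a KeyedStateReaderFunction that re-declares the same state
// descriptors as MyStatsProcessor and emits one record per key.
public class ReaderFunction extends KeyedStateReaderFunction<Long, KeyedState> {

    private transient ValueState<StateOne> state1;
    private transient ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        // Must use the same names and types the processor registered.
        state1 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(Long key, Context ctx, Collector<KeyedState> out)
            throws Exception {
        // Emit both states for this key; KeyedState is a hypothetical holder POJO.
        out.collect(new KeyedState(key, state1.value(), state2.value()));
    }
}
```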
>>>>>
>>>>>
>>>>> So my question is: how can I read any checkpointed state? I see the
>>>>> flink-state-processor-api.
>>>>> Can I use it here, and if so, how do I instantiate it:
>>>>>
>>>>> StreamExecutionEnvironment env =
>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>> SavepointReader savepoint =
>>>>>     SavepointReader.read(
>>>>>         env,
>>>>>         "s3://{bucket}/flink-checkpoints/{job-id}",
>>>>>         <which state backend to use>);
>>>>>
>>>>> Is this the correct way to use this API for reading a checkpoint?
>>>>>
>>>>> Please note I have also enabled:
>>>>> state.backend.incremental: 'true'
>>>>> state.backend.local-recovery: 'true'
>>>>> state.backend.changelog.enabled: 'true'
>>>>> state.backend.changelog.storage: filesystem
>>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>>> dstl.dfs.compression.enabled: 'true'
>>>>>
>>>>> Now, once I am able to create the reader, how do I inspect a
>>>>> particular keyed state?
>>>>> I see a function called readKeyedState, but I am unsure what uid
>>>>> I need to pass to read a particular state.
>>>>> Would something like this work:
>>>>>
>>>>> DataStream<KeyedState> keyedState =
>>>>>     savepoint.readKeyedState(
>>>>>         OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>>
>>>>> And then in my KeyedState class, I can access state1 and state2.
>>>>>
>>>>> Would this work?
>>>>>
>>>>> Please let me know if I am on the right track, or if reading
>>>>> checkpointed state from an external application for debugging is
>>>>> simply not possible.
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
