The plan is to support everything, but the question is when.
As a related short-term option, I would suggest turning off the changelog in
the reader and trying to read the state as-is.
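A minimal sketch of what that suggestion could look like, assuming the reader
environment honors `state.backend.changelog.enabled` from its configuration
(the path and backend are taken from the earlier snippet in this thread; this
is an illustration, not a verified fix):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch: disable the changelog backend on the reader side, then read the
// underlying RocksDB state as-is.
Configuration conf = new Configuration();
conf.setString("state.backend.changelog.enabled", "false");

StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment(conf);

SavepointReader savepoint =
    SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
        new EmbeddedRocksDBStateBackend(true));
```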

G


On Wed, Feb 26, 2025 at 1:22 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> Looks like this class is very minimally implemented. What exactly does the
> State Processor API support?
> Looking at that class, it seems to only support
> prioritizedOperatorSubtaskState.
>
> Are there any plans to support changelogs in the future?
>
> In view of this, what are my options?
>
> I wanted to debug the keyed state one of my operators writes to, to
> understand whether there is a state leak and why this state grows so huge.
>
> Thanks
> Sachin
>
>
> On Wed, Feb 26, 2025 at 5:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> The State Processor API currently does not support changelog. Please see here [1].
>>
>> [1]
>> https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127
>>
>> G
>>
>>
>> On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal <sjmit...@gmail.com>
>> wrote:
>>
>>> Hello Folks,
>>> I am trying to read the flink checkpointed state and I am getting this
>>> error:
>>>
>>> Caused by: java.io.IOException: Failed to restore state backend
>>>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
>>>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
>>>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
>>> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>>>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
>>>     ... 6 more
>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1) from any of the 1 provided restore options.
>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>>>     ... 7 more
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
>>>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>>>     at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
>>>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>
>>> My code to read the state is like:
>>>
>>> SavepointReader savepoint =
>>>     SavepointReader.read(
>>>         env,
>>>         "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
>>>         new EmbeddedRocksDBStateBackend(true));
>>>
>>> DataStream<State> keyedState =
>>>     savepoint.readKeyedState(
>>>         OperatorIdentifier.forUid("Proc"),
>>>         new StateReaderFunction(),
>>>         Types.LONG,
>>>         Types.POJO(State.class));
>>>
>>> keyedState.print();
>>>
>>> env.execute("Analysis");
>>>
>>>
>>> Any idea as to what could be going wrong?
>>> Also note this is my checkpoint config:
>>> state.backend.type: rocksdb
>>> state.checkpoints.dir: s3://{bucket}/flink-checkpoints
>>> state.backend.incremental: 'true'
>>> state.backend.local-recovery: 'true'
>>> state.backend.changelog.enabled: 'true'
>>> state.backend.changelog.storage: filesystem
>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>> dstl.dfs.compression.enabled: 'true'
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>>
>>>
>>>
>>> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
>>> wrote:
>>>
>>>> The UID must match between the Flink app (`events.uid("my-uid")`) and
>>>> the reader app (`forUid("my-uid")`).
>>>> In general it's good practice to set a uid on each and every operator
>>>> in the Flink app; otherwise Flink generates an essentially random
>>>> identifier for it.
>>>>
>>>> When you don't know the generated uid, no worries: there are nifty
>>>> tricks to find it out.
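To make the uid-vs-name distinction concrete, a sketch using the operator and
reader names from the earlier snippets in this thread (the uid string
`"my-stats-proc"` is a hypothetical example):

```java
// In the Flink job: set an explicit uid on the stateful operator.
// name() only labels the operator in the UI; uid() is what identifies
// its state in savepoints/checkpoints.
events
    .keyBy(new MyKeySelector())
    .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .uid("my-stats-proc")
    .name("Proc");

// In the reader app: reference the same uid.
savepoint.readKeyedState(
    OperatorIdentifier.forUid("my-stats-proc"),
    new StateReaderFunction(),
    Types.LONG,
    Types.POJO(State.class));
```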
>>>>
>>>> BR,
>>>> G
>>>>
>>>>
>>>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL
>>>>> connector, as that's slated for release in 2.1.0.
>>>>> If I directly try to use the flink-state-processor-api, my very first
>>>>> question is: how do I know the uid for each keyed state?
>>>>> Is it the step name?
>>>>> as in
>>>>>
>>>>> events
>>>>>     .keyBy(new MyKeySelector())
>>>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>>     .name("Proc");
>>>>>
>>>>> If now I want to access keyed states for MyStatsProcessor then I need
>>>>> to access it using:
>>>>>
>>>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
>>>>> ReaderFunction());
>>>>>
>>>>> Is this the right way to access it?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>>
>>>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint
>>>>>> data [1]
>>>>>>
>>>>>>
>>>>>> [1]
>>>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>>>
>>>>>>
>>>>>> --
>>>>>>     Best!
>>>>>>     Xuyang
>>>>>>
>>>>>>
>>>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> So I have a flink application which stores state (RocksDB state
>>>>>> backend) in S3 with the following directory structure:
>>>>>>
s3://{bucket}/flink-checkpoints/
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-<num>/
>>>>>>
>>>>>>
>>>>>> I have my job pipeline defined like:
>>>>>>
>>>>>>     final DataStream<Data> events = env.fromSource(..., "Src");
>>>>>>
>>>>>>     SingleOutputStreamOperator<StatsData> statsData =
>>>>>>         events
>>>>>>             .keyBy(new MyKeySelector())
>>>>>>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>>>             .name("Proc");
>>>>>>
>>>>>>     statsData
>>>>>>         .addSink(new MySink<>(...))
>>>>>>         .name("Sink");
>>>>>>
>>>>>>     env.execute("Exec");
>>>>>>
>>>>>>
>>>>>> The MyStatsProcessor has keyed states defined as:
>>>>>>
    state1 =
        getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
    state2 =
        getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>>>>>>
>>>>>>
So my question is: how can I read any checkpoint state? I see this API,
flink-state-processor-api.
Can I use the same here? If so, how do I instantiate it:
>>>>>>
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
    SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{job-id}",
        <which state backend to use>);
>>>>>>
Is this the correct way to use this API for reading a checkpoint?
>>>>>>
>>>>>> Please note I have also enabled:
>>>>>> state.backend.incremental: 'true'
>>>>>> state.backend.local-recovery: 'true'
>>>>>> state.backend.changelog.enabled: 'true'
>>>>>> state.backend.changelog.storage: filesystem
>>>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>>>> dstl.dfs.compression.enabled: 'true'
>>>>>>
Now, once I am able to create the reader, how do I inspect a particular
keyed state?
I see a function called readKeyedState, but I am unsure what uid I need to
pass to read a particular state.
>>>>>> Would something like this work:
>>>>>>
DataStream<KeyedState> keyedState =
    savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"), new ReaderFunction());

And now in my KeyedState class, I can access state1 and state2.
>>>>>>
>>>>>> Would this work?
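For reference, a minimal sketch of what such a reader function could look
like, re-declaring the same state descriptors as MyStatsProcessor (the
`KeyedState` POJO and its constructor are hypothetical; the state names and
types come from the snippet above):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReaderFunction extends KeyedStateReaderFunction<Long, KeyedState> {

    private ValueState<StateOne> state1;
    private ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        // Descriptors must match the names and types used in MyStatsProcessor.
        state1 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(Long key, Context ctx, Collector<KeyedState> out)
            throws Exception {
        // Emit one record per key, bundling both state values.
        out.collect(new KeyedState(key, state1.value(), state2.value()));
    }
}
```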
>>>>>>
Please let me know if I am on the right track, or whether it is not
possible to read checkpointed state from an external application for
debugging.
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
>>>>>>
