The plan is to support everything, but the question is when. As a related short-term option, I would suggest turning off changelog in the reader and trying to read the state as-is.
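To make "turn off changelog in the reader" concrete, the reader job's configuration could look like the sketch below. These are the standard Flink option keys; whether the State Processor API honors them for restoring your particular checkpoint is exactly the part worth testing:

```
state.backend.type: rocksdb
state.backend.changelog.enabled: 'false'
```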
G

On Wed, Feb 26, 2025 at 1:22 PM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> Looks like this class is very minimally implemented. What exactly does the
> State Processor API support? Looking at that class, it seems to only
> support prioritizedOperatorSubtaskState.
>
> Are there any plans to support changelogs in the future?
>
> In view of this, what are my options?
>
> I wanted to debug the keyed state one of my operators writes to, to
> understand if there is a state leak and why this state becomes very huge.
>
> Thanks
> Sachin
>
> On Wed, Feb 26, 2025 at 5:01 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>
>> The State Processor API does not currently support changelog. Please see here [1].
>>
>> [1] https://github.com/apache/flink/blob/69559fb5d231d704633fed807773cd1853601862/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/SavepointTaskStateManager.java#L127
>>
>> G
>>
>> On Wed, Feb 26, 2025 at 11:26 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hello Folks,
>>> I am trying to read the Flink checkpointed state and I am getting this error:
>>>
>>> Caused by: java.io.IOException: Failed to restore state backend
>>>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:140)
>>>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:176)
>>>     at org.apache.flink.state.api.input.KeyedStateInputFormat.open(KeyedStateInputFormat.java:66)
>>>     at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:92)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:181)
>>>     at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>>>     at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:340)
>>> Caused by: java.lang.Exception: Exception while creating StreamOperatorStateContext.
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:294)
>>>     at org.apache.flink.state.api.input.StreamOperatorContextBuilder.build(StreamOperatorContextBuilder.java:129)
>>>     ... 6 more
>>> Caused by: org.apache.flink.util.FlinkException: Could not restore keyed state backend for 31ba21805c9537cdf994f701a2b6f2ee_31ba21805c9537cdf994f701a2b6f2ee_(1/1) from any of the 1 provided restore options.
>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:165)
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.keyedStatedBackend(StreamTaskStateInitializerImpl.java:399)
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.streamOperatorStateContext(StreamTaskStateInitializerImpl.java:180)
>>>     ... 7 more
>>> Caused by: java.lang.NullPointerException
>>>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.readBackendHandle(ChangelogBackendRestoreOperation.java:104)
>>>     at org.apache.flink.state.changelog.restore.ChangelogBackendRestoreOperation.restore(ChangelogBackendRestoreOperation.java:78)
>>>     at org.apache.flink.state.changelog.DeactivatedChangelogStateBackend.restore(DeactivatedChangelogStateBackend.java:70)
>>>     at org.apache.flink.state.changelog.AbstractChangelogStateBackend.createKeyedStateBackend(AbstractChangelogStateBackend.java:81)
>>>     at org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl.lambda$keyedStatedBackend$3(StreamTaskStateInitializerImpl.java:393)
>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.attemptCreateAndRestore(BackendRestorerProcedure.java:173)
>>>     at org.apache.flink.streaming.api.operators.BackendRestorerProcedure.createAndRestore(BackendRestorerProcedure.java:137)
>>>
>>> My code to read the state is like:
>>>
>>> SavepointReader savepoint =
>>>     SavepointReader.read(env,
>>>         "s3://{bucket}/flink-checkpoints/{jobId}/chk-{num}/_metadata",
>>>         new EmbeddedRocksDBStateBackend(true));
>>>
>>> DataStream<State> keyedState =
>>>     savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"),
>>>         new StateReaderFunction(), Types.LONG, Types.POJO(State.class));
>>>
>>> keyedState.print();
>>>
>>> env.execute("Analysis");
>>>
>>> Any idea as to what could be going wrong?
>>> Also note this is my checkpoint config:
>>>
>>> state.backend.type: rocksdb
>>> state.checkpoints.dir: s3://{bucket}/flink-checkpoints
>>> state.backend.incremental: 'true'
>>> state.backend.local-recovery: 'true'
>>> state.backend.changelog.enabled: 'true'
>>> state.backend.changelog.storage: filesystem
>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>> dstl.dfs.compression.enabled: 'true'
>>>
>>> Thanks
>>> Sachin
>>>
>>> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:
>>>
>>>> The UID must match in the Flink app (`events.uid("my-uid")`) and in the
>>>> reader app (`forUid("my-uid")`).
>>>> In general it's good practice to set a uid in the Flink app for each
>>>> and every operator; otherwise Flink generates an almost random number for it.
>>>>
>>>> When you don't know the generated uid, no worries: there are nifty
>>>> tricks to find it out.
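To illustrate the uid matching described above, a minimal sketch using the operator names from this thread ("my-uid" is a placeholder, not a name from the original apps):

```java
// In the Flink application: set an explicit uid on the stateful operator.
SingleOutputStreamOperator<StatsData> statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .uid("my-uid")  // stable identifier Flink uses to locate the operator's state
        .name("Proc");  // display name only; not used for state restore

// In the reader application: reference the same uid, not the name.
DataStream<KeyedState> keyedState =
    savepoint.readKeyedState(
        OperatorIdentifier.forUid("my-uid"), new ReaderFunction());
```

If the uid was never set explicitly, `OperatorIdentifier.forUidHash(...)` can be used instead with the generated hash taken from the job graph or checkpoint metadata.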
>>>>
>>>> BR,
>>>> G
>>>>
>>>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL
>>>>> connector, as that is to be released in 2.1.0.
>>>>> If I directly try to use the flink-state-processor-api, my very first
>>>>> question is: how do I know the uid for each keyed state?
>>>>> Is it the step name, as in:
>>>>>
>>>>> events
>>>>>     .keyBy(new MyKeySelector())
>>>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>>     .name("Proc");
>>>>>
>>>>> If I now want to access keyed states for MyStatsProcessor, then I need
>>>>> to access them using:
>>>>>
>>>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>>
>>>>> Is this the right way to access it?
>>>>>
>>>>> Thanks
>>>>> Sachin
>>>>>
>>>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> FYI, hoping this helps: FLIP-496: SQL connector for keyed savepoint data [1]
>>>>>>
>>>>>> [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>>>
>>>>>> --
>>>>>> Best!
>>>>>> Xuyang
>>>>>>
>>>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>>>
>>>>>> Hi,
>>>>>> So I have a Flink application which stores state (RocksDB state
>>>>>> backend) in S3 with the following directory structure:
>>>>>>
>>>>>> s3://{bucket}/flink-checkpoints/{job-id}/
>>>>>>     +-- shared/
>>>>>>     +-- taskowned/
>>>>>>     +-- chk-<num>/
>>>>>>
>>>>>> I have my job pipeline defined like:
>>>>>>
>>>>>> final DataStream<Data> events = env.fromSource(..., "Src");
>>>>>>
>>>>>> SingleOutputStreamOperator<StatsData> statsData =
>>>>>>     events
>>>>>>         .keyBy(new MyKeySelector())
>>>>>>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>>>         .name("Proc");
>>>>>>
>>>>>> statsData
>>>>>>     .addSink(new MySink<>(...))
>>>>>>     .name("Sink");
>>>>>>
>>>>>> env.execute("Exec");
>>>>>>
>>>>>> The MyStatsProcessor has keyed states defined as:
>>>>>>
>>>>>> state1 =
>>>>>>     getRuntimeContext().getState(new
>>>>>>         ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>>>>>> state2 =
>>>>>>     getRuntimeContext().getState(new
>>>>>>         ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>>>>>>
>>>>>> So my question is: how can I read any checkpoint state? I see this API,
>>>>>> flink-state-processor-api.
>>>>>> Can I use the same here, and if so, how do I instantiate it:
>>>>>>
>>>>>> StreamExecutionEnvironment env =
>>>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> SavepointReader savepoint =
>>>>>>     SavepointReader.read(env,
>>>>>>         "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>>>>
>>>>>> Is this the correct way to use this API for reading a checkpoint?
>>>>>>
>>>>>> Please note I have also enabled:
>>>>>>
>>>>>> state.backend.incremental: 'true'
>>>>>> state.backend.local-recovery: 'true'
>>>>>> state.backend.changelog.enabled: 'true'
>>>>>> state.backend.changelog.storage: filesystem
>>>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>>>> dstl.dfs.compression.enabled: 'true'
>>>>>>
>>>>>> Now, say I am able to create the reader: how do I inspect a
>>>>>> particular keyed state?
>>>>>> I see a function called readKeyedState, but I am unsure as to what
>>>>>> uid I need to pass to read a particular state.
>>>>>> Would something like this work:
>>>>>>
>>>>>> DataStream<KeyedState> keyedState =
>>>>>>     savepoint.readKeyedState(
>>>>>>         OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>>>
>>>>>> And now in my KeyedState class, I can access state1 and state2.
>>>>>> Would this work?
>>>>>>
>>>>>> Please let me know if I am on the right track, or if it is not
>>>>>> possible to read checkpointed states via an external application for
>>>>>> debugging.
>>>>>>
>>>>>> Thanks
>>>>>> Sachin
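For readers following this thread: the ReaderFunction in the question could be sketched roughly as below, assuming the State Processor API's `KeyedStateReaderFunction`. The `KeyedState` POJO and its constructor are illustrative (they do not come from the thread); the descriptor names and types must exactly match the ones registered in MyStatsProcessor.

```java
// Hedged sketch: reads both value states registered in MyStatsProcessor,
// emitting one KeyedState record per key for inspection.
public class ReaderFunction extends KeyedStateReaderFunction<Long, KeyedState> {

    private transient ValueState<StateOne> state1;
    private transient ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        // Descriptor names/types must match the writing operator exactly.
        state1 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(Long key, Context ctx, Collector<KeyedState> out)
            throws Exception {
        // KeyedState(key, s1, s2) is a hypothetical holder POJO.
        out.collect(new KeyedState(key, state1.value(), state2.value()));
    }
}
```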