Hi Sachin,

You're on the right track in general and this should work :) I know that a version upgrade can be complex, but it's worth doing because we've just added a ~95% performance boost for the State Processor API in 1.20 [1]. As you mentioned, the SQL connector is on 2.x, which is not yet feasible for the majority of users, but good to keep in mind.
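For reference, here is a minimal sketch of what the reader side could look like. A few assumptions to flag: it targets Flink 1.20 with the flink-state-processor-api dependency on the classpath; StateOne/StateTwo are stand-ins for your own POJOs; and OperatorIdentifier.forUid("Proc") only works if the operator was given .uid("Proc") in the original job — .name("Proc") alone is not the same thing (for an unset uid you'd need OperatorIdentifier.forUidHash with the auto-generated hash). Also note the path should point at one concrete chk-<num> directory (the one containing the _metadata file), not at the {job-id} directory itself:

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class CheckpointInspector {

  // Stand-ins for your POJOs; replace with the real StateOne/StateTwo.
  public static class StateOne { public long count; }
  public static class StateTwo { public double sum; }

  // Simple holder for what we pull out per key (hypothetical shape).
  public static class KeyedState {
    public String key;
    public StateOne state1;
    public StateTwo state2;
  }

  public static class ReaderFunction
      extends KeyedStateReaderFunction<String, KeyedState> {
    private transient ValueState<StateOne> state1;
    private transient ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
      // Descriptors must match the names/types used in MyStatsProcessor.
      state1 = getRuntimeContext().getState(
          new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
      state2 = getRuntimeContext().getState(
          new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<KeyedState> out)
        throws Exception {
      KeyedState result = new KeyedState();
      result.key = key;
      result.state1 = state1.value();
      result.state2 = state2.value();
      out.collect(result);
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Point at one concrete checkpoint (the directory holding _metadata).
    SavepointReader reader = SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{job-id}/chk-<num>",
        new EmbeddedRocksDBStateBackend());

    DataStream<KeyedState> states = reader.readKeyedState(
        OperatorIdentifier.forUid("Proc"), new ReaderFunction());

    states.print();
    env.execute("read-checkpoint");
  }
}
```

The key type of ReaderFunction (String above) must match whatever MyKeySelector returns in your job; adjust accordingly.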
[1] https://issues.apache.org/jira/browse/FLINK-37109

BR,
G

On Fri, Feb 21, 2025 at 5:11 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> So I have a Flink application which stores state (RocksDB state backend)
> in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints/{job-id}/
>     + --shared/
>     + --taskowned/
>     + --chk-<num>/
>
> I have my job pipeline defined like:
>
> final DataStream<Data> events = env.fromSource(..., "Src");
>
> SingleOutputStreamOperator<StatsData> statsData =
>     events
>         .keyBy(new MyKeySelector())
>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>         .name("Proc");
>
> statsData
>     .addSink(new MySink<>(...))
>     .name("Sink");
>
> env.execute("Exec");
>
> The MyStatsProcessor has keyed states defined as:
>
> state1 =
>     getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
>         Types.POJO(StateOne.class)));
> state2 =
>     getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
>         Types.POJO(StateTwo.class)));
>
> So my question is how can I read any checkpoint state. I see this API,
> flink-state-processor-api.
> Can I use the same here, and if so, how do I instantiate it:
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> SavepointReader savepoint = SavepointReader.read(env,
>     "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>
> Is this the correct way to use this API for reading a checkpoint?
>
> Please note I have also enabled:
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now, say I am able to create the reader: how do I inspect a particular
> keyed state?
> I see a function called readKeyedState but I am unsure as to what uid I
> need to pass to read a particular state?
> Would something like this work:
>
> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>
> And now in my KeyedState class, I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track, or if it is not possible
> to read checkpointed states via any external application for debugging.
>
> Thanks
> Sachin