Hi,

I am working on Flink 1.19.1, so I guess I cannot use the SQL connector, as that is slated for release in 2.1.0.

If I try to use the flink-state-processor-api directly, my very first question is: how do I know the uid for each keyed state? Is it the step name, as in:

events
    .keyBy(new MyKeySelector())
    .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .name("Proc");

If I now want to access the keyed state of MyStatsProcessor, do I read it with:

savepoint.readKeyedState(
    OperatorIdentifier.forUid("Proc"), new ReaderFunction());

Is this the right way to access it?
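To make the question concrete, here is roughly what I was planning to try. This is only a sketch on my side: InspectCheckpoint, MyKeyedStateReader and the KeyedState holder are placeholder names I made up, I am assuming a String key for the example, and my understanding is that the path has to point at one completed checkpoint directory (the one containing the _metadata file) rather than the {job-id} folder -- please correct me if any of that is wrong.

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class InspectCheckpoint {

  // Placeholder holder for whatever I read out per key.
  // StateOne / StateTwo are the POJOs already used in MyStatsProcessor.
  public static class KeyedState {
    public String key;
    public StateOne state1;
    public StateTwo state2;
  }

  // Reader that re-declares the same state descriptors as MyStatsProcessor.
  public static class MyKeyedStateReader
      extends KeyedStateReaderFunction<String, KeyedState> {

    private transient ValueState<StateOne> state1;
    private transient ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
      // Descriptor names/types must match the ones used in the job.
      state1 = getRuntimeContext().getState(
          new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
      state2 = getRuntimeContext().getState(
          new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(String key, Context ctx, Collector<KeyedState> out)
        throws Exception {
      KeyedState result = new KeyedState();
      result.key = key;
      result.state1 = state1.value();
      result.state2 = state2.value();
      out.collect(result);
    }
  }

  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    // Point at one completed checkpoint (the directory holding _metadata),
    // using the same state backend the job runs with.
    SavepointReader savepoint = SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{job-id}/chk-<num>",
        new EmbeddedRocksDBStateBackend());

    DataStream<KeyedState> keyedState = savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"), new MyKeyedStateReader());

    keyedState.print();
    env.execute("Read-Checkpoint");
  }
}

I would then just print the stream (or send it to a simple sink) to eyeball the values for debugging.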
Thanks
Sachin

On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
> Hi,
>
> FYI, hope this is helpful: FLIP-496: SQL connector for keyed savepoint data [1]
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>
> --
> Best!
> Xuyang
>
> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>
> Hi,
> So I have a flink application which stores state (RocksDB state backend)
> in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints/
>     /{job-id}
>         |
>         + --shared/
>         + --taskowned/
>         + --chk-<num>/
>
> I have my job pipeline defined like:
>
> final DataStream<Data> events = env.fromSource(..., "Src");
>
> SingleOutputStreamOperator<StatsData> statsData =
>     events
>         .keyBy(new MyKeySelector())
>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>         .name("Proc");
>
> statsData
>     .addSink(new MySink<>(...))
>     .name("Sink");
>
> env.execute("Exec");
>
> The MyStatsProcessor has keyed states defined as:
>
> state1 =
>     getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
>         Types.POJO(StateOne.class)));
> state2 =
>     getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
>         Types.POJO(StateTwo.class)));
>
> So my question is how can I read any checkpoint state. I see this API
> flink-state-processor-api.
> Can I use the same here, and if so, how do I instantiate it:
>
> StreamExecutionEnvironment env =
>     StreamExecutionEnvironment.getExecutionEnvironment();
> SavepointReader savepoint = SavepointReader.read(env,
>     "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>
> Is this the correct way to use this API for reading a checkpoint?
>
> Please note I have also enabled:
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now, say I am able to create the reader, how do I inspect a particular
> keyed state?
> I see a function called readKeyedState but I am unsure as to what uid I
> need to pass to read a particular state.
> Would something like this work:
>
> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>
> And now in my KeyedState class, I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track, or whether it is not
> possible to read checkpointed states from an external application for
> debugging.
>
> Thanks
> Sachin