Hi Sachin. When you define your pipeline, there is an option to assign a UID to each task in the job graph. If you do not, Flink will auto-generate a UID for you -- each time you run the pipeline.
It is highly recommended to define your own UIDs, since any re-run would otherwise assign new UIDs and effectively make your savepoints useless (they would refer to tasks that can no longer be identified correctly). So, something like this:

    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc")
        .uid("My-first-Proc")
        .description("This is my first Proc");

Nix.

From: Sachin Mittal <sjmit...@gmail.com>
Date: Friday, February 21, 2025 at 8:40 AM
To: Xuyang <xyzhong...@163.com>
Cc: user <user@flink.apache.org>
Subject: Re: How can we read checkpoint data for debugging state

Hi,
I am working on Flink 1.19.1, so I guess I cannot use the SQL connector, as that is to be released with 2.1.0.
If I directly try to use the flink-state-processor-api, my very first question is: how do I know the UID for each keyed state? Is it the step name? As in:

    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc");

If I now want to access the keyed state of MyStatsProcessor, do I access it using:

    savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());

Is this the right way to access it?

Thanks
Sachin

On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:

Hi,
FYI, hope this is helpful: FLIP-496: SQL connector for keyed savepoint data [1]

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data

--
Best!
Xuyang

At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:

Hi,
So I have a Flink application which stores state (RocksDB state backend) in S3 with the following directory structure:

    s3://{bucket}/flink-checkpoints/
        /{job-id}
            |
            + --shared/
            + --taskowned/
            + --chk-<num>/

I have my job pipeline defined like:

    final DataStream<Data> events = env.fromSource(..., "Src");
    SingleOutputStreamOperator<StatsData> statsData =
        events
            .keyBy(new MyKeySelector())
            .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
            .name("Proc");
    statsData
        .addSink(new MySink<>(...))
        .name("Sink");
    env.execute("Exec");

MyStatsProcessor has keyed states defined as:

    state1 = getRuntimeContext().getState(
        new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
    state2 = getRuntimeContext().getState(
        new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));

So my question is: how can I read any checkpoint state? I see there is the flink-state-processor-api. Can I use it here, and if so, how do I instantiate it?

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    SavepointReader savepoint = SavepointReader.read(
        env, "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);

Is this the correct way to use this API for reading a checkpoint?
Please note I have also enabled:

    state.backend.incremental: 'true'
    state.backend.local-recovery: 'true'
    state.backend.changelog.enabled: 'true'
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://{bucket}/changelog
    dstl.dfs.compression.enabled: 'true'

Now, once I am able to create the reader, how do I inspect a particular keyed state? I see a function called readKeyedState, but I am unsure what UID I need to pass to read a particular state.
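[As a reference for this question, a minimal sketch of instantiating the reader for a RocksDB checkpoint with the State Processor API. The bucket, job id, and checkpoint number are placeholders; the path should point at a concrete chk-<num> directory, i.e. the one containing the _metadata file, not the checkpoint root.]

```java
// Hedged sketch: reading a RocksDB checkpoint with the State Processor API
// (Flink 1.19). All paths below are placeholders, not real locations.
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointInspect {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Point at the directory holding the _metadata file of one checkpoint,
        // and pass the same state backend the job was running with.
        SavepointReader savepoint = SavepointReader.read(
                env,
                "s3://my-bucket/flink-checkpoints/my-job-id/chk-42",
                new EmbeddedRocksDBStateBackend());

        // ... declare readKeyedState(...) streams here, then:
        env.execute("read-checkpoint");
    }
}
```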
Would something like this work:

    DataStream<KeyedState> keyedState = savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"), new ReaderFunction());

And then, in my KeyedState class, I can access state1 and state2. Would this work?
Please let me know if I am on the right track, or whether it is not possible to read checkpointed state from an external application for debugging.

Thanks
Sachin
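[A sketch of what the ReaderFunction mentioned above could look like, assuming the pipeline shown earlier in the thread. Note the caveat from the reply at the top: forUid(...) matches an explicitly set .uid(), not the .name("Proc"); if no uid was set, OperatorIdentifier.forUidHash(...) with the operator hash from the job graph may be needed instead. MyKey and KeyedState are hypothetical classes, not part of the original code.]

```java
// Hedged sketch: a KeyedStateReaderFunction that re-declares the same
// descriptors ("state1"/"state2") used in MyStatsProcessor. MyKey and
// KeyedState are assumed placeholder types.
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReaderFunction extends KeyedStateReaderFunction<MyKey, KeyedState> {

    private ValueState<StateOne> state1;
    private ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        // Descriptors must match the names and types used in the job,
        // otherwise the state cannot be found.
        state1 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(MyKey key, Context ctx, Collector<KeyedState> out)
            throws Exception {
        // Emit one record per key, bundling both state values.
        out.collect(new KeyedState(key, state1.value(), state2.value()));
    }
}
```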