Hi,
I am working on Flink 1.19.1, so I guess I cannot use the SQL connector, as
that is only to be released in 2.1.0.
If I use the flink-state-processor-api directly, my very first question is:
how do I know the uid for each keyed state?
Is it the step name?
As in:

    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc");

If I now want to access the keyed state of MyStatsProcessor, do I need to
read it using:

    savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"), new ReaderFunction());

Is this the right way to access it?
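
For concreteness, a minimal sketch of the read path in question. Several
things here are assumptions rather than facts from the job: the key type is
taken to be String, StateOne and StateTwo are placeholder POJOs, KeyedState
and ReaderFunction are hypothetical classes, and the path is assumed to point
at a single chk-<num> directory containing a _metadata file. Note that
OperatorIdentifier.forUid matches the value set with .uid(...) on the
operator, which is separate from .name(...), so the sketch assumes the job
also calls .uid("Proc"). It has not been verified against incremental
checkpoints or the changelog state backend.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadProcState {

    // Placeholder POJOs; the real StateOne/StateTwo fields are not known here.
    public static class StateOne { public long count; }
    public static class StateTwo { public double sum; }

    // Hypothetical output record: one instance per key found in the state.
    public static class KeyedState {
        public String key;
        public StateOne state1;
        public StateTwo state2;
    }

    // Assumes MyKeySelector produces String keys.
    public static class ReaderFunction
            extends KeyedStateReaderFunction<String, KeyedState> {

        private transient ValueState<StateOne> state1;
        private transient ValueState<StateTwo> state2;

        @Override
        public void open(Configuration parameters) {
            // Descriptors must use the same names and types as MyStatsProcessor.
            state1 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
            state2 = getRuntimeContext().getState(
                new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<KeyedState> out)
                throws Exception {
            KeyedState row = new KeyedState();
            row.key = key;
            row.state1 = state1.value();
            row.state2 = state2.value();
            out.collect(row);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

        // Assumption: the path names one checkpoint directory, not the job root.
        SavepointReader savepoint = SavepointReader.read(
            env,
            "s3://{bucket}/flink-checkpoints/{job-id}/chk-<num>",
            new EmbeddedRocksDBStateBackend());

        // Assumption: the operator was given .uid("Proc") in the job.
        DataStream<KeyedState> keyedState = savepoint.readKeyedState(
            OperatorIdentifier.forUid("Proc"), new ReaderFunction());

        keyedState.print();
        env.execute("Read Proc state");
    }
}
```

If the operator has no explicit uid, OperatorIdentifier.forUidHash(...) takes
the generated operator hash instead.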

Thanks
Sachin


On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:

> Hi,
>
> FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint data [1]
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>
>
> --
>     Best!
>     Xuyang
>
>
> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>
> Hi,
> I have a Flink application which stores its state (RocksDB state backend)
> in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints/
>     /{job-id}
>         |
>         + --shared/
>         + --taskowned/
>         + --chk-<num>/
>
>
> I have my job pipeline defined like:
>
>     final DataStream<Data> events = env.fromSource(..., "Src");
>
>     SingleOutputStreamOperator<StatsData> statsData =
>         events
>             .keyBy(new MyKeySelector())
>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>             .name("Proc");
>
>     statsData
>         .addSink(new MySink<>(...))
>         .name("Sink");
>
>     env.execute("Exec");
>
>
> The MyStatsProcessor has keyed states defined as:
>
>     state1 =
>         getRuntimeContext().getState(
>             new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>     state2 =
>         getRuntimeContext().getState(
>             new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>
>
> So my question is: how can I read the state from a checkpoint? I see there
> is the flink-state-processor-api.
> Can I use it here, and if so, how do I instantiate it:
>
>     StreamExecutionEnvironment env =
>         StreamExecutionEnvironment.getExecutionEnvironment();
>     SavepointReader savepoint =
>         SavepointReader.read(
>             env,
>             "s3://{bucket}/flink-checkpoints/{job-id}",
>             <which state backend to use>);
>
> Is this the correct way to use this API for reading a checkpoint?
>
> Please note I have also enabled:
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now, assuming I am able to create the reader, how do I inspect a particular
> keyed state?
> I see a function called readKeyedState, but I am unsure what uid I need to
> pass to read a particular state.
> Would something like this work:
>
>     DataStream<KeyedState> keyedState =
>         savepoint.readKeyedState(
>             OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>
> And then in my KeyedState class I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track, or whether reading
> checkpointed state from an external application for debugging is not
> possible at all.
>
> Thanks
> Sachin
>
