Hi Sachin.

When you define your pipeline, there is an option to assign a UID to each task 
in the job graph. If you do not, Flink will auto-generate UID for you – each 
time you run the pipeline.

It is highly recommended to define your own UIDs, since any re-run would assign 
new UIDs and effectively make your save-points useless (they would refer to 
tasks that can no longer be identified correctly). So, something like this:

events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .name("Proc")
    .uid("My-first-Proc")
    .description("This is my first Proc");

Nix,.

From: Sachin Mittal <sjmit...@gmail.com>
Date: Friday, February 21, 2025 at 8:40 AM
To: Xuyang <xyzhong...@163.com>
Cc: user <user@flink.apache.org>
Subject: Re: How can we read checkpoint data for debugging state
Hi,
I am working on Flink 1.19.1, so I guess I cannot use the SQL connector as 
that's to be released for 2.1.0
If I directly try to use the flink-state-processor-api, my very first question 
is how do I know the uuid for each keyed state ?
Is it the step name?
as in

events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
.name("Proc");

If now I want to access keyed states for MyStatsProcessor then I need to access 
it using:

savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new 
ReaderFunction());

Is this the right way to access it ?

Thanks
Sachin


On Fri, Feb 21, 2025 at 11:09 AM Xuyang 
<xyzhong...@163.com<mailto:xyzhong...@163.com>> wrote:

Hi,

FYI that hopes helpful: FLIP-496: SQL connector for keyed savepoint data[1]



[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data



--
    Best!
    Xuyang



At 2025-02-21 12:11:02, "Sachin Mittal" 
<sjmit...@gmail.com<mailto:sjmit...@gmail.com>> wrote:
Hi,
So I have a flink application which stores state (RocksDB state backend) in S3 
with the following directory structure:

s3://{bucket}/flink-checkpoints/

    /{job-id}

        |

        + --shared/

        + --taskowned/

        + --chk-<num>/



I have my job pipeline defined like:

    final DataStream<Data> events = env.fromSource(..., "Src");



    SingleOutputStreamOperator<StatsData> statsData =

        events

            .keyBy(new MyKeySelector())

            .process(new MyStatsProcessor(), Types.POJO(StatsData.class))

            .name("Proc");



    statsData

        .addSink(new MySink<>(...))

        .name("Sink");



    env.execute("Exec");



The MyStatsProcessor has keyed states defined as:

    state1 =

        getRuntimeContext().getState(new ValueStateDescriptor<>("state1", 
Types.POJO(StateOne.class)));

    state2 =

        getRuntimeContext().getState(new ValueStateDescriptor<>("state2", 
Types.POJO(StateTwo.class)));





So my question is how can I read any checkpoint state. I see this API 
flink-state-processor-api.

Can I use the same here, if so how do I instantiate it:



StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();

SavepointReader savepoint = SavepointReader.read(env, 
"s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);

Is this the correct way to use this API for reading a checkpoint ?



Please note I have also enabled:

            state.backend.incremental: 'true'

            state.backend.local-recovery: 'true'

            state.backend.changelog.enabled: 'true'

            state.backend.changelog.storage: filesystem

            dstl.dfs.base-path: s3://{bucket}/changelog

            dstl.dfs.compression.enabled: 'true'



Now after say I am able to create the reader how do I inspect a particular 
keyed state.

I see a function called readKeyedState but I am unsure as to what uuid I need 
to pass to read a particular state?

Would something like this work:



DataStream<KeyedState> keyedState = 
savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new 
ReaderFunction());

And now in my KeyedState class, I can access state1 and state2.



Would this work?



Please let me know if I am on the right track or this is something not possible 
to read checkpointed states via any external application for debugging.



Thanks

Sachin












































Reply via email to