Hi Sachin.
When you define your pipeline, there is an option to assign a UID to each task
in the job graph. If you do not, Flink will auto-generate a UID for you – each
time you run the pipeline.
It is highly recommended to define your own UIDs, since auto-generated UIDs can
change between runs, effectively making your savepoints useless (they would
refer to tasks that can no longer be identified correctly). So, something like
this:
events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
.name("Proc")
.uid("My-first-Proc")
.description("This is my first Proc");
Nix.
From: Sachin Mittal <[email protected]>
Date: Friday, February 21, 2025 at 8:40 AM
To: Xuyang <[email protected]>
Cc: user <[email protected]>
Subject: Re: How can we read checkpoint data for debugging state
Hi,
I am working on Flink 1.19.1, so I guess I cannot use the SQL connector, as
that is slated for release in 2.1.0.
If I directly try to use the flink-state-processor-api, my very first question
is: how do I know the uid for each keyed state?
Is it the step name?
as in
events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
.name("Proc");
If now I want to access keyed states for MyStatsProcessor then I need to access
it using:
savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
ReaderFunction());
Is this the right way to access it?
Thanks
Sachin
On Fri, Feb 21, 2025 at 11:09 AM Xuyang <[email protected]> wrote:
Hi,
FYI, hope this is helpful: FLIP-496: SQL connector for keyed savepoint data [1]
[1]
https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
--
Best!
Xuyang
At 2025-02-21 12:11:02, "Sachin Mittal" <[email protected]> wrote:
Hi,
So I have a Flink application which stores state (RocksDB state backend) in S3
with the following directory structure:
s3://{bucket}/flink-checkpoints/
  /{job-id}
    |
    +-- shared/
    +-- taskowned/
    +-- chk-<num>/
I have my job pipeline defined like:
final DataStream<Data> events = env.fromSource(..., "Src");
SingleOutputStreamOperator<StatsData> statsData =
events
.keyBy(new MyKeySelector())
.process(new MyStatsProcessor(), Types.POJO(StatsData.class))
.name("Proc");
statsData
.addSink(new MySink<>(...))
.name("Sink");
env.execute("Exec");
The MyStatsProcessor has keyed states defined as:
state1 =
getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
Types.POJO(StateOne.class)));
state2 =
getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
Types.POJO(StateTwo.class)));
So my question is: how can I read any checkpoint state? I see this API,
flink-state-processor-api.
Can I use the same here, and if so, how do I instantiate it:
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint = SavepointReader.read(env,
"s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
Is this the correct way to use this API for reading a checkpoint?
Please note I have also enabled:
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'
Now, say I am able to create the reader; how do I inspect a particular keyed
state?
I see a function called readKeyedState, but I am unsure as to what uid I need
to pass to read a particular state.
Would something like this work:
DataStream<KeyedState> keyedState =
savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new
ReaderFunction());
And now in my KeyedState class, I can access state1 and state2.
Would this work?
Please let me know if I am on the right track, or whether it is not possible
to read checkpointed states from an external application for debugging.
Thanks
Sachin