Hi,
I am working with Flink 1.19.1, so I guess I cannot use the SQL connector,
as that is only slated for release with 2.1.0.
If I directly use the flink-state-processor-api, my very first question is:
how do I know the uid for each keyed state?
Is it the operator name?
as in:

events
    .keyBy(new MyKeySelector())
    .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
    .name("Proc");
If I now want to access the keyed states of MyStatsProcessor, then I would
access them using:

savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());

Is this the right way to access them?
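
For reference, here is a minimal sketch of the reader function I have in mind,
using KeyedStateReaderFunction from the state processor API (assuming "Proc"
is also set as the operator's uid; MyKey and MyKeyedState are just placeholder
classes on my side):

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.util.Collector;

public class ReaderFunction extends KeyedStateReaderFunction<MyKey, MyKeyedState> {

    private transient ValueState<StateOne> state1;
    private transient ValueState<StateTwo> state2;

    @Override
    public void open(Configuration parameters) {
        // Re-register the same descriptors (same names and types) that
        // MyStatsProcessor uses when it writes the state.
        state1 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
        state2 = getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
    }

    @Override
    public void readKey(MyKey key, Context ctx, Collector<MyKeyedState> out) throws Exception {
        // Bundle both values into a placeholder POJO so I can inspect them downstream.
        out.collect(new MyKeyedState(key, state1.value(), state2.value()));
    }
}
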
Thanks
Sachin
On Fri, Feb 21, 2025 at 11:09 AM Xuyang <[email protected]> wrote:
> Hi,
>
> FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint data [1]
>
>
> [1]
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>
>
> --
> Best!
> Xuyang
>
>
> At 2025-02-21 12:11:02, "Sachin Mittal" <[email protected]> wrote:
>
> Hi,
> So I have a Flink application that stores state (RocksDB state backend)
> in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints/
>     /{job-id}
>         |
>         + --shared/
>         + --taskowned/
>         + --chk-<num>/
>
>
> I have my job pipeline defined as follows:
>
> final DataStream<Data> events = env.fromSource(..., "Src");
>
> SingleOutputStreamOperator<StatsData> statsData =
>     events
>         .keyBy(new MyKeySelector())
>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>         .name("Proc");
>
> statsData
>     .addSink(new MySink<>(...))
>     .name("Sink");
>
> env.execute("Exec");
>
>
> The MyStatsProcessor has keyed states defined as:
>
> state1 = getRuntimeContext().getState(
>     new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
> state2 = getRuntimeContext().getState(
>     new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>
>
> So my question is: how can I read any checkpointed state? I see there is the
> flink-state-processor-api.
> Can I use it here, and if so, how do I instantiate it:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SavepointReader savepoint = SavepointReader.read(
>     env,
>     "s3://{bucket}/flink-checkpoints/{job-id}",
>     <which state backend to use>);
>
> Is this the correct way to use this API for reading a checkpoint?
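>
> For concreteness, this is roughly what I had in mind, purely as a sketch on
> my side (I am assuming the third argument can be the EmbeddedRocksDBStateBackend
> from flink-statebackend-rocksdb, since that is the backend the job runs with,
> and that the path should point at one of the retained chk-<num> directories):
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> SavepointReader savepoint = SavepointReader.read(
>     env,
>     "s3://{bucket}/flink-checkpoints/{job-id}/chk-<num>",
>     new EmbeddedRocksDBStateBackend(true)); // true = incremental, matching my config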
>
> Please note I have also enabled:
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now, assuming I am able to create the reader, how do I inspect a particular
> keyed state?
> I see a function called readKeyedState, but I am unsure what uid I need to
> pass to read a particular state.
> Would something like this work:
>
> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>
> And now in my KeyedState class, I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track, or whether it is simply not
> possible to read checkpointed state from an external application for
> debugging.
>
> Thanks
> Sachin