Hi Sachin,

You're on the right track in general and this should work :)
I know that version upgrade can be complex but it worth to do so because
we've just added a ~95% performance boost for the state processor api in
1.20 [1].
As you mentioned the SQL connector is on 2.x which is not yet feasible for
majority of the users but good to keep in mind.

[1] https://issues.apache.org/jira/browse/FLINK-37109

BR,
G


On Fri, Feb 21, 2025 at 5:11 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> So I have a flink application which stores state (RocksDB state backend)
> in S3 with the following directory structure:
>
> s3://{bucket}/flink-checkpoints/    /{job-id}        |        + --shared/     
>    + --taskowned/        + --chk-<num>/
>
>
> I have my job pipeline defined like:
>
>     final DataStream<Data> events = env.fromSource(..., "Src");
>
>     SingleOutputStreamOperator<StatsData> statsData =
>         events
>             .keyBy(new MyKeySelector())
>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>             .name("Proc");
>
>     statsData
>         .addSink(new MySink<>(...))
>         .name("Sink");
>
>     env.execute("Exec");
>
>
> The MyStatsProcessor has keyed states defined as:
>
>     state1 =
>         getRuntimeContext().getState(new ValueStateDescriptor<>("state1", 
> Types.POJO(StateOne.class)));
>     state2 =
>         getRuntimeContext().getState(new ValueStateDescriptor<>("state2", 
> Types.POJO(StateTwo.class)));
>
>
> So my question is how can I read any checkpoint state. I see this API 
> flink-state-processor-api.
> Can I use the same here, if so how do I instantiate it:
>
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();SavepointReader 
> savepoint = SavepointReader.read(env, 
> "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>
> Is this the correct way to use this API for reading a checkpoint ?
>
> Please note I have also enabled:
> state.backend.incremental: 'true'
> state.backend.local-recovery: 'true'
> state.backend.changelog.enabled: 'true'
> state.backend.changelog.storage: filesystem
> dstl.dfs.base-path: s3://{bucket}/changelog
> dstl.dfs.compression.enabled: 'true'
>
> Now after say I am able to create the reader how do I inspect a particular
> keyed state.
> I see a function called readKeyedState but I am unsure as to what uuid I
> need to pass to read a particular state?
> Would something like this work:
>
> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
> OperatorIdentifier.forUid("Proc"), new ReaderFunction());
> And now in my KeyedState class, I can access state1 and state2.
>
> Would this work?
>
> Please let me know if I am on the right track or this is something not
> possible to read checkpointed states via any external application for
> debugging.
>
> Thanks
> Sachin
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>

Reply via email to