Hi, 

FYI, hope this helps: FLIP-496: SQL connector for keyed savepoint data [1]




[1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data




--

    Best!
    Xuyang




At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:

Hi,

I have a Flink application that stores its state (RocksDB state backend) in S3
with the following directory structure:
s3://{bucket}/flink-checkpoints/
    /{job-id}
        |
        + --shared/
        + --taskowned/
        + --chk-<num>/


I have my job pipeline defined like:
final DataStream<Data> events = env.fromSource(..., "Src");

SingleOutputStreamOperator<StatsData> statsData =
    events
        .keyBy(new MyKeySelector())
        .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
        .name("Proc");

statsData
    .addSink(new MySink<>(...))
    .name("Sink");

env.execute("Exec");


The MyStatsProcessor has keyed states defined as:
    state1 =
        getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
    state2 =
        getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));




So my question is: how can I read the state stored in a checkpoint? I see there
is an API for this, flink-state-processor-api.
Can I use it here, and if so, how do I instantiate it?


StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint =
    SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{job-id}",
        <which state backend to use>);
Is this the correct way to use this API for reading a checkpoint?


Please note I have also enabled:
state.backend.incremental: 'true'
state.backend.local-recovery: 'true'
state.backend.changelog.enabled: 'true'
state.backend.changelog.storage: filesystem
dstl.dfs.base-path: s3://{bucket}/changelog
dstl.dfs.compression.enabled: 'true'


Now, assuming I am able to create the reader, how do I inspect a particular
keyed state?
I see a function called readKeyedState, but I am unsure which uid I need
to pass to read a particular state.
Would something like this work:


DataStream<KeyedState> keyedState =
    savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
And now in my KeyedState class, I can access state1 and state2.


Would this work?


Please let me know if I am on the right track, or whether it is simply not
possible to read checkpointed state from an external application for debugging.


Thanks
Sachin