Hi,
So, I have a Flink application that stores its state (RocksDB state backend) in
S3 with the following directory structure:

    s3://{bucket}/flink-checkpoints/
        /{job-id}
            |
            + --shared/
            + --taskowned/
            + --chk-<num>/


My job pipeline is defined like this:

    final DataStream<Data> events = env.fromSource(..., "Src");

    SingleOutputStreamOperator<StatsData> statsData =
        events
            .keyBy(new MyKeySelector())
            .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
            .name("Proc");

    statsData
        .addSink(new MySink<>(...))
        .name("Sink");

    env.execute("Exec");


MyStatsProcessor defines its keyed state as follows:

    state1 =
        getRuntimeContext().getState(
            new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
    state2 =
        getRuntimeContext().getState(
            new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));


So my question is: how can I read the state stored in a checkpoint? I found
the flink-state-processor-api.
Can I use it here, and if so, how do I instantiate it?

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    SavepointReader savepoint = SavepointReader.read(
        env,
        "s3://{bucket}/flink-checkpoints/{job-id}",
        <which state backend to use>);

Is this the correct way to use this API to read a checkpoint?
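One point that may matter for the path: as far as I understand, the reader expects the directory of one concrete retained checkpoint (the `chk-<num>` directory that holds the `_metadata` file), not the job-id directory above it. Under that assumption, here is a minimal plain-Java sketch for picking the newest `chk-<num>` entry from a directory listing. `CheckpointPaths` and `latestCheckpointDir` are hypothetical names, and fetching the actual S3 listing is left out.

```java
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CheckpointPaths {

    // Matches entries such as "chk-42" or "chk-42/" (S3 listings often keep the trailing slash).
    private static final Pattern CHK = Pattern.compile("chk-(\\d+)/?");

    /**
     * Hypothetical helper: given the entry names listed under
     * s3://{bucket}/flink-checkpoints/{job-id}/, return the chk-<num> entry with the
     * highest checkpoint number. shared/ and taskowned/ do not match and are skipped.
     */
    public static Optional<String> latestCheckpointDir(List<String> entries) {
        long bestNum = -1;
        for (String entry : entries) {
            Matcher m = CHK.matcher(entry);
            if (m.matches()) {
                // Compare numerically: as plain strings, "chk-9" would sort after "chk-12".
                bestNum = Math.max(bestNum, Long.parseLong(m.group(1)));
            }
        }
        return bestNum < 0 ? Optional.empty() : Optional.of("chk-" + bestNum);
    }

    public static void main(String[] args) {
        List<String> listing = List.of("shared/", "taskowned/", "chk-9/", "chk-12/", "chk-10/");
        String jobDir = "s3://{bucket}/flink-checkpoints/{job-id}";
        // The resulting path is what would be passed to SavepointReader.read(...) instead of jobDir.
        latestCheckpointDir(listing).ifPresent(dir -> System.out.println(jobDir + "/" + dir));
    }
}
```

Comparing the parsed numbers rather than the names avoids picking `chk-9` over `chk-12`.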

Please note that I have also enabled:

    state.backend.incremental: 'true'
    state.backend.local-recovery: 'true'
    state.backend.changelog.enabled: 'true'
    state.backend.changelog.storage: filesystem
    dstl.dfs.base-path: s3://{bucket}/changelog
    dstl.dfs.compression.enabled: 'true'

Now, assuming I am able to create the reader, how do I inspect a particular
keyed state?
I see a method called readKeyedState, but I am unsure which uid I need to
pass to read a particular state.
Would something like this work:

    DataStream<KeyedState> keyedState = savepoint.readKeyedState(
        OperatorIdentifier.forUid("Proc"), new ReaderFunction());

Then, in my KeyedState class, I would be able to access state1 and state2.

Would this work?
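Regarding the identifier: `OperatorIdentifier.forUid("Proc")` refers to an operator uid, which is set with `.uid(...)`, whereas the pipeline above only sets `.name("Proc")`. Assuming the reader function registers the same two descriptors (names "state1"/"state2", same `Types.POJO` types), my understanding is that it is invoked once per key and can read both values for that key, with `ValueState.value()` returning null where a key has no value in one of the states. The plain-Java model below has no Flink dependency and only mimics that per-key combination; `KeyedStateModel`, the `String` key type and the POJO fields are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class KeyedStateModel {

    // Hypothetical stand-ins for the job's StateOne / StateTwo POJOs.
    public static class StateOne {
        public long count;
        public StateOne(long count) { this.count = count; }
    }

    public static class StateTwo {
        public double sum;
        public StateTwo(double sum) { this.sum = sum; }
    }

    // Hypothetical output record: one per key; either state may be null.
    public static class KeyedState {
        public final String key;
        public final StateOne state1;
        public final StateTwo state2;

        public KeyedState(String key, StateOne state1, StateTwo state2) {
            this.key = key;
            this.state1 = state1;
            this.state2 = state2;
        }

        @Override
        public String toString() {
            return key + " -> state1=" + (state1 == null ? "null" : state1.count)
                + ", state2=" + (state2 == null ? "null" : state2.sum);
        }
    }

    /** Visits each key present in either state once and combines both values for it. */
    public static List<KeyedState> readAll(Map<String, StateOne> state1, Map<String, StateTwo> state2) {
        Set<String> keys = new LinkedHashSet<>(state1.keySet());
        keys.addAll(state2.keySet());
        List<KeyedState> out = new ArrayList<>();
        for (String key : keys) {
            // Map.get yields null for a missing key, mirroring ValueState.value() for an unset state.
            out.add(new KeyedState(key, state1.get(key), state2.get(key)));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, StateOne> s1 = Map.of("user-a", new StateOne(3));
        Map<String, StateTwo> s2 = Map.of("user-a", new StateTwo(1.5), "user-b", new StateTwo(0.5));
        readAll(s1, s2).forEach(System.out::println);
    }
}
```

In this model "user-b" comes out with `state1=null`, which is the case a real reader function would also need to handle.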

Please let me know whether I am on the right track, or whether it is simply
not possible to read checkpointed state from an external application for
debugging.

Thanks
Sachin
