In short, when no uid is set, Flink generates a uid hash for each operator
and stores it in the state. That hash is a cryptic value.
The savepoint metadata can be read as follows:

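import org.apache.flink.state.api.runtime.SavepointLoader;

var metadata = SavepointLoader.loadSavepointMetadata(path);

for (var operatorState : metadata.getOperatorStates()) {
  // Print the generated operator ID (the uid hash) of each operator
  System.out.println(operatorState.getOperatorID());
}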


The task is then to go through all of these cryptic operator IDs and try to
read the state of each one as your data :)
This has been improved in 2.x: one can now access the human-readable
name and uid values too.
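
The "try each operator ID" loop described above can be sketched roughly as
follows. This is only a plain-Java illustration with no Flink dependency:
OperatorProbe, firstReadable and the placeholder hashes are hypothetical
names, and the predicate stands in for whatever read attempt you wire up
(for example a small job that calls readKeyedState with
OperatorIdentifier.forUidHash(hash) and reports whether your state came back).

```java
import java.util.List;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical helper, not part of Flink: probes candidate operator hashes.
class OperatorProbe {

    // Returns the first hash for which the read attempt succeeds.
    // A hash whose read attempt throws is treated as "not our operator".
    static Optional<String> firstReadable(List<String> hashes, Predicate<String> tryRead) {
        for (String hash : hashes) {
            try {
                if (tryRead.test(hash)) {
                    return Optional.of(hash);
                }
            } catch (RuntimeException e) {
                // Reading failed for this operator, move on to the next one.
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Placeholder values; real ones would come from getOperatorID().
        List<String> hashes = List.of("aaaa", "bbbb", "cccc");

        Optional<String> match = firstReadable(hashes, h -> {
            if (h.equals("aaaa")) {
                throw new IllegalStateException("state1 not found");
            }
            return h.equals("bbbb");
        });

        System.out.println(match.orElse("no match")); // prints "bbbb"
    }
}
```

Once the matching hash is known, it can be reused in the reader app, and
setting an explicit uid in the job avoids the search next time.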

BR,
G


On Fri, Feb 21, 2025 at 11:20 AM Sachin Mittal <sjmit...@gmail.com> wrote:

> Hi,
> I think I now realize the importance of setting a uid. Perhaps the Flink
> docs should emphasize always setting a uid on your operators, just
> like we set a name.
> Unfortunately, my current pipeline, which is in production, has no
> uid set.
>
> Is there a way I can find these uids?
>
> Thanks
> Sachin
>
>
> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> The UID must match in the Flink app `events.uid("my-uid")` and in the
>> reader app `forUid("my-uid")`.
>> In general it's a good practice to set uid in the Flink app for each and
>> every operator, otherwise Flink generates an almost random number for it.
>>
>> If you don't know the generated uid, no worries: there are nifty
>> tricks to find it out.
>>
>> BR,
>> G
>>
>>
>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>>> as that is to be released in 2.1.0.
>>> If I use the flink-state-processor-api directly, my very first
>>> question is: how do I know the uid for each keyed state?
>>> Is it the step name, as in:
>>>
>>> events
>>> .keyBy(new MyKeySelector())
>>> .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>> .name("Proc");
>>>
>>> If now I want to access keyed states for MyStatsProcessor then I need
>>> to access it using:
>>>
>>> savepoint.readKeyedState(
>>>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>
>>> Is this the right way to access it ?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> FYI, hope this is helpful: FLIP-496: SQL connector for keyed savepoint
>>>> data[1]
>>>>
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>
>>>>
>>>> --
>>>>     Best!
>>>>     Xuyang
>>>>
>>>>
>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> So I have a flink application which stores state (RocksDB state
>>>> backend) in S3 with the following directory structure:
>>>>
>>>> s3://{bucket}/flink-checkpoints/
>>>>     /{job-id}
>>>>         |
>>>>         + --shared/
>>>>         + --taskowned/
>>>>         + --chk-<num>/
>>>>
>>>>
>>>> I have my job pipeline defined like:
>>>>
>>>>     final DataStream<Data> events = env.fromSource(..., "Src");
>>>>
>>>>     SingleOutputStreamOperator<StatsData> statsData =
>>>>         events
>>>>             .keyBy(new MyKeySelector())
>>>>             .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>             .name("Proc");
>>>>
>>>>     statsData
>>>>         .addSink(new MySink<>(...))
>>>>         .name("Sink");
>>>>
>>>>     env.execute("Exec");
>>>>
>>>>
>>>> The MyStatsProcessor has keyed states defined as:
>>>>
>>>>     state1 =
>>>>         getRuntimeContext().getState(
>>>>             new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>>>>     state2 =
>>>>         getRuntimeContext().getState(
>>>>             new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>>>>
>>>>
>>>> So my question is: how can I read the state from a checkpoint? I see there
>>>> is the flink-state-processor-api.
>>>> Can I use it here, and if so, how do I instantiate it:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> SavepointReader savepoint = SavepointReader.read(
>>>>     env, "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>>
>>>> Is this the correct way to use this API for reading a checkpoint ?
>>>>
>>>> Please note I have also enabled:
>>>> state.backend.incremental: 'true'
>>>> state.backend.local-recovery: 'true'
>>>> state.backend.changelog.enabled: 'true'
>>>> state.backend.changelog.storage: filesystem
>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>> dstl.dfs.compression.enabled: 'true'
>>>>
>>>> Now, say I am able to create the reader: how do I inspect a
>>>> particular keyed state?
>>>> I see a function called readKeyedState, but I am unsure what uid
>>>> I need to pass to read a particular state.
>>>> Would something like this work:
>>>>
>>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>>> OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>> And now in my KeyedState class, I can access state1 and state2.
>>>>
>>>> Would this work?
>>>>
>>>> Please let me know if I am on the right track, or whether it is not
>>>> possible to read checkpointed states from an external application for
>>>> debugging.
>>>>
>>>> Thanks
>>>> Sachin
>>>>
