All in all, Flink generates uid hashes and stores them in the state; these are cryptic values. The savepoint metadata can be read the following way:
    import org.apache.flink.state.api.runtime.SavepointLoader;

    // path points to the savepoint directory
    var metadata = SavepointLoader.loadSavepointMetadata(path);
    for (var operatorState : metadata.getOperatorStates()) {
        // Print the generated operator ID (uid hash) of every operator in the savepoint
        System.out.println(operatorState.getOperatorID());
    }

The task is to go through all the cryptic uid hashes and try to read each one as your data :)
This has been enhanced in 2.x: there one can also access the human-readable name and uid values.

BR,
G

On Fri, Feb 21, 2025 at 11:20 AM Sachin Mittal <sjmit...@gmail.com> wrote:
> Hi,
> I think I now realize the importance of setting a uid. Perhaps the Flink docs
> should put more emphasis on always setting a uid on your operators, just
> like we set a name.
> Unfortunately, my current pipeline, which is in production, has no
> uid set.
>
> Is there a way I can find these uids?
>
> Thanks
> Sachin
>
>
> On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com>
> wrote:
>
>> The UID must match in the Flink app `events.uid("my-uid")` and in the
>> reader app `forUid("my-uid")`.
>> In general it's good practice to set a uid in the Flink app for each and
>> every operator, otherwise Flink generates a hash for it that looks almost random.
>>
>> When you don't know the generated uid, no worries: there are nifty
>> tricks to find it out.
>>
>> BR,
>> G
>>
>>
>> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>>
>>> Hi,
>>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>>> as that is to be released in 2.1.0.
>>> If I directly use the flink-state-processor-api, my very first
>>> question is: how do I know the uid for each keyed state?
>>> Is it the step name?
>>> As in
>>>
>>> events
>>>     .keyBy(new MyKeySelector())
>>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>     .name("Proc");
>>>
>>> If I now want to access the keyed states of MyStatsProcessor, then I need
>>> to access them using:
>>>
>>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>
>>> Is this the right way to access it?
>>>
>>> Thanks
>>> Sachin
>>>
>>>
>>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> FYI, hope this is helpful: FLIP-496: SQL connector for keyed savepoint
>>>> data [1]
>>>>
>>>>
>>>> [1]
>>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>>
>>>>
>>>> --
>>>> Best!
>>>> Xuyang
>>>>
>>>>
>>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>>
>>>> Hi,
>>>> So I have a Flink application which stores state (RocksDB state
>>>> backend) in S3 with the following directory structure:
>>>>
>>>> s3://{bucket}/flink-checkpoints/
>>>>     /{job-id}
>>>>         |
>>>>         + --shared/
>>>>         + --taskowned/
>>>>         + --chk-<num>/
>>>>
>>>>
>>>> I have my job pipeline defined like:
>>>>
>>>> final DataStream<Data> events = env.fromSource(..., "Src");
>>>>
>>>> SingleOutputStreamOperator<StatsData> statsData =
>>>>     events
>>>>         .keyBy(new MyKeySelector())
>>>>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>>         .name("Proc");
>>>>
>>>> statsData
>>>>     .addSink(new MySink<>(...))
>>>>     .name("Sink");
>>>>
>>>> env.execute("Exec");
>>>>
>>>>
>>>> MyStatsProcessor has keyed states defined as:
>>>>
>>>> state1 =
>>>>     getRuntimeContext().getState(new ValueStateDescriptor<>("state1",
>>>>         Types.POJO(StateOne.class)));
>>>> state2 =
>>>>     getRuntimeContext().getState(new ValueStateDescriptor<>("state2",
>>>>         Types.POJO(StateTwo.class)));
>>>>
>>>>
>>>> So my question is: how can I read any checkpoint state? I see the
>>>> flink-state-processor-api.
>>>> Can I use it here, and if so, how do I instantiate it:
>>>>
>>>> StreamExecutionEnvironment env =
>>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>>> SavepointReader savepoint = SavepointReader.read(env,
>>>>     "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>>
>>>> Is this the correct way to use this API for reading a checkpoint?
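A note on the path in the question above: `SavepointReader.read` should point at the directory of one specific completed checkpoint, i.e. a `chk-<num>` directory containing a `_metadata` file, rather than at the `{job-id}` directory. The third argument is a `StateBackend` instance (for a RocksDB checkpoint, presumably `new EmbeddedRocksDBStateBackend()`). Picking the latest checkpoint from a listing of `{job-id}` can be sketched in plain Java as below. The helper `latestCheckpointDir` is hypothetical, not a Flink API, and it assumes you already have the entry names (e.g. from an S3 listing); it does not verify that `_metadata` actually exists, so an in-progress checkpoint could still be picked.

```java
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LatestCheckpoint {

    // Matches checkpoint directory names such as "chk-42" or "chk-42/";
    // group 1 is the name without the trailing slash, group 2 the number.
    private static final Pattern CHK = Pattern.compile("(chk-(\\d+))/?");

    /**
     * Hypothetical helper: given the entry names found under
     * s3://{bucket}/flink-checkpoints/{job-id}/, returns the chk-<num>
     * directory with the highest checkpoint number, if any.
     * It does NOT check that the directory contains a _metadata file.
     */
    static Optional<String> latestCheckpointDir(Collection<String> entries) {
        String best = null;
        long bestNum = -1;
        for (String entry : entries) {
            Matcher m = CHK.matcher(entry);
            if (m.matches()) {
                // Compare numerically: as plain strings, "chk-9" would sort after "chk-12".
                long num = Long.parseLong(m.group(2));
                if (num > bestNum) {
                    bestNum = num;
                    best = m.group(1);
                }
            }
        }
        return Optional.ofNullable(best);
    }

    public static void main(String[] args) {
        // Example listing of the {job-id} directory (placeholder names).
        List<String> entries = List.of("shared/", "taskowned/", "chk-9/", "chk-12/", "chk-3/");
        String jobDir = "s3://{bucket}/flink-checkpoints/{job-id}/";
        // Prints the path that would be passed to SavepointReader.read(...).
        latestCheckpointDir(entries).ifPresent(dir -> System.out.println(jobDir + dir));
    }
}
```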
>>>>
>>>> Please note I have also enabled:
>>>> state.backend.incremental: 'true'
>>>> state.backend.local-recovery: 'true'
>>>> state.backend.changelog.enabled: 'true'
>>>> state.backend.changelog.storage: filesystem
>>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>>> dstl.dfs.compression.enabled: 'true'
>>>>
>>>> Now, say I am able to create the reader: how do I inspect a
>>>> particular keyed state?
>>>> I see a function called readKeyedState, but I am unsure which uid
>>>> I need to pass to read a particular state.
>>>> Would something like this work:
>>>>
>>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>>>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>>
>>>> And then in my KeyedState class I can access state1 and state2.
>>>>
>>>> Would this work?
>>>>
>>>> Please let me know if I am on the right track, or whether it is not
>>>> possible to read checkpointed states from an external application for
>>>> debugging.
>>>>
>>>> Thanks
>>>> Sachin