Hi, I now realize the importance of setting a uid. Perhaps the Flink docs should emphasize always setting a uid on every operator, just as we set a name. Unfortunately, my current pipeline, which is in production, has no uids set.
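Below is a minimal sketch of setting a uid alongside each name, assuming Flink 1.19's DataStream API. The source, key selector and process function are toy stand-ins for `fromSource(..., "Src")`, `MyKeySelector` and `MyStatsProcessor`. The uid strings (`events-src`, `stats-proc`, `stats-sink`) are hypothetical. Each uid needs to be stable and unique within the job, and the reader side must later pass the same string to `OperatorIdentifier.forUid(...)`.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class UidExample {

    // Toy version of the "Src -> Proc -> Sink" pipeline: every operator gets an
    // explicit, stable uid (hypothetical values) next to its display name.
    public static void buildPipeline(StreamExecutionEnvironment env) {
        env.fromSequence(1, 100)
                .uid("events-src").name("Src")
                .keyBy(x -> x % 10) // stand-in for MyKeySelector
                .process(new KeyedProcessFunction<Long, Long, Long>() {
                    @Override
                    public void processElement(Long value, Context ctx, Collector<Long> out) {
                        out.collect(value); // stand-in for MyStatsProcessor's logic
                    }
                })
                .uid("stats-proc").name("Proc")
                .print() // stand-in for MySink
                .uid("stats-sink").name("Sink");
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        buildPipeline(env);
        env.execute("Exec");
    }
}
```

A caveat for a job that is already in production: adding a uid to an operator that previously had none is expected to change its operator ID, so its state in existing checkpoints would no longer be matched on restore. Flink's `setUidHash(...)` on the same operator exists as a workaround for pinning a previously generated ID. Check its Javadoc before relying on it.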
Is there a way I can find these uids?

Thanks
Sachin

On Fri, Feb 21, 2025 at 3:37 PM Gabor Somogyi <gabor.g.somo...@gmail.com> wrote:

> The UID must match in the Flink app `events.uid("my-uid")` and in the
> reader app `forUid("my-uid")`.
> In general, it's good practice to set a uid in the Flink app for each and
> every operator; otherwise Flink generates an almost random number for it.
>
> If you don't know the generated uid, don't worry: there are some nifty
> tricks to find it out.
>
> BR,
> G
>
>
> On Fri, Feb 21, 2025 at 8:40 AM Sachin Mittal <sjmit...@gmail.com> wrote:
>
>> Hi,
>> I am working on Flink 1.19.1, so I guess I cannot use the SQL connector,
>> as that is to be released in 2.1.0.
>> If I use the flink-state-processor-api directly, my very first
>> question is: how do I know the uid for each keyed state?
>> Is it the step name?
>> As in:
>>
>> events
>>     .keyBy(new MyKeySelector())
>>     .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>     .name("Proc");
>>
>> If I now want to access the keyed states of MyStatsProcessor, do I need to
>> access them using:
>>
>> savepoint.readKeyedState(OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>
>> Is this the right way to access it?
>>
>> Thanks
>> Sachin
>>
>>
>> On Fri, Feb 21, 2025 at 11:09 AM Xuyang <xyzhong...@163.com> wrote:
>>
>>> Hi,
>>>
>>> FYI, hope this is helpful: FLIP-496: SQL connector for keyed savepoint
>>> data[1]
>>>
>>>
>>> [1]
>>> https://cwiki.apache.org/confluence/display/FLINK/FLIP-496%3A+SQL+connector+for+keyed+savepoint+data
>>>
>>>
>>> --
>>> Best!
>>> Xuyang
>>>
>>>
>>> At 2025-02-21 12:11:02, "Sachin Mittal" <sjmit...@gmail.com> wrote:
>>>
>>> Hi,
>>> I have a Flink application that stores state (RocksDB state backend)
>>> in S3 with the following directory structure:
>>>
>>> s3://{bucket}/flink-checkpoints/
>>>     /{job-id}
>>>         |
>>>         + --shared/
>>>         + --taskowned/
>>>         + --chk-<num>/
>>>
>>>
>>> My job pipeline is defined like this:
>>>
>>> final DataStream<Data> events = env.fromSource(..., "Src");
>>>
>>> SingleOutputStreamOperator<StatsData> statsData =
>>>     events
>>>         .keyBy(new MyKeySelector())
>>>         .process(new MyStatsProcessor(), Types.POJO(StatsData.class))
>>>         .name("Proc");
>>>
>>> statsData
>>>     .addSink(new MySink<>(...))
>>>     .name("Sink");
>>>
>>> env.execute("Exec");
>>>
>>>
>>> MyStatsProcessor defines its keyed states as:
>>>
>>> state1 = getRuntimeContext().getState(
>>>     new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
>>> state2 = getRuntimeContext().getState(
>>>     new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
>>>
>>>
>>> So my question is: how can I read any checkpointed state? I see there is a
>>> flink-state-processor-api.
>>> Can I use it here, and if so, how do I instantiate it:
>>>
>>> StreamExecutionEnvironment env =
>>>     StreamExecutionEnvironment.getExecutionEnvironment();
>>> SavepointReader savepoint = SavepointReader.read(
>>>     env, "s3://{bucket}/flink-checkpoints/{job-id}", <which state backend to use>);
>>>
>>> Is this the correct way to use this API to read a checkpoint?
>>>
>>> Please note I have also enabled:
>>> state.backend.incremental: 'true'
>>> state.backend.local-recovery: 'true'
>>> state.backend.changelog.enabled: 'true'
>>> state.backend.changelog.storage: filesystem
>>> dstl.dfs.base-path: s3://{bucket}/changelog
>>> dstl.dfs.compression.enabled: 'true'
>>>
>>> Now, once I am able to create the reader, how do I inspect a
>>> particular keyed state?
>>> I see a function called readKeyedState, but I am unsure which uid I
>>> need to pass to read a particular state.
>>> Would something like this work:
>>>
>>> DataStream<KeyedState> keyedState = savepoint.readKeyedState(
>>>     OperatorIdentifier.forUid("Proc"), new ReaderFunction());
>>>
>>> And then, in my KeyedState class, I can access state1 and state2.
>>>
>>> Would this work?
>>>
>>> Please let me know if I am on the right track, or whether it is simply not
>>> possible to read checkpointed state from an external application for
>>> debugging.
>>>
>>> Thanks
>>> Sachin
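The answers in this thread can be combined into a sketch for reading `state1`/`state2` from the production job that has no uids, assuming Flink 1.19.1 with `flink-state-processor-api` and the RocksDB backend on the classpath.

Several parts of the sketch are assumptions to replace with the job's real values:

- The key type (`String` here) must match what `MyKeySelector` returns.
- `StateOne`/`StateTwo` below are placeholders for the real POJOs.
- The path must point at a concrete `chk-<num>` (or savepoint) directory containing `_metadata`, not at `{job-id}`.

`forUid("Proc")` would not match here, because `.name("Proc")` only sets the display name. Instead, the sketch first lists the generated operator IDs via `SavepointLoader.loadSavepointMetadata`, an `@Internal` class. It then reads one operator by passing its ID to `OperatorIdentifier.forUidHash(...)`. It is also an assumption that a retained checkpoint written with changelog enabled can be read this way. If it cannot, a canonical savepoint is the safer input, and it can be read with any state backend.

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.state.api.OperatorIdentifier;
import org.apache.flink.state.api.SavepointReader;
import org.apache.flink.state.api.functions.KeyedStateReaderFunction;
import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ReadProcState {

    // Hypothetical placeholders: replace with the job's real StateOne/StateTwo classes.
    // The reader must use the same classes (and state names) as MyStatsProcessor,
    // otherwise the stored bytes cannot be deserialized.
    public static class StateOne { public long count; }
    public static class StateTwo { public String last; }

    // Emits one line per key. The key type String is an assumption and must
    // match the type returned by MyKeySelector.
    public static class ReaderFunction extends KeyedStateReaderFunction<String, String> {
        private transient ValueState<StateOne> state1;
        private transient ValueState<StateTwo> state2;

        @Override
        public void open(Configuration parameters) {
            // Same descriptor names and type information as in MyStatsProcessor.
            state1 = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state1", Types.POJO(StateOne.class)));
            state2 = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state2", Types.POJO(StateTwo.class)));
        }

        @Override
        public void readKey(String key, Context ctx, Collector<String> out) throws Exception {
            // value() returns null if the key has no entry in that state;
            // readable output relies on the POJOs having a useful toString().
            out.collect(key + " -> state1=" + state1.value() + ", state2=" + state2.value());
        }
    }

    public static void main(String[] args) throws Exception {
        if (args.length == 0) {
            throw new IllegalArgumentException("usage: ReadProcState <checkpoint-dir> [operator-id]");
        }
        // A concrete checkpoint or savepoint directory that contains _metadata,
        // e.g. s3://{bucket}/flink-checkpoints/{job-id}/chk-<num>
        String path = args[0];

        if (args.length == 1) {
            // Pass 1: list the (generated) operator IDs stored in the metadata.
            // SavepointLoader is @Internal and may change between Flink versions.
            CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(path);
            for (OperatorState op : metadata.getOperatorStates()) {
                System.out.printf("operatorId=%s stateSize=%d maxParallelism=%d%n",
                        op.getOperatorID().toHexString(), op.getStateSize(), op.getMaxParallelism());
            }
            return;
        }

        // Pass 2: read the keyed state of the chosen operator. forUidHash takes
        // the 32-character hex operator ID instead of a uid string.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // RocksDB to match the job's native checkpoint format.
        SavepointReader savepoint = SavepointReader.read(env, path, new EmbeddedRocksDBStateBackend());
        savepoint.readKeyedState(OperatorIdentifier.forUidHash(args[1]), new ReaderFunction())
                .print();
        env.execute("read-proc-state");
    }
}
```

Without uids, a state size alone may not make it obvious which listed ID is `Proc`. One hint is the vertex `id` reported by the REST API (`/jobs/<job-id>`) for the vertex whose name starts with `Proc`. That ID is the ID of the chain's first operator, which here should be `Proc` because `keyBy` starts a new chain.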