Hi Krzysztof,

Could you share the original code from your initializeState method and the corresponding exception stack trace? It looks interesting.

Best,
Ron

Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote on Sat, Sep 9, 2023 at 07:12:

> My apologies Matthias,
> you are right. The issue was that I was trying to access the state value from
> the open/init methods, where there was no key context.
>
> Regarding the CheckpointedFunction interface: from the Javadoc example and
> description I got the impression that it can be used to access keyed state
> on a keyed stream.
> Quote: "The OperatorStateStore and KeyedStateStore give access to the data
> structures in which state should be stored for Flink to transparently
> manage and checkpoint it,"
>
> The way I believe I should do it is this:
>
> public void initializeState(FunctionInitializationContext context)
>         throws Exception {
>     keyCounterState = context.getKeyedStateStore()
>         .getState(new ValueStateDescriptor<>("keyCounter", Integer.class));
> }
>
> So I use `context.getKeyedStateStore()`, to which I should have access
> since my stream is keyed.
> I have updated my code [1] and run the toy job, which now works. I think
> that did the trick. WDYT?
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/ae449b6b4b054f64f0828343456d814c3aaba962/src/main/java/org/example/KeyedCounter2.java#L19C5-L19C51
>
> On Thu, Sep 7, 2023 at 10:40, Schwalbe Matthias <matthias.schwa...@viseca.ch>
> wrote:
>
>> Hi Krzysztof again,
>>
>> Just for clarity: your sample code [1] tries to count the number of
>> events per key.
>> Assuming this is your intention?
>>
>> Anyway, your previous implementation initialized the keyed state
>> keyCounterState in the open function, which is the right place to do this;
>> you just wouldn't want to store values in the state from within the
>> open() function.
>>
>> initializeState() and snapshotState() are mainly used for operator
>> state, not keyed state; refer to the relevant documentation.
>>
>> Thias
>>
>> *From:* Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
>> *Sent:* Thursday, September 7, 2023 09:59
>> *To:* user <user@flink.apache.org>
>> *Subject:* using CheckpointedFunction on a keyed state
>>
>> Hi,
>> I have a toy Flink job [1] where I have a KeyedProcessFunction
>> implementation [2] that also implements CheckpointedFunction. My stream
>> definition has a .keyBy(...) call, as you can see in [1].
>>
>> However, when I try to run this toy job I get an exception
>> from the CheckpointedFunction::initializeState method that says:
>> "Keyed state can only be used on a 'keyed stream', i.e., after a
>> 'keyBy()' operation."
>>
>> I got the impression from the docs that CheckpointedFunction can be used
>> on a keyed stream and that CheckpointedFunction::initializeState is for
>> initializing the state object.
>> Are my assumptions wrong? Is initializeState only meant to set an initial
>> value of a state per key, while the state object must be initialized in the
>> open() method?
>>
>> Thanks,
>> Krzysztof
>>
>> [1]
>> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java
>>
>> [2]
>> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java
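
For readers following the thread, here is a minimal, dependency-free sketch of the "no key context" point discussed above. It is a toy model and not Flink's API: `ToyKeyedBackend`, `ToyValueState` and `countEvent` are hypothetical names invented for illustration. The assumption it encodes is the one described in the thread: obtaining the state handle (as one would in open() or initializeState()) needs no key, while reading or writing the value only works while a record's key is set (as in processElement()).

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of key-scoped state. NOT Flink's API; all names are hypothetical. */
public class KeyedStateSketch {

    /** Stands in for a keyed state backend holding one counter per key. */
    static class ToyKeyedBackend {
        private final Map<String, Integer> values = new HashMap<>();
        private String currentKey; // null while no record is being processed

        void setCurrentKey(String key) {
            this.currentKey = key;
        }

        /** Creating the handle needs no key, like registering state at init time. */
        ToyValueState getState() {
            return new ToyValueState(this);
        }
    }

    /** Handle whose value is always resolved against the backend's current key. */
    static class ToyValueState {
        private final ToyKeyedBackend backend;

        ToyValueState(ToyKeyedBackend backend) {
            this.backend = backend;
        }

        Integer value() {
            requireKey();
            return backend.values.get(backend.currentKey);
        }

        void update(Integer newValue) {
            requireKey();
            backend.values.put(backend.currentKey, newValue);
        }

        private void requireKey() {
            if (backend.currentKey == null) {
                throw new IllegalStateException("No key context: state accessed outside record processing");
            }
        }
    }

    /** Models per-record processing: the key is set only for the duration of the call. */
    static int countEvent(ToyKeyedBackend backend, ToyValueState state, String key) {
        backend.setCurrentKey(key);
        try {
            Integer current = state.value();
            int next = (current == null) ? 1 : current + 1;
            state.update(next);
            return next;
        } finally {
            backend.setCurrentKey(null);
        }
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        ToyValueState counter = backend.getState(); // fine: no key needed for the handle

        try {
            counter.value(); // fails: models reading the value at init time
        } catch (IllegalStateException e) {
            System.out.println("init-time access failed: " + e.getMessage());
        }

        System.out.println("a -> " + countEvent(backend, counter, "a"));
        System.out.println("a -> " + countEvent(backend, counter, "a"));
        System.out.println("b -> " + countEvent(backend, counter, "b"));
    }
}
```

In this model the counters for "a" and "b" are independent, which mirrors the per-key counting the toy job in the thread is meant to do. How Flink itself scopes state to keys is described in its state documentation; the sketch only mimics the observable behaviour.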