Hi Lorenzo,

It is not recommended to update keyed state inside snapshotState(). However, there is an example in the Flink code base, FastTop1Function#snapshotState [1], that sets the current key explicitly inside snapshotState() before writing to keyed state.
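
To make the idea concrete, here is a minimal stand-alone sketch of that pattern in plain Java. It deliberately uses no Flink classes: KeyContext, KeyedValueState and CachingFunction are hypothetical stand-ins I made up for illustration, and the StringBuilder only plays the role of your non-serializable in-memory "state". As far as I can tell, the linked function gets its key context from the operator, which is internal API, so please treat this as a model of the approach rather than code to paste into a KeyedProcessFunction.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Stand-alone model of "flush an in-memory cache to keyed state at snapshot time". */
final class SnapshotFlushSketch {

    /** Hypothetical stand-in for the operator's key context (not a Flink class). */
    static final class KeyContext {
        private String currentKey;
        void setCurrentKey(String key) { this.currentKey = key; }
        String getCurrentKey() { return currentKey; }
    }

    /** Hypothetical stand-in for keyed state: reads and writes are scoped to the current key. */
    static final class KeyedValueState {
        private final KeyContext keyContext;
        private final Map<String, String> backend = new HashMap<>();

        KeyedValueState(KeyContext keyContext) { this.keyContext = keyContext; }

        void update(String value) {
            String key = keyContext.getCurrentKey();
            if (key == null) {
                throw new IllegalStateException("No current key set");
            }
            backend.put(key, value);
        }

        String value() { return backend.get(keyContext.getCurrentKey()); }

        Map<String, String> backendView() { return Collections.unmodifiableMap(backend); }
    }

    /** Keeps one in-memory object per key and writes it to state only on snapshot. */
    static final class CachingFunction {
        private final KeyContext keyContext;
        private final KeyedValueState state;
        // Plays the role of the transient Map of non-serializable "state".
        private final Map<String, StringBuilder> cache = new HashMap<>();

        CachingFunction(KeyContext keyContext, KeyedValueState state) {
            this.keyContext = keyContext;
            this.state = state;
        }

        void processElement(String key, String record) {
            // In this model the caller's key is set here; state is only read, never written.
            keyContext.setCurrentKey(key);
            StringBuilder inMemory = cache.get(key);
            if (inMemory == null) {
                // Lazy initialization on the first record for the key.
                String restored = state.value();
                inMemory = new StringBuilder(restored == null ? "" : restored);
                cache.put(key, inMemory);
            }
            inMemory.append(record);
        }

        void snapshotState() {
            // The core of the pattern: set the key explicitly, then update that key's state.
            for (Map.Entry<String, StringBuilder> entry : cache.entrySet()) {
                keyContext.setCurrentKey(entry.getKey());
                state.update(entry.getValue().toString());
            }
        }
    }

    public static void main(String[] args) {
        KeyContext ctx = new KeyContext();
        KeyedValueState state = new KeyedValueState(ctx);
        CachingFunction fn = new CachingFunction(ctx, state);
        fn.processElement("a", "1");
        fn.processElement("b", "2");
        fn.snapshotState();
        System.out.println(state.backendView());
    }
}
```

Note that in this sketch the expensive conversion (here just toString()) runs once per cached key per snapshot, not once per record, which is the point of deferring the write.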

Hope this helps.

[1] https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:

> Hi Thias
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key,
> extracting the in-memory "state" of each key and putting it in the state
> with state.update(...).
> This must happen per key,
> but snapshotState() has no visibility of the keys. And I have no way of
> selectively accessing the state of a specific key to update it.
> Unless I am missing something.
>
> Thanks
> Lorenzo
>
> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>
>> Good morning Lorenzo,
>>
>> You may want to implement the
>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>> your KeyedProcessFunction.
>>
>> Btw., by the time initializeState(…) is called, the state backend is fully
>> initialized and can be read and written to (which is not the case when
>> the open(…) function is called).
>>
>> In initializeState(…) you also get access to state of different operator
>> key.
>>
>> snapshotState(…) is called as part of each checkpoint in order to
>> store data.
>>
>> Sincere greetings
>>
>> Thias
>>
>> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> *Sent:* Thursday, February 15, 2024 7:50 PM
>> *To:* Flink User Group <user@flink.apache.org>
>> *Subject:* Preparing keyed state before snapshot
>>
>> Hello everyone,
>>
>> I have a convoluted problem.
>>
>> I am implementing a KeyedProcessFunction that keeps some non-serializable
>> "state" in memory, in a transient Map (key = stream key, value = the
>> non-serializable "state").
>>
>> I can extract a serializable representation to put in Flink state, and I
>> can load my in-memory "state" from the Flink state. But these operations
>> are expensive.
>>
>> Initializing the in-memory "state" is relatively easy. I do it lazily, in
>> processElement(), on the first record for the key.
>>
>> The problem is saving the in-memory "state" to Flink state.
>>
>> I need to do it only before the state snapshot. But KeyedProcessFunction
>> has no entrypoint called before the state snapshot.
>>
>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>> work for keyed state.
>>
>> Any suggestions?
>>
>> Note that I cannot use operator state nor a broadcast state.
>>
>> Processing is keyed. Every processed record modifies the in-memory
>> "state" of that key. If the job rescales, the state of the key must follow
>> the partition.
>>
>> Regards
>> Lorenzo