Hi Lorenzo, I think the most convenient way is to modify the code of the state backend, adding a k-v cache as you want.
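
To make the cache idea a bit more concrete, here is a rough, Flink-free sketch of a write-back cache in front of a serialized per-key store. Everything in it is a hypothetical stand-in: `CachingKeyedStore`, its methods and the two call counters are made up for illustration and are not Flink classes or state backend API. It only shows the shape of the idea: live objects stay in a map, the expensive conversion runs on first access and at snapshot time, and the store can flush per key because it knows every key it has cached.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Toy write-back cache in front of a serialized per-key store. Hypothetical, not Flink API. */
final class CachingKeyedStore<K, V, S> {
    private final Map<K, S> serialized = new HashMap<>(); // stands in for the backend's stored state
    private final Map<K, V> cache = new HashMap<>();      // live, possibly non-serializable objects
    private final Function<V, S> toSerialized;            // expensive: live object -> stored form
    private final Function<S, V> fromSerialized;          // expensive: stored form -> live object
    int serializeCalls = 0;   // instrumentation only, to show when conversions happen
    int deserializeCalls = 0;

    CachingKeyedStore(Function<V, S> toSerialized, Function<S, V> fromSerialized) {
        this.toSerialized = toSerialized;
        this.fromSerialized = fromSerialized;
    }

    /** Returns the live object for a key, loading it lazily on first access; null if the key is unknown. */
    V get(K key) {
        V live = cache.get(key);
        if (live == null && serialized.containsKey(key)) {
            live = fromSerialized.apply(serialized.get(key));
            deserializeCalls++;
            cache.put(key, live);
        }
        return live;
    }

    /** Registers a live object for a key without serializing it. */
    void put(K key, V live) {
        cache.put(key, live);
    }

    /** Flushes every cached entry to its stored form and returns a copy of the stored state. */
    Map<K, S> snapshot() {
        for (Map.Entry<K, V> e : cache.entrySet()) {
            serialized.put(e.getKey(), toSerialized.apply(e.getValue()));
            serializeCalls++;
        }
        return new HashMap<>(serialized);
    }

    /** Replaces the stored state with a snapshot and drops all live objects. */
    void restore(Map<K, S> snapshot) {
        serialized.clear();
        serialized.putAll(snapshot);
        cache.clear();
    }
}

final class CachingKeyedStoreDemo {
    public static void main(String[] args) {
        // int[] plays the role of the in-memory "state", Integer its serializable form.
        CachingKeyedStore<String, int[], Integer> store =
                new CachingKeyedStore<String, int[], Integer>(
                        live -> live[0], stored -> new int[] {stored});

        for (String key : new String[] {"a", "b", "a"}) {
            int[] live = store.get(key);
            if (live == null) {          // first record for this key: initialize lazily
                live = new int[] {0};
                store.put(key, live);
            }
            live[0]++;                   // per-record update touches only the live object
        }

        System.out.println("serialize calls before snapshot: " + store.serializeCalls);
        System.out.println("snapshot: " + store.snapshot());
        System.out.println("serialize calls after snapshot: " + store.serializeCalls);
    }
}
```

In this toy version the key is passed explicitly. In Flink the key is implicit in the operator's current key context, so a real change would have to live inside the state backend (or get hold of the key context some other way).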

Otherwise, IIUC, there is no public interface to get the keyContext. You could try something hacky, though: take the `Context` instance passed in to processElement and use Java reflection to reach the KeyedProcessOperator instance, on which you can call setCurrentKey().

Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:

> Thanks Zakelly,
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Function.
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFunction in StreamExecRank.
> How can I do something similar without modifying the Flink runtime?
>
> Lorenzo
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Lorenzo,
>>
>> It is not recommended to do this with the keyed state. However, there is
>> an example in the Flink code (FastTop1Function#snapshotState) [1] of setting
>> keys during snapshotState().
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>>
>> Best,
>> Zakelly
>>
>> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> wrote:
>>
>>> Hi Thias
>>>
>>> I considered CheckpointedFunction.
>>> In snapshotState() I would have to update the state of each key,
>>> extracting the in-memory "state" of each key and putting it in the state
>>> with state.update(...).
>>> This must happen per key.
>>> But snapshotState() has no visibility of the keys. And I have no way of
>>> selectively accessing the state of a specific key to update it.
>>> Unless I am missing something
>>>
>>> Thanks
>>> Lorenzo
>>>
>>>
>>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:
>>>
>>>> Good morning Lorenzo,
>>>>
>>>> You may want to implement the
>>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>>> your KeyedProcessFunction.
>>>>
>>>> Btw., by the time initializeState(…) is called, the state backend is
>>>> fully initialized and can be read and written to (which is not the case
>>>> when the open(…) function is called).
>>>>
>>>> In initializeState(…) you also get access to the state of different
>>>> operator keys.
>>>>
>>>> snapshotState(…) is called as part of each checkpoint in order to
>>>> store data.
>>>>
>>>> Sincere greetings
>>>>
>>>> Thias
>>>>
>>>> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>>> *To:* Flink User Group <user@flink.apache.org>
>>>> *Subject:* Preparing keyed state before snapshot
>>>>
>>>> Hello everyone,
>>>>
>>>> I have a convoluted problem.
>>>>
>>>> I am implementing a KeyedProcessFunction that keeps some
>>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>>> value = the non-serializable "state").
>>>>
>>>> I can extract a serializable representation to put in Flink state, and
>>>> I can load my in-memory "state" from the Flink state. But these operations
>>>> are expensive.
>>>>
>>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>>> in processElement(), on the first record for the key.
>>>>
>>>> The problem is saving the in-memory "state" to Flink state.
>>>>
>>>> I need to do it only before the state snapshot. But
>>>> KeyedProcessFunction has no entrypoint called before the state snapshot.
>>>>
>>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>>> work for keyed state.
>>>>
>>>> Any suggestions?
>>>>
>>>> Note that I cannot use operator state nor a broadcast state.
>>>>
>>>> Processing is keyed. Every processed record modifies the in-memory
>>>> "state" of that key. If the job rescales, the state of the key must follow
>>>> the partition.
>>>>
>>>> Regards
>>>>
>>>> Lorenzo