Good morning all,

Let me loop myself in …
1. Another, even more convenient way to get a cache is to configure RocksDB to use more off-heap memory for its cache. You might also consider enabling bloom filters (it all depends on how large your key space is: thousands, millions, billions, …). Within its technological limits, RocksDB is hard to beat; if keeping all data in memory is not an option, this is the path I usually follow.
2. On the other question, how to control the current key from within snapshotState(): you can acquire a reference to the underlying state backend, e.g. from within open(), then get hold of a reference to the specific state primitive and set the current key directly. To find out how to do that, put a breakpoint in the debugger and walk up a couple of call-stack frames, and/or step into the value setters and model your code after how it is done there. Mind, though, that you must restore the current key if you change it to another key. Doing this in e.g. initializeState() is not time-critical, because it happens outside the 'hot' code paths.
3. If the number of elements to store is small, you can store them in operator state and initialize your local structure from it in initializeState(). You would probably want to keep the data in serialized form in operator state, since you mentioned that serialization is expensive.
4. There is another API (whose name I don't remember) that allows you to store operator state directly as a BLOB, if that would be a viable option for you.

Sincere greetings
Thias

From: Zakelly Lan <zakelly....@gmail.com>
Sent: Wednesday, February 21, 2024 8:04 AM
To: Lorenzo Nicora <lorenzo.nic...@gmail.com>
Cc: Flink User Group <user@flink.apache.org>
Subject: Re: Preparing keyed state before snapshot

Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend, adding a k-v cache as you want. Otherwise, IIUC, there's no public interface to get the keyContext. But well, you may try something hacky.
You may use the passed-in `Context` instance in processElement and leverage Java reflection to get the KeyedProcessOperator instance, on which you can call setCurrentKey().

Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:

Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable "state", similar to the kvCache in FastTop1Function. But I am not sure I understand how I can set the keyed state for a specific key in snapshotState(). FastTop1Function seems to rely on the keyContext set via setKeyContext(). This method is not part of the API. I see it's set specifically for AbstractTopNFunction in StreamExecRank. How can I do something similar without modifying the Flink runtime?

Lorenzo

On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:

Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys in snapshotState(). Hope this helps.

[1] https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com> wrote:

Hi Thias,

I considered CheckpointedFunction. In snapshotState() I would have to update the state of each key, extracting the in-memory "state" of each key and putting it in the state with state.update(...). This must happen per key, but snapshotState() has no visibility of the keys, and I have no way of selectively accessing the state of a specific key to update it.
Unless I am missing something.

Thanks
Lorenzo

On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <matthias.schwa...@viseca.ch> wrote:

Good morning Lorenzo,

You may want to implement the org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in your KeyedProcessFunction. By the way, by the time initializeState(…) is called, the state backend is fully initialized and can be read from and written to (which is not the case when open(…) is called). In initializeState(…) you also get access to the state of different operator keys. snapshotState(…) is called as part of each checkpoint in order to store data.

Sincere greetings
Thias

From: Lorenzo Nicora <lorenzo.nic...@gmail.com>
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group <user@flink.apache.org>
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable "state" in memory, in a transient Map (key = stream key, value = the non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can load my in-memory "state" from the Flink state. But these operations are expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in processElement(), on the first record for the key.

The problem is saving the in-memory "state" to Flink state. I need to do it only before the state snapshot. But KeyedProcessFunction has no entry point that is called before the state snapshot. I cannot use CheckpointedFunction.snapshotState(), because it does not work for keyed state.

Any suggestions?

Note that I can use neither operator state nor broadcast state. Processing is keyed. Every processed record modifies the in-memory "state" of that key. If the job rescales, the state of each key must follow its partition.
Regards
Lorenzo

This message is intended only for the named recipient and may contain confidential or privileged information. As the confidentiality of email communication cannot be guaranteed, we do not accept any responsibility for the confidentiality and the intactness of this message. If you have received it in error, please advise the sender by return e-mail and delete this message and any attachments. Any unauthorised use or dissemination of this information is strictly prohibited.
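The approach that emerges from this thread (keep a kvCache-like map of non-serializable objects, and in snapshotState() set the current key for each cached entry before writing it to keyed state, then restore the original key as Thias advises) can be modelled without Flink. The following is a minimal sketch under stated assumptions: `KeyedStoreStub` is a hypothetical stand-in for the keyed state backend and its current key, and `SnapshotFlushingCache` is a hypothetical helper, neither of which is a Flink API. It also tracks which keys changed since the last snapshot, which is an assumption of this sketch rather than something the thread prescribes.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical stand-in for a keyed state backend: one value slot per key, addressed via a current key. */
final class KeyedStoreStub<K> {
    private final Map<K, byte[]> values = new HashMap<>();
    private K currentKey;

    void setCurrentKey(K key) { currentKey = key; }
    K getCurrentKey() { return currentKey; }

    /** Reads the slot of the current key (null if it was never written). */
    byte[] value() { return values.get(currentKey); }

    /** Writes the slot of the current key, in the spirit of ValueState#update. */
    void update(byte[] bytes) {
        values.put(Objects.requireNonNull(currentKey, "no current key set"), bytes);
    }
}

/** Hypothetical per-key cache of non-serializable objects, flushed to the store only at snapshot time. */
final class SnapshotFlushingCache<K, T> {
    private final Map<K, T> cache = new HashMap<>();
    private final Set<K> dirty = new LinkedHashSet<>();
    private final KeyedStoreStub<K> store;
    private final Supplier<T> initializer;
    private final Function<T, byte[]> serializer;   // the expensive step: run once per dirty key per snapshot
    private final Function<byte[], T> deserializer; // the expensive step on restore: run once per key

    SnapshotFlushingCache(KeyedStoreStub<K> store, Supplier<T> initializer,
                          Function<T, byte[]> serializer, Function<byte[], T> deserializer) {
        this.store = store;
        this.initializer = initializer;
        this.serializer = serializer;
        this.deserializer = deserializer;
    }

    /** Per-record hook, like processElement(): the runtime has already set the current key. */
    void apply(K key, Consumer<T> mutation) {
        if (!key.equals(store.getCurrentKey())) {
            throw new IllegalStateException("current key must be set before processing " + key);
        }
        T obj = cache.get(key);
        if (obj == null) {
            // Lazy initialization on the first record for this key, restoring from the store if possible.
            byte[] bytes = store.value();
            obj = bytes == null ? initializer.get() : deserializer.apply(bytes);
            cache.put(key, obj);
        }
        mutation.accept(obj);
        dirty.add(key);
    }

    /** Per-checkpoint hook, like snapshotState(): switch key, write, then restore the original key. */
    void snapshot() {
        K previousKey = store.getCurrentKey();
        try {
            for (K key : dirty) {
                store.setCurrentKey(key);
                store.update(serializer.apply(cache.get(key)));
            }
            dirty.clear();
        } finally {
            store.setCurrentKey(previousKey);
        }
    }
}
```

In a real KeyedProcessFunction, the hard part is obtaining something that can switch the current key, which the thread notes is not public API (reflection on KeyedProcessOperator, or the internal keyContext used by FastTop1Function); the stub deliberately sidesteps that. The dirty set limits the expensive serialization to keys touched since the previous checkpoint.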