Good morning all,

Let me loop myself in …


  1.  Another, even more convenient, way to enable caching is to configure 
RocksDB to use more off-heap memory for its cache. You might also consider 
enabling bloom filters (it all depends on how large your key space is: 
thousands, millions, billions, …).
Within its technological limits, RocksDB is hard to beat; if keeping all data 
in memory is not an option, this is the path I usually follow.

  2.  On the other question, how to control the current key from within 
snapshotState(): you can obtain a reference to the underlying state backend, 
e.g. from within open(), then get hold of a reference to the specific state 
primitive and set the current key directly.
To find out how to do that, set a breakpoint in the debugger and walk up a 
couple of call-stack frames, and/or step into the value setters and model your 
code after how it is done there.
Mind, though, that you must restore the current key if you change it to 
another key.
Doing this, e.g. in initializeState(), is not time-critical, because it 
happens outside the ‘hot’ code paths.

  3.  If the number of elements to store is small, you can store them in 
operator state and initialize your local structure from it in 
initializeState(). You would probably want to keep the data in serialized form 
in operator state, since you mentioned that serialization is expensive.
  4.  There is another API (whose name I don’t remember) that lets you store 
operator state directly as a BLOB, if that is a viable option for you.
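The suggestion above (switch the current key per entry, then restore it) can be sketched without Flink. `KeyContext`, `KeyedValueState` and `SnapshotFlusher` are hypothetical stand-ins that only mimic how a keyed state backend scopes reads and writes to the current key; they are not Flink classes. In a real job, `dirty` would be the transient map (or the part of it changed since the last checkpoint), and the key context would have to be reached through the non-public route discussed in this thread.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for an operator's key context (NOT a Flink class).
class KeyContext {
    private Object currentKey;

    Object getCurrentKey() { return currentKey; }

    void setCurrentKey(Object key) { currentKey = key; }
}

// Hypothetical stand-in for keyed ValueState: every read and write is scoped
// to whatever key the KeyContext currently points at.
class KeyedValueState<V> {
    private final KeyContext ctx;
    private final Map<Object, V> backend = new HashMap<>();

    KeyedValueState(KeyContext ctx) { this.ctx = ctx; }

    V value() { return backend.get(ctx.getCurrentKey()); }

    void update(V v) { backend.put(ctx.getCurrentKey(), v); }
}

class SnapshotFlusher {
    // Writes every dirty in-memory entry into keyed state by switching the
    // current key per entry, then restores the key that was set before.
    static <V> void flush(KeyContext ctx, KeyedValueState<V> state, Map<Object, V> dirty) {
        Object original = ctx.getCurrentKey();
        try {
            for (Map.Entry<Object, V> e : dirty.entrySet()) {
                ctx.setCurrentKey(e.getKey()); // scope the write to this key
                state.update(e.getValue());    // the expensive conversion would go here
            }
            dirty.clear(); // everything is persisted; nothing is dirty any more
        } finally {
            ctx.setCurrentKey(original); // never leave the operator on a foreign key
        }
    }
}
```

The `try`/`finally` is the design choice that matters here: it honors the "restore the current key" caveat even if one of the writes throws.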

Sincere greetings

Thias




From: Zakelly Lan <zakelly....@gmail.com>
Sent: Wednesday, February 21, 2024 8:04 AM
To: Lorenzo Nicora <lorenzo.nic...@gmail.com>
Cc: Flink User Group <user@flink.apache.org>
Subject: Re: Preparing keyed state before snapshot



Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend, 
adding the k-v cache you want.

Otherwise, IIUC, there's no public interface to get the keyContext. But you may 
try something hacky: use the passed-in `Context` instance in processElement() 
and leverage Java reflection to get the KeyedProcessOperator instance, on which 
you can call setCurrentKey().
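The reflection trick can be illustrated with stdlib-only stand-ins. `FakeKeyedOperator`, `FakeProcessContext` and `ReflectionHack` are hypothetical; they only mimic a context object that privately holds a reference to its operator. In Flink, the context handed to processElement() is likely an inner class of the operator, so the reference would sit in a compiler-generated field rather than a named one; searching by field type avoids hard-coding such a name. This relies on internals that can change between Flink versions and may be blocked by JDK module access rules.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for the operator that owns setCurrentKey() (NOT Flink's class).
class FakeKeyedOperator {
    private Object currentKey;

    public void setCurrentKey(Object key) { currentKey = key; }

    public Object getCurrentKey() { return currentKey; }
}

// Hypothetical stand-in for the Context passed to processElement(): it keeps
// its operator in a private field, so only reflection can reach it.
class FakeProcessContext {
    private final int unrelated = 0; // an unrelated field the search must skip
    private final FakeKeyedOperator owner;

    FakeProcessContext(FakeKeyedOperator owner) { this.owner = owner; }
}

class ReflectionHack {
    // Returns the value of the first declared field of `holder` whose type is
    // assignable to `target`, bypassing Java access checks.
    static <T> T findFieldOfType(Object holder, Class<T> target) {
        for (Field f : holder.getClass().getDeclaredFields()) {
            if (target.isAssignableFrom(f.getType())) {
                try {
                    f.setAccessible(true);
                    return target.cast(f.get(holder));
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException("cannot read field " + f.getName(), e);
                }
            }
        }
        throw new IllegalStateException("no field of type " + target.getName());
    }
}
```

Once the operator is found, the same save/switch/restore discipline for the current key applies as with any other way of changing it.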


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora 
<lorenzo.nic...@gmail.com> wrote:
Thanks Zakelly,

I'd need to do something similar, with a map containing my non-serializable 
"state", similar to the kvCache in FastTop1Function.

But I am not sure I understand how I can set the keyed state for a specific 
key in snapshotState().
FastTop1Function seems to rely on a keyContext set via setKeyContext(). This 
method is not part of the public API; I see it is set specifically for 
AbstractTopNFunction in StreamExecRank.
How can I do something similar without modifying the Flink runtime?

Lorenzo


On Sun, 18 Feb 2024 at 03:42, Zakelly Lan 
<zakelly....@gmail.com> wrote:
Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an 
example in the Flink code (FastTop1Function#snapshotState) [1] of setting keys 
in snapshotState().

Hope this helps.

[1] 
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora 
<lorenzo.nic...@gmail.com> wrote:
Hi Thias

I considered CheckpointedFunction.
In snapshotState() I would have to update the state of each key, extracting the 
in-memory "state" of each key and putting it into Flink state with 
state.update(...).
This must happen per key, but snapshotState() has no visibility of the keys, 
and I have no way of selectively accessing the state of a specific key to 
update it.
Unless I am missing something.

Thanks
Lorenzo


On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias 
<matthias.schwa...@viseca.ch> wrote:
Good morning Lorenzo,

You may want to implement the 
org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in 
your KeyedProcessFunction.
Btw, by the time initializeState(…) is called, the state backend is fully 
initialized and can be read from and written to (which is not the case when 
open(…) is called).
In initializeState(…) you also get access to operator state, not only keyed 
state.
snapshotState(…) is called as part of each checkpoint in order to store data.

Sincere greetings

Thias

From: Lorenzo Nicora <lorenzo.nic...@gmail.com>
Sent: Thursday, February 15, 2024 7:50 PM
To: Flink User Group <user@flink.apache.org>
Subject: Preparing keyed state before snapshot

Hello everyone,

I have a convoluted problem.

I am implementing a KeyedProcessFunction that keeps some non-serializable 
"state" in memory, in a transient Map (key = stream key, value = the 
non-serializable "state").

I can extract a serializable representation to put in Flink state, and I can 
load my in-memory "state" from the Flink state. But these operations are 
expensive.

Initializing the in-memory "state" is relatively easy. I do it lazily, in 
processElement(), on the first record for the key.
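The lazy initialization described above (restore a key's in-memory object on its first record) can be modeled without Flink. `LazyKeyedCache` is a hypothetical name and `restoreFromState` is a hypothetical stand-in for the expensive deserialization from Flink state; in a real KeyedProcessFunction the map would be a transient field and the key would come from `ctx.getCurrentKey()`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Stdlib-only model of a transient, per-key in-memory cache that is filled
// lazily from (expensive) serialized state on the first record of each key.
class LazyKeyedCache<K, S> {
    private final Map<K, S> inMemory = new HashMap<>(); // never snapshotted itself
    private final Function<K, S> restoreFromState;      // hypothetical expensive loader
    private int restores = 0;                           // counts expensive restores

    LazyKeyedCache(Function<K, S> restoreFromState) {
        this.restoreFromState = restoreFromState;
    }

    // What processElement() would call: restore only if the key is not cached yet.
    S getOrRestore(K key) {
        return inMemory.computeIfAbsent(key, k -> {
            restores++;
            return restoreFromState.apply(k);
        });
    }

    int restoreCount() { return restores; }
}
```

This covers only the load side; writing the objects back before a checkpoint is the part this thread is about.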

The problem is saving the in-memory "state" to Flink state.
I need to do it only before the state snapshot. But KeyedProcessFunction has no 
entry point called before the state snapshot.
I cannot use CheckpointedFunction.snapshotState(), because it does not work for 
keyed state.

Any suggestions?

Note that I can use neither operator state nor broadcast state.
Processing is keyed. Every processed record modifies the in-memory "state" of 
that key. If the job rescales, the state of the key must follow the partition.
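The requirement that a key's state follow it when the job rescales is what keyed state provides through key groups: a key always maps to the same key group, and each subtask owns a contiguous range of key groups (the number of key groups equals the max parallelism). The sketch below is a simplified, stdlib-only model with a made-up class name; Flink applies a murmur hash to key.hashCode() before taking the modulus, whereas plain `Math.floorMod` is used here for brevity.

```java
// Simplified model of key-group assignment (hypothetical class, not Flink's).
// ASSUMPTION: Flink applies a murmur hash to key.hashCode() first; plain
// floorMod is used here only to keep the sketch short.
class KeyGroups {
    // A key's group depends only on the key and maxParallelism, never on the
    // current parallelism, so its state can move as part of its key group.
    static int keyGroupFor(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Each of `parallelism` subtasks owns a contiguous range of key groups.
    static int subtaskFor(int keyGroup, int parallelism, int maxParallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtaskForKey(Object key, int parallelism, int maxParallelism) {
        return subtaskFor(keyGroupFor(key, maxParallelism), parallelism, maxParallelism);
    }
}
```

Operator list state, by contrast, is redistributed on rescale without regard to keys, which is why it does not fit this requirement.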


Regards
Lorenzo
This message is intended only for the named recipient and may contain 
confidential or privileged information. As the confidentiality of email 
communication cannot be guaranteed, we do not accept any responsibility for the 
confidentiality and the intactness of this message. If you have received it in 
error, please advise the sender by return e-mail and delete this message and 
any attachments. Any unauthorised use or dissemination of this information is 
strictly prohibited.