Hi Lorenzo,

It is not recommended to do this with keyed state. However, there is an
example in the Flink code (FastTop1Function#snapshotState) [1] that sets
the current key inside snapshotState().
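A minimal sketch of that pattern follows. It is written in plain Java so it compiles without Flink on the classpath. `KeyContext` mirrors the `setCurrentKey(Object)` / `getCurrentKey()` pair of Flink's `org.apache.flink.streaming.api.operators.KeyContext`. `FakeKeyedState`, `Accumulator` and `CachingFunction` are hypothetical stand-ins for a keyed `ValueState`, the expensive non-serializable in-memory "state", and the `KeyedProcessFunction`. The function keeps its own map of keys, which gives `snapshotState()` the visibility of keys it otherwise lacks. Before each snapshot, it makes each cached key current and writes that key's serialized form to keyed state.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in mirroring Flink's KeyContext: selects the key that keyed state calls apply to.
interface KeyContext {
    void setCurrentKey(Object key);
    Object getCurrentKey();
}

// Hypothetical stand-in for the operator plus one keyed ValueState<String>:
// value()/update() read and write the entry of whichever key is current.
class FakeKeyedState implements KeyContext {
    private final Map<Object, String> store = new HashMap<>();
    private Object currentKey;

    @Override public void setCurrentKey(Object key) { currentKey = key; }
    @Override public Object getCurrentKey() { return currentKey; }

    String value() { return store.get(currentKey); }
    void update(String serialized) { store.put(currentKey, serialized); }

    // Copy of all entries, used only to inspect what a checkpoint would contain.
    Map<Object, String> contents() { return new HashMap<>(store); }
}

// Hypothetical non-serializable in-memory "state"; serialize() stands for the expensive conversion.
class Accumulator {
    private final StringBuilder records;

    Accumulator(String serialized) {
        records = new StringBuilder(serialized == null ? "" : serialized);
    }

    void add(String record) { records.append(record); }
    String serialize() { return records.toString(); }
}

// Hypothetical function: per-key cache, flushed to keyed state only before a snapshot.
class CachingFunction {
    // Would be a transient field in a real KeyedProcessFunction.
    private final Map<Object, Accumulator> cache = new HashMap<>();
    private final KeyContext keyContext;
    private final FakeKeyedState state;

    CachingFunction(KeyContext keyContext, FakeKeyedState state) {
        this.keyContext = keyContext;
        this.state = state;
    }

    // Like processElement(): the caller has already set the current key.
    void processElement(String record) {
        Object key = keyContext.getCurrentKey();
        // Lazy init: load from keyed state only on the first record for this key.
        cache.computeIfAbsent(key, k -> new Accumulator(state.value())).add(record);
    }

    // Like CheckpointedFunction#snapshotState(): the cache supplies the keys,
    // and each key is made current before its state is written.
    void snapshotState() {
        for (Map.Entry<Object, Accumulator> entry : cache.entrySet()) {
            keyContext.setCurrentKey(entry.getKey());
            state.update(entry.getValue().serialize());
        }
    }
}

class SnapshotFlushDemo {
    public static void main(String[] args) {
        FakeKeyedState backend = new FakeKeyedState();
        CachingFunction fn = new CachingFunction(backend, backend);
        backend.setCurrentKey("a");
        fn.processElement("x");
        backend.setCurrentKey("b");
        fn.processElement("y");
        fn.snapshotState();
        System.out.println(backend.contents());
    }
}
```

In a real job, the function also needs a reference to the operator's KeyContext. FastTop1Function is handed one from outside, because the public KeyedProcessFunction API does not expose it. From the DataStream API, this likely means wrapping the function in an operator yourself (for example via transform(...)) and relying on internal classes. The approach also relies on the function's snapshotState() running before the keyed state backend itself is snapshotted, which is what makes the FastTop1Function example work.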

Hope this helps.

[1]
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165

Best,
Zakelly

On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
wrote:

> Hi Thias,
>
> I considered CheckpointedFunction.
> In snapshotState() I would have to update the state of each key,
> extracting the in-memory "state" of each key and putting it in the state
> with state.update(...).
> This must happen per key,
> but snapshotState() has no visibility of the keys, and I have no way of
> selectively accessing the state of a specific key to update it.
> Unless I am missing something?
>
> Thanks
> Lorenzo
>
>
> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
> matthias.schwa...@viseca.ch> wrote:
>
>> Good morning Lorenzo,
>>
>> You may want to implement the
>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>> your KeyedProcessFunction.
>>
>> Btw, by the time initializeState(…) is called, the state backend is fully
>> initialized and can be read and written to (which is not the case when
>> the open(…) function is called).
>>
>> In initializeState(…) you also get access to the state of different
>> operator keys.
>>
>> snapshotState(…) is called as part of each checkpoint in order to
>> store data.
>>
>> Sincere greetings
>>
>> Thias
>>
>> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> *Sent:* Thursday, February 15, 2024 7:50 PM
>> *To:* Flink User Group <user@flink.apache.org>
>> *Subject:* Preparing keyed state before snapshot
>>
>> Hello everyone,
>>
>> I have a convoluted problem.
>>
>> I am implementing a KeyedProcessFunction that keeps some non-serializable
>> "state" in memory, in a transient Map (key = stream key, value = the
>> non-serializable "state").
>>
>> I can extract a serializable representation to put in Flink state, and I
>> can load my in-memory "state" from the Flink state. But these operations
>> are expensive.
>>
>> Initializing the in-memory "state" is relatively easy. I do it lazily, in
>> processElement(), on the first record for the key.
>>
>> The problem is saving the in-memory "state" to Flink state.
>>
>> I need to do it only before the state snapshot, but KeyedProcessFunction
>> has no entry point that is called before the state snapshot.
>>
>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>> work for keyed state.
>>
>> Any suggestions?
>>
>> Note that I can use neither operator state nor broadcast state.
>>
>> Processing is keyed. Every processed record modifies the in-memory
>> "state" of that key. If the job rescales, the state of the key must follow
>> the partition.
>>
>> Regards
>>
>> Lorenzo
>> This message is intended only for the named recipient and may contain
>> confidential or privileged information. As the confidentiality of email
>> communication cannot be guaranteed, we do not accept any responsibility for
>> the confidentiality and the intactness of this message. If you have
>> received it in error, please advise the sender by return e-mail and delete
>> this message and any attachments. Any unauthorised use or dissemination of
>> this information is strictly prohibited.
>>
>
