Hi Lorenzo,

I think the most convenient way is to modify the code of the state backend,
adding the k-v cache you need.
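
Here is a minimal sketch of the write-back cache idea in plain Java, with no Flink dependency. It assumes you can set the current key yourself, which the public KeyedProcessFunction API does not let you do. `KeyedStateStub`, `Heavy`, and the method names are hypothetical stand-ins for a keyed `ValueState`, your non-serializable object, and the processElement()/snapshotState() callbacks. The flush loop roughly follows the pattern of FastTop1Function#snapshotState referenced further down the thread: switch the current key for each cached entry, then call `update()`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free sketch of a write-back cache in front of keyed state.
// KeyedStateStub stands in for a keyed ValueState plus the operator's key
// context; it is NOT a Flink API.
public class WriteBackCacheSketch {

    // Stand-in for keyed ValueState: value() and update() act on the current key.
    static final class KeyedStateStub<K, V> {
        final Map<K, V> backend = new HashMap<>();
        K currentKey;

        void setCurrentKey(K key) { currentKey = key; }
        K getCurrentKey() { return currentKey; }
        V value() { return backend.get(currentKey); }
        void update(V value) { backend.put(currentKey, value); }
    }

    // Stand-in for the expensive, non-serializable per-key object.
    static final class Heavy {
        long count;
        Heavy(long count) { this.count = count; }
        // In the real use case these conversions are the expensive part.
        long toSerializable() { return count; }
        static Heavy fromSerializable(Long stored) { return new Heavy(stored == null ? 0L : stored); }
    }

    final KeyedStateStub<String, Long> state = new KeyedStateStub<>();
    // Would be a transient field in a real function; rebuilt lazily after a restore.
    final Map<String, Heavy> cache = new HashMap<>();

    // Mirrors processElement(): the runtime sets the current key before the call.
    void processElement(String key) {
        state.setCurrentKey(key);
        Heavy h = cache.computeIfAbsent(key, k -> Heavy.fromSerializable(state.value()));
        h.count++;
    }

    // Mirrors snapshotState(): switch the current key for every cached entry,
    // write its serializable form, then restore whatever key was set before.
    void snapshotState() {
        String previous = state.getCurrentKey();
        for (Map.Entry<String, Heavy> e : cache.entrySet()) {
            state.setCurrentKey(e.getKey());
            state.update(e.getValue().toSerializable());
        }
        state.setCurrentKey(previous);
    }

    public static void main(String[] args) {
        WriteBackCacheSketch f = new WriteBackCacheSketch();
        f.processElement("a");
        f.processElement("b");
        f.processElement("a");
        System.out.println("before snapshot: " + f.state.backend);
        f.snapshotState();
        System.out.println("after snapshot:  " + f.state.backend);
    }
}
```

Flushing only in snapshotState() keeps the expensive conversion off the per-record path. The cache itself stays in memory, and after a restore it is rebuilt lazily from state on the first record for each key.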

Otherwise, IIUC, there's no public interface to get the keyContext. You could
try something hacky, though: take the `Context` instance passed to
processElement() and use Java reflection to get the KeyedProcessOperator
instance, on which you can call setCurrentKey().
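
The following is a rough, Flink-free illustration of the reflection hack. It assumes that the `Context` object passed to processElement() is an instance of a non-static inner class of the operator. That would give it a hidden reference to the operator, which implements a key-context interface with setCurrentKey(). `FakeOperator`, `ContextImpl`, and `findKeyContext` are hypothetical. In real Flink these are internal classes whose layout may change between versions, so check against the version you run.

```java
import java.lang.reflect.Field;

// Hypothetical, Flink-free sketch of the reflection hack. KeyContext mirrors the
// shape of a key-context interface with setCurrentKey/getCurrentKey; FakeOperator
// and its ContextImpl are stand-ins for the operator and the Context it passes
// to processElement(). Real Flink internals may differ and may change.
public class ReflectionKeyContextSketch {

    interface KeyContext {
        void setCurrentKey(Object key);
        Object getCurrentKey();
    }

    static class FakeOperator implements KeyContext {
        private Object currentKey;

        @Override public void setCurrentKey(Object key) { currentKey = key; }
        @Override public Object getCurrentKey() { return currentKey; }

        // Non-static inner class: because its body reads the operator's field,
        // javac keeps a hidden reference to the enclosing FakeOperator.
        class ContextImpl {
            Object currentKeyView() { return currentKey; }
        }

        ContextImpl newContext() { return new ContextImpl(); }
    }

    // Walk the context's class hierarchy and return the first field whose
    // declared type is a KeyContext (here, the hidden enclosing-instance field).
    static KeyContext findKeyContext(Object ctx) {
        for (Class<?> c = ctx.getClass(); c != null; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (KeyContext.class.isAssignableFrom(f.getType())) {
                    try {
                        f.setAccessible(true);
                        return (KeyContext) f.get(ctx);
                    } catch (IllegalAccessException e) {
                        throw new IllegalStateException("cannot read " + f, e);
                    }
                }
            }
        }
        throw new IllegalStateException("no KeyContext field in " + ctx.getClass());
    }

    public static void main(String[] args) {
        FakeOperator op = new FakeOperator();
        Object ctx = op.newContext();
        KeyContext kc = findKeyContext(ctx);
        kc.setCurrentKey("user-42");
        System.out.println(op.getCurrentKey()); // prints user-42
    }
}
```

Matching on the field's type rather than on a synthetic name such as `this$0` is a deliberate choice, since compiler-generated names are not guaranteed.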


Best,
Zakelly

On Wed, Feb 21, 2024 at 1:05 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
wrote:

> Thanks Zakelly,
>
> I'd need to do something similar, with a map containing my
> non-serializable "state", similar to the kvCache in FastTop1Function.
>
> But I am not sure I understand how I can set the keyed state for a
> specific key, in snapshotState().
> FastTop1Function seems to rely on keyContext set via setKeyContext(). This
> method is not part of the API. I see it's set specifically for
> AbstractTopNFunction in StreamExecRank.
> How can I do something similar without modifying the Flink runtime?
>
> Lorenzo
>
>
> On Sun, 18 Feb 2024 at 03:42, Zakelly Lan <zakelly....@gmail.com> wrote:
>
>> Hi Lorenzo,
>>
>> It is not recommended to do this with keyed state. However, there is
>> an example in the Flink code (FastTop1Function#snapshotState) [1] of
>> setting the current key in snapshotState().
>>
>> Hope this helps.
>>
>> [1]
>> https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/rank/FastTop1Function.java#L165
>>
>> Best,
>> Zakelly
>>
>> On Sat, Feb 17, 2024 at 1:48 AM Lorenzo Nicora <lorenzo.nic...@gmail.com>
>> wrote:
>>
>>> Hi Thias
>>>
>>> I considered CheckpointedFunction.
>>> In snapshotState() I would have to update the state of each key,
>>> extracting the in-memory "state" of each key and putting it in the state
>>> with state.update(...).
>>> This must happen per key, but snapshotState() has no visibility of the
>>> keys, and I have no way of selectively accessing the state of a specific
>>> key to update it.
>>> Unless I am missing something.
>>>
>>> Thanks
>>> Lorenzo
>>>
>>>
>>> On Fri, 16 Feb 2024 at 07:21, Schwalbe Matthias <
>>> matthias.schwa...@viseca.ch> wrote:
>>>
>>>> Good morning Lorenzo,
>>>>
>>>>
>>>>
>>>> You may want to implement
>>>> org.apache.flink.streaming.api.checkpoint.CheckpointedFunction interface in
>>>> your KeyedProcessFunction.
>>>>
>>>> Btw, by the time initializeState(…) is called, the state backend is
>>>> fully initialized and can be read and written to (which is not the case
>>>> when open(…) is called).
>>>>
>>>> In initializeState(…) you also get access to the state of different
>>>> operator keys.
>>>>
>>>> snapshotState(…) is called as part of each checkpoint in order to
>>>> store data.
>>>>
>>>>
>>>>
>>>> Sincere greetings
>>>>
>>>>
>>>>
>>>> Thias
>>>>
>>>>
>>>>
>>>> *From:* Lorenzo Nicora <lorenzo.nic...@gmail.com>
>>>> *Sent:* Thursday, February 15, 2024 7:50 PM
>>>> *To:* Flink User Group <user@flink.apache.org>
>>>> *Subject:* Preparing keyed state before snapshot
>>>>
>>>>
>>>>
>>>> Hello everyone,
>>>>
>>>>
>>>>
>>>> I have a convoluted problem.
>>>>
>>>>
>>>>
>>>> I am implementing a KeyedProcessFunction that keeps some
>>>> non-serializable "state" in memory, in a transient Map (key = stream key,
>>>> value = the non-serializable "state").
>>>>
>>>>
>>>>
>>>> I can extract a serializable representation to put in Flink state, and
>>>> I can load my in-memory "state" from the Flink state. But these operations
>>>> are expensive.
>>>>
>>>>
>>>>
>>>> Initializing the in-memory "state" is relatively easy. I do it lazily,
>>>> in processElement(), on the first record for the key.
>>>>
>>>>
>>>>
>>>> The problem is saving the in-memory "state" to Flink state.
>>>>
>>>> I need to do it only before the state snapshot. But
>>>> KeyedProcessFunction has no entry point called before the state snapshot.
>>>>
>>>> I cannot use CheckpointedFunction.snapshotState(), because it does not
>>>> work for keyed state.
>>>>
>>>>
>>>>
>>>> Any suggestions?
>>>>
>>>>
>>>>
>>>> Note that I can use neither operator state nor broadcast state.
>>>>
>>>> Processing is keyed. Every processed record modifies the in-memory
>>>> "state" of that key. If the job rescales, the state of each key must
>>>> follow the partition.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Regards
>>>>
>>>> Lorenzo
>>>>
>>>> This message is intended only for the named recipient and may contain
>>>> confidential or privileged information. As the confidentiality of email
>>>> communication cannot be guaranteed, we do not accept any responsibility for
>>>> the confidentiality and the intactness of this message. If you have
>>>> received it in error, please advise the sender by return e-mail and delete
>>>> this message and any attachments. Any unauthorised use or dissemination of
>>>> this information is strictly prohibited.
>>>>
>>>
