My apologies Matthias,
you are right. The issue was that I was trying to access the state value from
the open/init methods, where there is no key context.
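
To make the "no key context" point concrete, here is a toy model in plain Java. It is not Flink code and does not reflect Flink's internals: `ToyKeyedState`, `setCurrentKey`, `countEvent`, and the exception type and message are all hypothetical names made up for this sketch. It only shows the idea that creating a state handle needs no key, while reading or updating the value does.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only: this is NOT Flink code. All names here are hypothetical.
class KeyContextSketch {

    /** A per-key value holder that, like keyed state, needs a "current key" to be set. */
    static class ToyKeyedState<K, V> {
        private final Map<K, V> valuesByKey = new HashMap<>();
        private K currentKey; // null models "no key context", e.g. during open/init

        void setCurrentKey(K key) {
            currentKey = key;
        }

        V value() {
            requireKey();
            return valuesByKey.get(currentKey);
        }

        void update(V newValue) {
            requireKey();
            valuesByKey.put(currentKey, newValue);
        }

        private void requireKey() {
            if (currentKey == null) {
                throw new IllegalStateException("No key context set");
            }
        }
    }

    /** Counts one event for the given key, the way a per-record method would. */
    static int countEvent(ToyKeyedState<String, Integer> state, String key) {
        state.setCurrentKey(key); // in this model, the "runtime" sets the key before each record
        Integer current = state.value();
        int updated = (current == null) ? 1 : current + 1;
        state.update(updated);
        state.setCurrentKey(null); // leave the key context again
        return updated;
    }

    public static void main(String[] args) {
        // Creating the handle needs no key, which models the init phase.
        ToyKeyedState<String, Integer> state = new ToyKeyedState<>();
        try {
            state.value(); // reading without a key context fails
        } catch (IllegalStateException e) {
            System.out.println("init phase: " + e.getMessage());
        }
        System.out.println(countEvent(state, "a")); // prints 1
        System.out.println(countEvent(state, "a")); // prints 2
        System.out.println(countEvent(state, "b")); // prints 1
    }
}
```

In this model, obtaining the handle corresponds to what open()/initializeState() may do, and `countEvent` corresponds to per-record processing where a key is available.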

Regarding the CheckpointedFunction interface: from the Javadoc example and
description I got the impression that it can be used to access keyed state
on a keyed stream.
Quote "The OperatorStateStore and KeyedStateStore give access to the data
structures in which state should be stored for Flink to transparently
manage and checkpoint it,"

This is how I believe it should be done:
  public void initializeState(FunctionInitializationContext context) throws Exception {
    keyCounterState = context.getKeyedStateStore()
        .getState(new ValueStateDescriptor<>("keyCounter", Integer.class));
  }

So I use `context.getKeyedStateStore()`, to which I should have access
since my stream is keyed.
I have updated my code [1] and run the toy job, and it now works. I think
that did the trick. WDYT?

[1]
https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/ae449b6b4b054f64f0828343456d814c3aaba962/src/main/java/org/example/KeyedCounter2.java#L19C5-L19C51

On Thu, 7 Sep 2023 at 10:40, Schwalbe Matthias <matthias.schwa...@viseca.ch>
wrote:

> Hi Krzysztof again,
>
>
>
> Just for clarity … your sample code [1] tries to count the number of
> events per key.
>
> Assuming this is your intention?
>
>
>
> Anyway, your previous implementation initialized the keyed state
> keyCounterState in the open() function, which is the right place to do this;
>
> you just wouldn’t want to store values in the state from within the open()
> function.
>
>
>
> initializeState() and snapshotState() are mainly used to initialize
> operator state, not keyed state … refer to the relevant documentation.
>
>
>
>
>
> Thias
>
>
>
>
>
> *From:* Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
> *Sent:* Thursday, 7 September 2023 09:59
> *To:* user <user@flink.apache.org>
> *Subject:* using CheckpointedFunction on a keyed state
>
>
>
>
> Hi,
> I have a toy Flink job [1] where I have a KeyedProcessFunction
> implementation [2] that also implements the CheckpointedFunction. My stream
> definition has .keyBy(...) call as you can see in [1].
>
> However when I'm trying to run this toy job I'm getting an exception
> from CheckpointedFunction::initializeState method that says:
> "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()'
> operation."
>
> I got the impression from the docs that CheckpointedFunction can be used
> on a keyed stream and that CheckpointedFunction::initializeState is for
> initializing the state object.
> Are my assumptions wrong? Is initializeState only meant to set an initial
> value of a state per key, while the state object must be initialized in the
> open() method?
>
>
> Thanks,
> Krzysztof
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java
>
> [2]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java
