Hi Krzysztof

Can you share the original code of the initializeState method and the
corresponding exception stack trace? It looks rather interesting.


Best,

Ron


Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote on Sat, Sep 9, 2023 at 07:12:

> My apologies Matthias,
> you are right. The issue was that I was trying to access the state value from
> the open/init methods, where there was no key context.
>
> Regarding the CheckpointedFunction interface: from the Javadoc example and
> description I got the impression that it can be used to access keyed state
> on a keyed stream.
> Quote "The OperatorStateStore and KeyedStateStore give access to the data
> structures in which state should be stored for Flink to transparently
> manage and checkpoint it,"
>
> This is how I believe I should do it:
>   public void initializeState(FunctionInitializationContext context) throws Exception {
>     keyCounterState = context.getKeyedStateStore()
>         .getState(new ValueStateDescriptor<>("keyCounter", Integer.class));
>   }
>
> So I use `context.getKeyedStateStore()`, which I should have access to
> since my stream is keyed.
> I have updated my code [1] and run the toy job, which now works. I think
> that did the trick. WDYT?
>
> [1]
> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/ae449b6b4b054f64f0828343456d814c3aaba962/src/main/java/org/example/KeyedCounter2.java#L19C5-L19C51
>
> On Thu, 7 Sep 2023 at 10:40, Schwalbe Matthias <matthias.schwa...@viseca.ch>
> wrote:
>
>> Hi Krzysztof again,
>>
>>
>>
>> Just for clarity … your sample code [1] tries to count the number of
>> events per key.
>>
>> Assuming this is your intention?
>>
>>
>>
>> Anyway, your previous implementation initialized the keyed state
>> keyCounterState in the open() function, which is the right place to do this,
>>
>> you just wouldn’t want to store values in the state from within the
>> open() function.
>>
>>
>>
>> initializeState() and snapshotState() are mainly used to initialize
>> operator state, not keyed state … refer to the relevant documentation.
>>
>>
>>
>>
>>
>> Thias
>>
>>
>>
>>
>>
>> *From:* Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com>
>> *Sent:* Thursday, 7 September 2023 09:59
>> *To:* user <user@flink.apache.org>
>> *Subject:* using CheckpointedFunction on a keyed state
>>
>>
>>
>>
>>
>>
>> Hi,
>> I have a toy Flink job [1] with a KeyedProcessFunction
>> implementation [2] that also implements CheckpointedFunction. My stream
>> definition has a .keyBy(...) call, as you can see in [1].
>>
>> However, when I try to run this toy job I get an exception
>> from the CheckpointedFunction::initializeState method that says:
>> "Keyed state can only be used on a 'keyed stream', i.e., after a
>> 'keyBy()' operation."
>>
>> I got the impression from the docs that CheckpointedFunction can be used
>> on a keyed stream and that CheckpointedFunction::initializeState is for
>> initializing the state object.
>> Are my assumptions wrong? Is initializeState only meant to set an initial
>> value of a state per key, while the state object must be initialized in the
>> open() method?
>>
>>
>> Thanks,
>> Krzysztof
>>
>> [1]
>> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java
>>
>> [2]
>> https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java
>>
>
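The key-context point discussed in this thread (a keyed state handle can be obtained in open() or initializeState(), but its value can only be read or written while a record's key is set) can be modeled with a small toy sketch. This is not Flink code: ToyKeyedBackend, ToyValueState and KeyContextSketch are hypothetical names invented for illustration, and the error message is made up. It only mimics the general idea that keyed state is scoped to a "current key" managed by the runtime.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a keyed state backend; NOT Flink's API.
class ToyKeyedBackend {
    private String currentKey; // set by the "runtime" for each record
    private final Map<String, Map<String, Integer>> tables = new HashMap<>();

    void setCurrentKey(String key) {
        currentKey = key;
    }

    // Creating a handle needs no key (think: getState(...) in open()/initializeState()).
    ToyValueState getState(String name) {
        return new ToyValueState(this, tables.computeIfAbsent(name, n -> new HashMap<>()));
    }

    String requireKey() {
        if (currentKey == null) {
            throw new IllegalStateException("no current key: not inside a keyed context");
        }
        return currentKey;
    }
}

// Hypothetical per-key value handle; every access is scoped to the current key.
class ToyValueState {
    private final ToyKeyedBackend backend;
    private final Map<String, Integer> perKey;

    ToyValueState(ToyKeyedBackend backend, Map<String, Integer> perKey) {
        this.backend = backend;
        this.perKey = perKey;
    }

    Integer value() {
        return perKey.get(backend.requireKey());
    }

    void update(Integer v) {
        perKey.put(backend.requireKey(), v);
    }
}

class KeyContextSketch {
    // Mimics per-record processing: the key is set only for the duration of the call.
    static int processElement(ToyKeyedBackend backend, ToyValueState counter, String key) {
        backend.setCurrentKey(key);
        try {
            Integer current = counter.value();
            int next = (current == null) ? 1 : current + 1;
            counter.update(next);
            return next;
        } finally {
            backend.setCurrentKey(null);
        }
    }

    public static void main(String[] args) {
        ToyKeyedBackend backend = new ToyKeyedBackend();
        ToyValueState counter = backend.getState("keyCounter"); // fine: no key needed

        try {
            counter.value(); // fails: no record is being processed, so no key is set
        } catch (IllegalStateException e) {
            System.out.println("outside key context: " + e.getMessage());
        }

        System.out.println(processElement(backend, counter, "a")); // 1
        System.out.println(processElement(backend, counter, "a")); // 2
        System.out.println(processElement(backend, counter, "b")); // 1
    }
}
```

In this model, obtaining the handle during initialization succeeds, while reading or updating the value works only inside processElement, which matches the behaviour Krzysztof describes after moving the value access out of the open/init methods.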
