Thanks, that helped. Regards, Krzysztof Chmielewski
czw., 7 wrz 2023 o 09:52 Schwalbe Matthias <matthias.schwa...@viseca.ch> napisał(a): > Hi Krzysztof, > > > > You cannot access keyed state in open(). > > Keyed state has a value per key. > > In theory you would have to initialize per possible key, which is quite > impractical. > > However you don’t need to initialize state, the initial state per key > default to the default value of the type (null for objects). > > Just drop the initializer [1] > > > > Hope this helps > > > > Thias > > > > > > [1] > https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/033f74c427553fbfe0aaffe7d2af4382c09734ad/src/main/java/org/example/KeyCounter.java#L26 > > > > > > > > > > *From:* Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> > *Sent:* Donnerstag, 7. September 2023 09:38 > *To:* user <user@flink.apache.org> > *Subject:* updating keyed state in open method. > > > > ⚠*EXTERNAL MESSAGE – **CAUTION: Think Before You Click *⚠ > > > > Hi, > I'm having a problem with my toy flink job where I would like to access a > ValueState of a keyed stream. The Job setup can be found here [1], it is > fairly simple > > env > .addSource(new CheckpointCountingSource(100, 60)) > .keyBy(value -> value) > .process(new KeyCounter()) > .addSink(new ConsoleSink()); > > > As you can see I'm using a keyBay and KeyCounter is > extending KeyedProcessFunction. > It seems that keyed state cannot be update from RichFunction::open() > method. Is that intended? > > When I ran this example I have an exception that says: > > > Caused by: java.lang.NullPointerException: No key set. This method should > not be called outside of a keyed context. > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) > at > org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270) > at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260) > at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143) > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72) > at org.example.KeyCounter.open(KeyCounter.java:26) > > > [1] > https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java > Diese Nachricht ist ausschliesslich für den Adressaten bestimmt und > beinhaltet unter Umständen vertrauliche Mitteilungen. Da die > Vertraulichkeit von e-Mail-Nachrichten nicht gewährleistet werden kann, > übernehmen wir keine Haftung für die Gewährung der Vertraulichkeit und > Unversehrtheit dieser Mitteilung. Bei irrtümlicher Zustellung bitten wir > Sie um Benachrichtigung per e-Mail und um Löschung dieser Nachricht sowie > eventueller Anhänge. Jegliche unberechtigte Verwendung oder Verbreitung > dieser Informationen ist streng verboten. > > This message is intended only for the named recipient and may contain > confidential or privileged information. As the confidentiality of email > communication cannot be guaranteed, we do not accept any responsibility for > the confidentiality and the intactness of this message. If you have > received it in error, please advise the sender by return e-mail and delete > this message and any attachments. Any unauthorised use or dissemination of > this information is strictly prohibited. >