Hi, You cannot access the keyed state within #open(). It can only be accessed under a keyed context ( a key is selected while processing an element, e.g. #processElement).
Best, Zakelly On Thu, Sep 7, 2023 at 4:55 PM Krzysztof Chmielewski <krzysiek.chmielew...@gmail.com> wrote: > > Hi, > I'm having a problem with my toy flink job where I would like to access a > ValueState of a keyed stream. The Job setup can be found here [1], it is > fairly simple > > env > .addSource(new CheckpointCountingSource(100, 60)) > .keyBy(value -> value) > .process(new KeyCounter()) > .addSink(new ConsoleSink()); > > As you can see I'm using a keyBay and KeyCounter is extending > KeyedProcessFunction. > It seems that keyed state cannot be update from RichFunction::open() method. > Is that intended? > > When I ran this example I have an exception that says: > > Caused by: java.lang.NullPointerException: No key set. This method should not > be called outside of a keyed context. > at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76) > at > org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270) > at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260) > at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143) > at > org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72) > at org.example.KeyCounter.open(KeyCounter.java:26) > > > [1] > https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java