Hi, I'm having a problem with my toy Flink job, in which I would like to access a ValueState of a keyed stream. The job setup can be found here [1]; it is fairly simple:

    env
        .addSource(new CheckpointCountingSource(100, 60))
        .keyBy(value -> value)
        .process(new KeyCounter())
        .addSink(new ConsoleSink());

As you can see, I'm using keyBy, and KeyCounter extends KeyedProcessFunction.

It seems that keyed state cannot be updated from the RichFunction::open() method. Is that intended?

When I run this example, I get the following exception:

    Caused by: java.lang.NullPointerException: No key set. This method should not be called outside of a keyed context.
        at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:76)
        at org.apache.flink.runtime.state.heap.StateTable.checkKeyNamespacePreconditions(StateTable.java:270)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:260)
        at org.apache.flink.runtime.state.heap.StateTable.get(StateTable.java:143)
        at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:72)
        at org.example.KeyCounter.open(KeyCounter.java:26)

[1] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/KeyBayIssue/src/main/java/org/example/DataStreamJob.java
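
To illustrate what the "No key set" message appears to be pointing at, here is a toy model in plain Java. It is not Flink code and does not use any Flink API: ToyKeyedState, ToyKeyCounter and KeyedContextDemo are names made up for this sketch. The assumption it encodes is that keyed state is scoped to the key of the record currently being processed, so a read or update only makes sense once a current key exists, which is not yet the case when open() runs.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a keyed ValueState (hypothetical, NOT Flink's implementation).
final class ToyKeyedState<K, V> {
    private final Map<K, V> table = new HashMap<>();
    private K currentKey; // null means "outside of a keyed context"

    // In this model the runtime calls this before handing a record to user code.
    void setCurrentKey(K key) {
        currentKey = key;
    }

    // Reads the value stored under the current key (null if none was stored yet).
    V value() {
        requireKey();
        return table.get(currentKey);
    }

    // Stores a value under the current key.
    void update(V newValue) {
        requireKey();
        table.put(currentKey, newValue);
    }

    private void requireKey() {
        if (currentKey == null) {
            throw new NullPointerException("No key set.");
        }
    }
}

// Toy counterpart of a keyed process function (hypothetical).
final class ToyKeyCounter {
    private ToyKeyedState<String, Long> count;

    // Plays the role of open(): keeping a handle to the state is fine here,
    // but reading or updating it is not, because no key has been set yet.
    void open(ToyKeyedState<String, Long> state) {
        count = state;
    }

    // Plays the role of processElement(): the current key is already set.
    long processElement() {
        Long seen = count.value();
        long next = (seen == null) ? 1L : seen + 1L;
        count.update(next);
        return next;
    }
}

final class KeyedContextDemo {
    public static void main(String[] args) {
        ToyKeyedState<String, Long> state = new ToyKeyedState<>();
        ToyKeyCounter counter = new ToyKeyCounter();
        counter.open(state);

        try {
            state.value(); // what touching the state inside open() amounts to
        } catch (NullPointerException e) {
            System.out.println("open(): " + e.getMessage());
        }

        for (String key : new String[] {"a", "b", "a"}) {
            state.setCurrentKey(key); // the toy runtime sets the key per record
            System.out.println(key + " -> " + counter.processElement());
        }
    }
}
```

In this model the counter keeps a separate count per key, and the only failing call is the one made before any key was set. If the real runtime behaves the same way, moving the value()/update() calls out of open() and into processElement() should avoid the exception, while open() only obtains the state handle.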