Hi,

I have a toy Flink job [1] with a KeyedProcessFunction implementation [2] that also implements CheckpointedFunction. My stream definition has a .keyBy(...) call, as you can see in [1].
However, when I try to run this toy job, I get an exception from the CheckpointedFunction::initializeState method that says: "Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation."

I got the impression from the docs that CheckpointedFunction can be used on a keyed stream and that CheckpointedFunction::initializeState is for initializing the state object. Are my assumptions wrong? Is initializeState only for setting an initial value of a state per key, and must the state object be initialized in the open() method?

Thanks,
Krzysztof

[1] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/DataStreamJob.java
[2] https://github.com/kristoffSC/FlinkSimpleStreamingJob/blob/CheckpointedFunction_issueKeyedStream/src/main/java/org/example/KeyCounter.java