Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r204336316 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java --- @@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except for (KeyRangeStates keyRange : snapshotKeyRanges.get()) { keyRanges.add(keyRange); } + + // let event time start from the max of all event time progress across subtasks in the last execution + for (Long lastEventTime : lastEventTimes.get()) { + monotonousEventTime = Math.max(monotonousEventTime, lastEventTime); --- End diff -- Or, you really need to track the watermark per key-group partition.
---