Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6351#discussion_r204336026

    --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java ---
    @@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
     			for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
     				keyRanges.add(keyRange);
     			}
    +
    +			// let event time start from the max of all event time progress across subtasks in the last execution
    +			for (Long lastEventTime : lastEventTimes.get()) {
    +				monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
    --- End diff --

    I wonder why we compute the event time as the max and not as the min, as we would usually do for a combined watermark. This is probably never rescaled anyways, but still it looks a bit suspicious.
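
To make the max-versus-min question concrete, here is a minimal sketch of the two ways the restored per-subtask event times could be combined. It is not the code from the PR: the class `RestoredEventTime` and the methods `combineMax` and `combineMin` are hypothetical names, and the sample values are made up. Only the `Math.max` loop mirrors the diff.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper, not part of the PR: two ways to combine restored event times. */
public class RestoredEventTime {

    /** Mirrors the loop in the diff: take the largest event time of any previous subtask. */
    static long combineMax(long initial, Iterable<Long> lastEventTimes) {
        long result = initial;
        for (Long t : lastEventTimes) {
            result = Math.max(result, t);
        }
        return result;
    }

    /** Watermark-style alternative: take the smallest restored event time. */
    static long combineMin(long fallback, Iterable<Long> lastEventTimes) {
        long result = Long.MAX_VALUE;
        boolean seen = false;
        for (Long t : lastEventTimes) {
            result = Math.min(result, t);
            seen = true;
        }
        // With no restored values (fresh start), fall back to the given default.
        return seen ? result : fallback;
    }

    public static void main(String[] args) {
        // Made-up event times restored from three subtasks of the previous execution.
        List<Long> restored = Arrays.asList(100L, 250L, 180L);
        System.out.println("max: " + combineMax(0L, restored)); // prints "max: 250"
        System.out.println("min: " + combineMin(0L, restored)); // prints "min: 100"
    }
}
```

With these sample values, the max variant resumes at 250, so no subtask emits an event time below what any subtask emitted before the restore. The min variant resumes at 100, which matches how a combined watermark is derived but could re-emit timestamps that other subtasks had already passed. Which behaviour is wanted depends on what the test expects from the source.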
---