Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6351#discussion_r204342877 --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java --- @@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except for (KeyRangeStates keyRange : snapshotKeyRanges.get()) { keyRanges.add(keyRange); } + + // let event time start from the max of all event time progress across subtasks in the last execution + for (Long lastEventTime : lastEventTimes.get()) { + monotonousEventTime = Math.max(monotonousEventTime, lastEventTime); --- End diff -- actually, watermarks are not the direct concern here. What this piece of change is doing is just to ensure that all subtasks start from an event time that is guaranteed to have not "jump back" in time. Watermark extraction is not done within the source.
---