Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6351#discussion_r204342877
  
    --- Diff: flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java ---
    @@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
                        for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
                                keyRanges.add(keyRange);
                        }
    +
    +                   // let event time start from the max of all event time progress across subtasks in the last execution
    +                   for (Long lastEventTime : lastEventTimes.get()) {
    +                           monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
    --- End diff --
    
    Actually, watermarks are not the direct concern here.
    This change just ensures that all subtasks start from an event time that is
    guaranteed not to "jump back" in time relative to the previous execution.
    
    Watermark extraction is not done within the source.
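
To illustrate the point, here is a minimal sketch of the restore step in isolation. It assumes the restored state is visible as an `Iterable<Long>` holding the last event time of every subtask from the previous execution; the class and method names (`EventTimeRestoreSketch`, `restoreEventTime`) are hypothetical and not part of the PR, and no Flink API is used.

```java
// Hypothetical stand-in for the restore logic in the diff above;
// this is not the actual SequenceGeneratorSource code.
final class EventTimeRestoreSketch {

    private EventTimeRestoreSketch() {}

    /**
     * Returns the event time a restored subtask should resume from:
     * the maximum of its initial value and every event time recorded
     * by any subtask in the previous execution.
     */
    static long restoreEventTime(long initialEventTime, Iterable<Long> lastEventTimes) {
        long monotonousEventTime = initialEventTime;
        for (Long lastEventTime : lastEventTimes) {
            // Never move below a time that some subtask already emitted.
            monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
        }
        return monotonousEventTime;
    }
}
```

Because every subtask takes the same global maximum, no subtask can resume from a time earlier than one that was already emitted before the restore, regardless of how state is redistributed. Watermarks would still be derived downstream of the source, as noted above.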

