1996fanrui commented on code in PR #22852: URL: https://github.com/apache/flink/pull/22852#discussion_r1243183753
########## flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java: ########## @@ -642,7 +643,32 @@ private static class WatermarkAggregator<T> { * Optional.empty()} otherwise. */ public Optional<Watermark> aggregate(T key, Watermark watermark) { - watermarks.put(key, watermark); + Watermark oldWatermark = watermarks.put(key, watermark); + // Step (1): Update the latest watermark of current key as the aggregatedWatermark + // directly if it is less than the aggregatedWatermark. + if (watermark.getTimestamp() < aggregatedWatermark.getTimestamp()) { + aggregatedWatermark = watermark; + return Optional.of(aggregatedWatermark); + } + + // Step(2): The aggWM won't change when these conditions are met, so return directly: + // case1. The latest WM of the current key isn't changed + // case2. When oldWatermark isn't null and is greater than aggWm, it means that aggWm + // comes from other keys. If new WM is greater than or equal to aggWm, then aggWm must + // not change. + // case3. When oldWatermark is null and {@link watermarks} has other keys, it means that + // aggWm comes from other keys. If new WM is greater than or equal to aggWm, then aggWm + // must not change. + // Note: step(1) have returned when `watermark < aggregatedWatermark`, so all calls Review Comment: Thanks for the review and good suggestion! Your suggestion is more concise, updated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org