Github user hzyuemeng1 commented on a diff in the pull request: https://github.com/apache/flink/pull/5917#discussion_r187887606 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java --- @@ -60,7 +60,12 @@ public boolean canMerge() { @Override public void onMerge(TimeWindow window, OnMergeContext ctx) { - ctx.registerProcessingTimeTimer(window.maxTimestamp()); + //only current wartermark less than a merge window maxtimestamp that we regsiter a new timer for fire + //otherwise if will fired immediately by call onElement + long windowMaxTimestamp = window.maxTimestamp(); + if (windowMaxTimestamp > ctx.getCurrentWatermark()) { --- End diff -- thanks for your review and merge
---