Sxnan commented on code in PR #23521: URL: https://github.com/apache/flink/pull/23521#discussion_r1384472347
########## flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java: ########## @@ -569,6 +571,12 @@ public void handleOperatorEvent(OperatorEvent event) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof NoMoreSplitsEvent) { sourceReader.notifyNoMoreSplits(); + } else if (event instanceof IsBacklogEvent) { + eventTimeLogic.triggerPeriodicEmit(System.currentTimeMillis()); Review Comment: We should invoke this method to send the watermark downstream in both cases. In case of event.isBacklog() == false, the downstream need to update the max watermark during backlog. We cannot cancel the periodicEmit. Otherwise, operators that do not support backlog processing, such as multi-input operators, will not work. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org