Sxnan commented on code in PR #23521:
URL: https://github.com/apache/flink/pull/23521#discussion_r1384472347


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java:
##########
@@ -569,6 +571,12 @@ public void handleOperatorEvent(OperatorEvent event) {
             sourceReader.handleSourceEvents(((SourceEventWrapper) 
event).getSourceEvent());
         } else if (event instanceof NoMoreSplitsEvent) {
             sourceReader.notifyNoMoreSplits();
+        } else if (event instanceof IsBacklogEvent) {
+            eventTimeLogic.triggerPeriodicEmit(System.currentTimeMillis());

Review Comment:
   We should invoke this method to send the watermark downstream in both cases. 
In case of event.isBacklog() == false, the downstream need to update the max 
watermark during backlog.
   
   We cannot cancel the periodicEmit. Otherwise, operators that do not support 
backlog processing, such as multi-input operators, will not work.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to