yunfengzhou-hub commented on code in PR #22931:
URL: https://github.com/apache/flink/pull/22931#discussion_r1271940611


##########
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java:
##########
@@ -426,6 +446,36 @@ public boolean isShutdown() {
         return shutdown;
     }
 
+    /**
+     * Reports whether a source operator is currently processing backlog.
+     *
+     * <p>If any source operator is processing backlog, the checkpoint interval would be decided by
+     * {@code execution.checkpointing.interval-during-backlog} instead of {@code
+     * execution.checkpointing.interval}.
+     *
+     * <p>If a source has not invoked this method, the source is considered to have
+     * isProcessingBacklog=false. If a source operator has invoked this method multiple times, the
+     * last reported value is used.
+     *
+     * @param operatorID the operator ID of the source operator.
+     * @param isProcessingBacklog whether the source operator is processing backlog.
+     */
+    public void setIsProcessingBacklog(OperatorID operatorID, boolean isProcessingBacklog) {
+        if (isProcessingBacklog) {
+            backlogOperators.add(operatorID);
+        } else {
+            backlogOperators.remove(operatorID);
+        }
+
+        if (getCurrentCheckpointInterval() != Long.MAX_VALUE) {
+            long newNextCheckpointTriggeringTime =
+                    clock.absoluteTimeMillis() + getCurrentCheckpointInterval();
+            if (newNextCheckpointTriggeringTime < nextCheckpointTriggeringRelativeTime) {
+                rescheduleTrigger(getCurrentCheckpointInterval());

Review Comment:
   Reusing `newNextCheckpointTriggeringTime` is not suitable here: we would then need to write `rescheduleTrigger(newNextCheckpointTriggeringTime - clock.relativeTimeMillis());`, and `clock.relativeTimeMillis()` might have changed between the two reads.
   
   I'll introduce a variable `long currentCheckpointInterval = getCurrentCheckpointInterval()` so that the interval is computed once and the clock does not have to be read a second time.
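
   For illustration, a minimal sketch of what caching the interval in a local variable could look like. This is not the actual `CheckpointCoordinator` code: the class `BacklogRescheduleSketch`, the `String` operator IDs, the injected `LongSupplier` clock, and the fields `nextTriggerTime` and `lastRescheduleDelay` are all hypothetical stand-ins, and the real `rescheduleTrigger` may behave differently.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.LongSupplier;

/** Hypothetical, simplified stand-in for the coordinator logic; not Flink's actual class. */
final class BacklogRescheduleSketch {
    private final Set<String> backlogOperators = new HashSet<>();
    private final LongSupplier relativeClock; // assumption: stands in for clock.relativeTimeMillis()
    private final long baseInterval;          // assumption: execution.checkpointing.interval
    private final long backlogInterval;       // assumption: ...interval-during-backlog

    long nextTriggerTime;          // relative time of the next scheduled trigger
    long lastRescheduleDelay = -1; // delay passed to the last reschedule, -1 if none yet

    BacklogRescheduleSketch(
            LongSupplier relativeClock,
            long baseInterval,
            long backlogInterval,
            long initialNextTriggerTime) {
        this.relativeClock = relativeClock;
        this.baseInterval = baseInterval;
        this.backlogInterval = backlogInterval;
        this.nextTriggerTime = initialNextTriggerTime;
    }

    long getCurrentCheckpointInterval() {
        return backlogOperators.isEmpty() ? baseInterval : backlogInterval;
    }

    void setIsProcessingBacklog(String operatorId, boolean isProcessingBacklog) {
        if (isProcessingBacklog) {
            backlogOperators.add(operatorId);
        } else {
            backlogOperators.remove(operatorId);
        }

        // Compute the interval once and reuse it for both the comparison and the reschedule.
        long currentCheckpointInterval = getCurrentCheckpointInterval();
        if (currentCheckpointInterval != Long.MAX_VALUE) {
            // The clock is read exactly once; the delay passed on is the interval itself,
            // not "candidate - now", so no second timestamp is needed.
            long candidate = relativeClock.getAsLong() + currentCheckpointInterval;
            if (candidate < nextTriggerTime) {
                rescheduleTrigger(currentCheckpointInterval);
                nextTriggerTime = candidate;
            }
        }
    }

    // Simplified: only records the requested delay instead of scheduling a timer.
    private void rescheduleTrigger(long delayMillis) {
        lastRescheduleDelay = delayMillis;
    }
}
```

   In this sketch, a reschedule only happens when the newly computed trigger time is earlier than the one already scheduled, for example when the last backlog source reports `false` and the shorter regular interval applies again.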


