pnowojski commented on a change in pull request #11899:
URL: https://github.com/apache/flink/pull/11899#discussion_r417211460



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -144,16 +148,15 @@
         * enforce minimum processing time between checkpoint attempts */
        private final long minPauseBetweenCheckpoints;
 
-       /** The maximum number of checkpoints that may be in progress at the 
same time. */
-       private final int maxConcurrentCheckpointAttempts;
-
        /** The timer that handles the checkpoint timeouts and triggers 
periodic checkpoints.
         * It must be single-threaded. Eventually it will be replaced by main 
thread executor. */
        private final ScheduledExecutor timer;
 
        /** The master checkpoint hooks executed by this checkpoint 
coordinator. */
        private final HashMap<String, MasterTriggerRestoreHook<?>> masterHooks;
 
+       private final boolean isUnalignedCheckpointsEnabled;

Review comment:
       `areUnalignedCheckpointsEnabled`
   
   and the rename from `isUnalignedCheckpoint` should have been a separate 
commit :)

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java
##########
@@ -170,9 +198,87 @@ public String toString() {
                        ", maxConcurrentCheckpoints=" + 
maxConcurrentCheckpoints +
                        ", checkpointRetentionPolicy=" + 
checkpointRetentionPolicy +
                        ", isExactlyOnce=" + isExactlyOnce +
-                       ", isUnalignedCheckpoint=" + isUnalignedCheckpoint +
+                       ", isUnalignedCheckpoint=" + 
isUnalignedCheckpointsEnabled +
                        ", isPreferCheckpointForRecovery=" + 
isPreferCheckpointForRecovery +
                        ", tolerableCheckpointFailureNumber=" + 
tolerableCheckpointFailureNumber +
                        '}';
        }
+
+       public static CheckpointCoordinatorConfigurationBuilder builder() {
+               return new CheckpointCoordinatorConfigurationBuilder();
+       }
+
+       /**
+        * {@link CheckpointCoordinatorConfiguration} builder.
+        */
+       public static class CheckpointCoordinatorConfigurationBuilder {

Review comment:
       The introduction of this builder should also have been a separate commit.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java
##########
@@ -813,13 +809,11 @@ public void 
testStateRecoveryWithTopologyChange(TestScaleType scaleType) throws
                        
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION),
                        new TestCompletedCheckpointStorageLocation());
 
-               
when(standaloneCompletedCheckpointStore.getLatestCheckpoint(false)).thenReturn(completedCheckpoint);
-
                // set up the coordinator and validate the initial state
                CheckpointCoordinator coord =
                        new CheckpointCoordinatorBuilder()
                                .setTasks(newJobVertex1.getTaskVertices())
-                               
.setCompletedCheckpointStore(standaloneCompletedCheckpointStore)
+                               
.setCompletedCheckpointStore(CompletedCheckpointStore.storeFor(completedCheckpoint))

Review comment:
       This is a small change, but I also tend to "demockitofy" tests in a 
separate commit (when I find a test failing because of Mockito, I 
rebase/stash all of my changes, fix the test, and re-apply my changes).

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Function<Supplier<Optional<CheckpointTriggerRequest>>, 
Optional<CheckpointTriggerRequest>> sync;
+       private final PriorityQueue<CheckpointTriggerRequest> 
triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != 
r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;
+               } else {
+                       return Long.compare(r1.timestamp, r2.timestamp);
+               }
+       });
+
+       CheckpointRequestDecider(
+                       int maxConcurrentCheckpointAttempts,
+                       Consumer<Long> rescheduleTrigger,
+                       Clock clock,
+                       long minPauseBetweenCheckpoints,
+                       Supplier<Integer> pendingCheckpointsSizeSupplier,
+                       Function<Supplier<Optional<CheckpointTriggerRequest>>,
+                       Optional<CheckpointTriggerRequest>> sync) {
+               this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+               this.rescheduleTrigger = rescheduleTrigger;
+               this.clock = clock;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.pendingCheckpointsSizeSupplier = 
pendingCheckpointsSizeSupplier;
+               this.sync = sync;
+       }
+
+       Optional<CheckpointTriggerRequest> 
chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean 
isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(false, () -> newRequest, 
isTriggering, lastCompletionMs);
+       }
+
+       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean 
isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(true, triggerRequestQueue::peek, 
isTriggering, lastCompletionMs);
+       }
+
+       /**
+        * Choose the next {@link CheckpointTriggerRequest request} to execute 
based on the provided candidate and the
+        * current state. Acquires a lock and may update the state.
+        * @return request to execute, if any.
+        */
+       private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
+                       boolean isEnqueued,
+                       Supplier<CheckpointTriggerRequest> candidateSupplier,

Review comment:
       Does it have to be a supplier? Why can't it just be a `nextRequest`?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Function<Supplier<Optional<CheckpointTriggerRequest>>, 
Optional<CheckpointTriggerRequest>> sync;
+       private final PriorityQueue<CheckpointTriggerRequest> 
triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != 
r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;

Review comment:
       For what use case have you added this condition? Or was that the 
behaviour before your changes?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Function<Supplier<Optional<CheckpointTriggerRequest>>, 
Optional<CheckpointTriggerRequest>> sync;
+       private final PriorityQueue<CheckpointTriggerRequest> 
triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != 
r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;
+               } else {
+                       return Long.compare(r1.timestamp, r2.timestamp);
+               }
+       });
+
+       CheckpointRequestDecider(
+                       int maxConcurrentCheckpointAttempts,
+                       Consumer<Long> rescheduleTrigger,
+                       Clock clock,
+                       long minPauseBetweenCheckpoints,
+                       Supplier<Integer> pendingCheckpointsSizeSupplier,
+                       Function<Supplier<Optional<CheckpointTriggerRequest>>,
+                       Optional<CheckpointTriggerRequest>> sync) {
+               this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+               this.rescheduleTrigger = rescheduleTrigger;
+               this.clock = clock;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.pendingCheckpointsSizeSupplier = 
pendingCheckpointsSizeSupplier;
+               this.sync = sync;
+       }
+
+       Optional<CheckpointTriggerRequest> 
chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean 
isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(false, () -> newRequest, 
isTriggering, lastCompletionMs);
+       }
+
+       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean 
isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(true, triggerRequestQueue::peek, 
isTriggering, lastCompletionMs);
+       }
+
+       /**
+        * Choose the next {@link CheckpointTriggerRequest request} to execute 
based on the provided candidate and the
+        * current state. Acquires a lock and may update the state.
+        * @return request to execute, if any.
+        */
+       private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
+                       boolean isEnqueued,
+                       Supplier<CheckpointTriggerRequest> candidateSupplier,
+                       boolean isTriggering,
+                       long lastCompletionMs) {
+               return sync.apply(() -> {
+                       final CheckpointTriggerRequest candidate = 
candidateSupplier.get();
+                       if (candidate == null) {
+                               return Optional.empty();

Review comment:
       You could have moved this check to `chooseQueuedRequestToExecute` 
allowing us to avoid `@Nullable` value handling.
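
A minimal sketch of this suggestion, using a plain `String` as a hypothetical stand-in for `CheckpointTriggerRequest` and leaving out the locking and the actual decision logic. The class and method bodies are assumptions made for illustration, not the code from the PR; in the real class the `peek()` would presumably still have to run under the same lock as the rest of the decision.

```java
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;

// Hypothetical, simplified decider; not the code from the PR.
class NullCheckSketch {
    private final Queue<String> triggerRequestQueue = new ArrayDeque<>();

    void enqueue(String request) {
        triggerRequestQueue.offer(request);
    }

    // A new request is always present, so no null check is needed here.
    Optional<String> chooseRequestToExecute(String newRequest) {
        return choose(newRequest);
    }

    // The only entry point where the candidate can be absent.
    Optional<String> chooseQueuedRequestToExecute() {
        String head = triggerRequestQueue.peek();
        return head == null ? Optional.empty() : choose(head);
    }

    // The shared path never sees null, so it needs no @Nullable handling.
    private Optional<String> choose(String candidate) {
        return Optional.of(candidate);
    }
}
```

With this shape the shared method can take the candidate as a plain parameter, and the empty-queue case is handled in exactly one place.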

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;

Review comment:
       This is a completely new limit, right? And you have introduced it to 
limit the pending savepoints, as previously `triggerRequestQueue` would be 
cleaned up very quickly, while now, if the head of the queue is a savepoint, it 
can be blocked for a long time?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Object lock;
+       private final PriorityQueue<CheckpointTriggerRequest> 
triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != 
r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;
+               } else {
+                       return Long.compare(r1.timestamp, r2.timestamp);
+               }
+       });
+
+       CheckpointRequestDecider(
+                       int maxConcurrentCheckpointAttempts,
+                       Consumer<Long> rescheduleTrigger,
+                       Clock clock,
+                       long minPauseBetweenCheckpoints,
+                       Supplier<Integer> pendingCheckpointsSizeSupplier,
+                       Object lock) {
+               this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+               this.rescheduleTrigger = rescheduleTrigger;
+               this.clock = clock;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.pendingCheckpointsSizeSupplier = 
pendingCheckpointsSizeSupplier;
+               this.lock = lock;
+       }
+
+       Optional<CheckpointTriggerRequest> 
chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean 
isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(false, () -> newRequest, 
isTriggering, lastCompletionMs);
+       }
+
+       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean 
isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(true, triggerRequestQueue::peek, 
isTriggering, lastCompletionMs);
+       }
+
+       /**
+        * Choose the next {@link CheckpointTriggerRequest request} to execute 
based on the provided candidate and the
+        * current state. Acquires a lock and may update the state.
+        * @return request to execute, if any.
+        */
+       private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
+                       boolean isEnqueued,
+                       Supplier<CheckpointTriggerRequest> candidateSupplier,
+                       boolean isTriggering,
+                       long lastCompletionMs) {
+               synchronized (lock) {
+                       final CheckpointTriggerRequest candidate = 
candidateSupplier.get();
+                       if (candidate == null) {
+                               return Optional.empty();
+                       } else if (isTriggering) {
+                               return onCantTriggerNow(candidate, isEnqueued);
+                       } else if (pendingCheckpointsSizeSupplier.get() >= 
maxConcurrentCheckpointAttempts) {
+                               return onCantCheckpointNow(candidate, 
TOO_MANY_CONCURRENT_CHECKPOINTS, () -> { /* not used */ }, isEnqueued);
+                       } else if (nextTriggerDelayMillis(lastCompletionMs) > 
0) {
+                               return onCantCheckpointNow(candidate, 
MINIMUM_TIME_BETWEEN_CHECKPOINTS, () -> 
rescheduleTrigger.accept(nextTriggerDelayMillis(lastCompletionMs)), isEnqueued);
+                       } else if (candidate != triggerRequestQueue.peek()) {
+                               triggerRequestQueue.offer(candidate);
+                               return Optional.of(triggerRequestQueue.poll());
+                       } else {
+                               return Optional.of(isEnqueued ? 
triggerRequestQueue.poll() : candidate);
+                       }
+               }
+       }
+
+       private Optional<CheckpointTriggerRequest> 
onCantTriggerNow(CheckpointTriggerRequest candidate, boolean isEnqueued) {
+               offerIfNeeded(candidate, isEnqueued);
+               return Optional.empty();
+       }
+
+       private Optional<CheckpointTriggerRequest> onCantCheckpointNow(
+                       CheckpointTriggerRequest candidate,
+                       CheckpointFailureReason dropReason,
+                       Runnable postDrop,
+                       boolean isEnqueued) {
+               if (candidate.props.forceCheckpoint()) {
+                       return Optional.of(isEnqueued ? 
triggerRequestQueue.poll() : candidate);

Review comment:
       The invariant is a bit confusing: `candidate` may or may not still be 
on the queue. I'm not sure, but it might clean up the code a bit if you polled 
the candidate from the queue immediately. This could unify some code paths 
with the not-enqueued candidate.
   
   Especially since this `isEnqueued ? triggerRequestQueue.poll() : candidate` 
condition is already duplicated and can easily be forgotten in future 
changes.
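
A minimal sketch of the "poll immediately" idea, assuming an `Integer` priority (smaller means more urgent) as a hypothetical stand-in for `CheckpointTriggerRequest` and a single `canExecuteNow` flag in place of the real checks. Forced checkpoints, drop reasons, the queue bound and locking are all omitted, so this only illustrates the invariant, not the PR's behaviour.

```java
import java.util.Optional;
import java.util.PriorityQueue;

// Hypothetical, simplified decider; not the code from the PR.
class PollFirstSketch {
    private final PriorityQueue<Integer> triggerRequestQueue = new PriorityQueue<>();

    Optional<Integer> chooseRequestToExecute(int newRequest, boolean canExecuteNow) {
        return choose(newRequest, canExecuteNow);
    }

    Optional<Integer> chooseQueuedRequestToExecute(boolean canExecuteNow) {
        // Take the head off the queue right away, so that from here on
        // it is handled exactly like a brand-new request.
        Integer head = triggerRequestQueue.poll();
        return head == null ? Optional.empty() : choose(head, canExecuteNow);
    }

    // Invariant: the candidate is never on the queue when this is called,
    // so no isEnqueued flag is needed.
    private Optional<Integer> choose(int candidate, boolean canExecuteNow) {
        triggerRequestQueue.offer(candidate);
        return canExecuteNow ? Optional.of(triggerRequestQueue.poll()) : Optional.empty();
    }

    int queuedCount() {
        return triggerRequestQueue.size();
    }
}
```

The cost of this variant is an extra `offer` when a polled head cannot be executed yet; in exchange, the `isEnqueued ? poll() : candidate` branching disappears.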

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Function<Supplier<Optional<CheckpointTriggerRequest>>, 
Optional<CheckpointTriggerRequest>> sync;
+       private final PriorityQueue<CheckpointTriggerRequest> 
triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != 
r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;
+               } else {
+                       return Long.compare(r1.timestamp, r2.timestamp);
+               }
+       });

Review comment:
       Ok, you don't have to provide another test for it, but it would save me 
(and thus probably other people as well) some time if the function were named 
`checkpointTriggerRequestsComparator(...)`.
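
A minimal sketch of extracting the lambda into a named factory method. `Request` is a hypothetical, flattened stand-in for `CheckpointTriggerRequest` (in the PR the savepoint and force flags live on `props`), so the field names here are assumptions; the ordering rules are copied from the inline comparator.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical, simplified types; not the code from the PR.
class ComparatorSketch {
    static final class Request {
        final boolean isSavepoint;
        final boolean isForced;
        final boolean isPeriodic;
        final long timestamp;

        Request(boolean isSavepoint, boolean isForced, boolean isPeriodic, long timestamp) {
            this.isSavepoint = isSavepoint;
            this.isForced = isForced;
            this.isPeriodic = isPeriodic;
            this.timestamp = timestamp;
        }
    }

    // Savepoints first, then forced requests, then non-periodic ones,
    // and finally the oldest timestamp.
    static Comparator<Request> checkpointTriggerRequestsComparator() {
        return (r1, r2) -> {
            if (r1.isSavepoint != r2.isSavepoint) {
                return r1.isSavepoint ? -1 : 1;
            } else if (r1.isForced != r2.isForced) {
                return r1.isForced ? -1 : 1;
            } else if (r1.isPeriodic != r2.isPeriodic) {
                return r1.isPeriodic ? 1 : -1;
            } else {
                return Long.compare(r1.timestamp, r2.timestamp);
            }
        };
    }

    static PriorityQueue<Request> newTriggerRequestQueue() {
        return new PriorityQueue<>(checkpointTriggerRequestsComparator());
    }
}
```

The field declaration then reads as `new PriorityQueue<>(checkpointTriggerRequestsComparator())`, and the name documents the intent at the point of use.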

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Object lock;
+       private final PriorityQueue<CheckpointTriggerRequest> 
triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != 
r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;
+               } else {
+                       return Long.compare(r1.timestamp, r2.timestamp);
+               }
+       });
+
+       CheckpointRequestDecider(
+                       int maxConcurrentCheckpointAttempts,
+                       Consumer<Long> rescheduleTrigger,
+                       Clock clock,
+                       long minPauseBetweenCheckpoints,
+                       Supplier<Integer> pendingCheckpointsSizeSupplier,
+                       Object lock) {
+               this.maxConcurrentCheckpointAttempts = 
maxConcurrentCheckpointAttempts;
+               this.rescheduleTrigger = rescheduleTrigger;
+               this.clock = clock;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.pendingCheckpointsSizeSupplier = 
pendingCheckpointsSizeSupplier;
+               this.lock = lock;
+       }
+
+       Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(false, () -> newRequest, isTriggering, lastCompletionMs);
+       }
+
+       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(true, triggerRequestQueue::peek, isTriggering, lastCompletionMs);
+       }
+
+       /**
+        * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
+        * current state. Acquires a lock and may update the state.
+        * @return request to execute, if any.
+        */
+       private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
+                       boolean isEnqueued,
+                       Supplier<CheckpointTriggerRequest> candidateSupplier,
+                       boolean isTriggering,
+                       long lastCompletionMs) {
+               synchronized (lock) {
+                       final CheckpointTriggerRequest candidate = candidateSupplier.get();
+                       if (candidate == null) {
+                               return Optional.empty();
+                       } else if (isTriggering) {
+                               return onCantTriggerNow(candidate, isEnqueued);
+                       } else if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
+                               return onCantCheckpointNow(candidate, TOO_MANY_CONCURRENT_CHECKPOINTS, () -> { /* not used */ }, isEnqueued);
+                       } else if (nextTriggerDelayMillis(lastCompletionMs) > 0) {
+                               return onCantCheckpointNow(candidate, MINIMUM_TIME_BETWEEN_CHECKPOINTS, () -> rescheduleTrigger.accept(nextTriggerDelayMillis(lastCompletionMs)), isEnqueued);
+                       } else if (candidate != triggerRequestQueue.peek()) {

Review comment:
       Should this be `else if (!isEnqueued)`, to be consistent with the other such checks? Or maybe introduce
   ```
   boolean isAlreadyEnqueued(CheckpointTriggerRequest candidate) {
     return candidate == triggerRequestQueue.peek();
   }
   ```
   ?
   
   edit: or, as I suggested below, drop the case of an enqueued `candidate` altogether?
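
A minimal sketch of the helper idea, using simplified stand-ins rather than the actual Flink classes. `Request` and `DeciderSketch` are hypothetical names, and the queue here orders by timestamp only, which is a simplification of the real comparator:

```java
import java.util.Comparator;
import java.util.PriorityQueue;

// Hypothetical stand-in for CheckpointTriggerRequest; only the timestamp is modelled.
final class Request {
    final long timestamp;

    Request(long timestamp) {
        this.timestamp = timestamp;
    }
}

// Hypothetical, simplified model of the decider's queue handling.
final class DeciderSketch {
    private final PriorityQueue<Request> queue =
        new PriorityQueue<>(Comparator.comparingLong((Request r) -> r.timestamp));

    void enqueue(Request request) {
        queue.offer(request);
    }

    // Reference comparison on purpose: the candidate counts as enqueued
    // only if it is the very object currently at the head of the queue.
    boolean isAlreadyEnqueued(Request candidate) {
        return candidate == queue.peek();
    }

    // Offers the candidate unless it is already the head, then returns
    // whichever request has the highest priority.
    Request choose(Request candidate) {
        if (!isAlreadyEnqueued(candidate)) {
            queue.offer(candidate);
        }
        return queue.poll();
    }
}
```

With a named helper the intent of the branch is visible at the call site. Note that the sketch assumes a queued candidate is always obtained via `peek()`, as in the PR; a candidate sitting deeper in the queue would be offered twice.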

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java
##########
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator.CheckpointTriggerRequest;
+import org.apache.flink.runtime.util.clock.Clock;
+
+import java.util.Optional;
+import java.util.PriorityQueue;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.MINIMUM_TIME_BETWEEN_CHECKPOINTS;
+import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.TOO_MANY_CONCURRENT_CHECKPOINTS;
+
+@SuppressWarnings("ConstantConditions")
+class CheckpointRequestDecider {
+       private static final int MAX_QUEUED_REQUESTS = 1000;
+
+       private final int maxConcurrentCheckpointAttempts;
+       private final Consumer<Long> rescheduleTrigger;
+       private final Clock clock;
+       private final long minPauseBetweenCheckpoints;
+       private final Supplier<Integer> pendingCheckpointsSizeSupplier;
+       private final Function<Supplier<Optional<CheckpointTriggerRequest>>, Optional<CheckpointTriggerRequest>> sync;
+       private final PriorityQueue<CheckpointTriggerRequest> triggerRequestQueue = new PriorityQueue<>((r1, r2) -> {
+               if (r1.props.isSavepoint() != r2.props.isSavepoint()) {
+                       return r1.props.isSavepoint() ? -1 : 1;
+               } else if (r1.props.forceCheckpoint() != r2.props.forceCheckpoint()) {
+                       return r1.props.forceCheckpoint() ? -1 : 1;
+               } else if (r1.isPeriodic != r2.isPeriodic) {
+                       return r1.isPeriodic ? 1 : -1;
+               } else {
+                       return Long.compare(r1.timestamp, r2.timestamp);
+               }
+       });
+
+       CheckpointRequestDecider(
+                       int maxConcurrentCheckpointAttempts,
+                       Consumer<Long> rescheduleTrigger,
+                       Clock clock,
+                       long minPauseBetweenCheckpoints,
+                       Supplier<Integer> pendingCheckpointsSizeSupplier,
+                       Function<Supplier<Optional<CheckpointTriggerRequest>>,
+                       Optional<CheckpointTriggerRequest>> sync) {
+               this.maxConcurrentCheckpointAttempts = maxConcurrentCheckpointAttempts;
+               this.rescheduleTrigger = rescheduleTrigger;
+               this.clock = clock;
+               this.minPauseBetweenCheckpoints = minPauseBetweenCheckpoints;
+               this.pendingCheckpointsSizeSupplier = pendingCheckpointsSizeSupplier;
+               this.sync = sync;
+       }
+
+       Optional<CheckpointTriggerRequest> chooseRequestToExecute(CheckpointTriggerRequest newRequest, boolean isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(false, () -> newRequest, isTriggering, lastCompletionMs);
+       }
+
+       Optional<CheckpointTriggerRequest> chooseQueuedRequestToExecute(boolean isTriggering, long lastCompletionMs) {
+               return chooseRequestToExecute(true, triggerRequestQueue::peek, isTriggering, lastCompletionMs);
+       }
+
+       /**
+        * Choose the next {@link CheckpointTriggerRequest request} to execute based on the provided candidate and the
+        * current state. Acquires a lock and may update the state.
+        * @return request to execute, if any.
+        */
+       private Optional<CheckpointTriggerRequest> chooseRequestToExecute(
+                       boolean isEnqueued,
+                       Supplier<CheckpointTriggerRequest> candidateSupplier,
+                       boolean isTriggering,
+                       long lastCompletionMs) {
+               return sync.apply(() -> {
+                       final CheckpointTriggerRequest candidate = candidateSupplier.get();
+                       if (candidate == null) {
+                               return Optional.empty();
+                       } else if (isTriggering) {
+                               return onCantTriggerNow(candidate, isEnqueued);
+                       } else if (pendingCheckpointsSizeSupplier.get() >= maxConcurrentCheckpointAttempts) {
+                               return onCantCheckpointNow(candidate, TOO_MANY_CONCURRENT_CHECKPOINTS, () -> { /* not used */ }, isEnqueued);
+                       } else if (nextTriggerDelayMillis(lastCompletionMs) > 0) {
+                               return onCantCheckpointNow(candidate, MINIMUM_TIME_BETWEEN_CHECKPOINTS, () -> rescheduleTrigger.accept(nextTriggerDelayMillis(lastCompletionMs)), isEnqueued);
+                       } else if (candidate != triggerRequestQueue.peek()) {
+                               triggerRequestQueue.offer(candidate);
+                               return Optional.of(triggerRequestQueue.poll());
+                       } else {
+                               return Optional.of(isEnqueued ? triggerRequestQueue.poll() : candidate);
+                       }
+               });
+       }
+
+       private Optional<CheckpointTriggerRequest> onCantTriggerNow(CheckpointTriggerRequest candidate, boolean isEnqueued) {
+               offerIfNeeded(candidate, isEnqueued);
+               return Optional.empty();
+       }
+
+       private Optional<CheckpointTriggerRequest> onCantCheckpointNow(
+                       CheckpointTriggerRequest candidate,
+                       CheckpointFailureReason dropReason,
+                       Runnable postDrop,
+                       boolean isEnqueued) {
+               if (candidate.props.forceCheckpoint()) {
+                       return Optional.of(isEnqueued ? triggerRequestQueue.poll() : candidate);
+               } else if (candidate.isPeriodic) {
+                       if (isEnqueued) {
+                               triggerRequestQueue.poll();
+                       }
+                       candidate.onCompletionPromise.completeExceptionally(new CheckpointException(dropReason));
+                       postDrop.run();
+               }
+               offerIfNeeded(candidate, isEnqueued);
+               return Optional.empty();
+       }
+
+       private void offerIfNeeded(CheckpointTriggerRequest candidate, boolean isEnqueued) {
+               if (!isEnqueued) {
+                       if (triggerRequestQueue.size() < MAX_QUEUED_REQUESTS) {
+                               triggerRequestQueue.offer(candidate);
+                       } else {
+                               candidate.onCompletionPromise.completeExceptionally(new CheckpointException(CheckpointFailureReason.TOO_MANY_CHECKPOINT_REQUESTS));

Review comment:
       Is this branch unit tested somewhere? (I couldn't find `TOO_MANY_CHECKPOINT_REQUESTS` referenced in the tests.)
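
A rough sketch of the kind of check that could cover this branch. It does not use the real Flink classes: `BoundedRequestQueue` is a hypothetical stand-in that mirrors the shape of `offerIfNeeded`, a bare `CompletableFuture` stands in for `onCompletionPromise`, and `IllegalStateException` replaces `CheckpointException`:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in that mirrors the shape of offerIfNeeded(...) in the diff.
final class BoundedRequestQueue {
    static final int MAX_QUEUED_REQUESTS = 1000;

    private final Queue<CompletableFuture<Void>> queue = new ArrayDeque<>();

    void offerIfNeeded(CompletableFuture<Void> promise, boolean isEnqueued) {
        if (isEnqueued) {
            return; // already queued, nothing to do
        }
        if (queue.size() < MAX_QUEUED_REQUESTS) {
            queue.offer(promise);
        } else {
            // Assumption: the real code fails the promise with a CheckpointException.
            promise.completeExceptionally(
                new IllegalStateException("TOO_MANY_CHECKPOINT_REQUESTS"));
        }
    }

    int size() {
        return queue.size();
    }
}

final class BoundedRequestQueueCheck {
    // Fills the queue to its limit, then verifies that one more request is rejected.
    static boolean overflowIsRejected() {
        BoundedRequestQueue queue = new BoundedRequestQueue();
        for (int i = 0; i < BoundedRequestQueue.MAX_QUEUED_REQUESTS; i++) {
            queue.offerIfNeeded(new CompletableFuture<>(), false);
        }
        CompletableFuture<Void> overflow = new CompletableFuture<>();
        queue.offerIfNeeded(overflow, false);
        return overflow.isCompletedExceptionally()
            && queue.size() == BoundedRequestQueue.MAX_QUEUED_REQUESTS;
    }
}
```

A real test would presumably drive `CheckpointRequestDecider` itself and assert on the failure reason carried by the exception, not only on the fact that the promise failed.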





