echauchot commented on a change in pull request #13040:
URL: https://github.com/apache/flink/pull/13040#discussion_r492043783



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Delegate class responsible for checkpoints cleaning and counting the number 
of checkpoints yet
+ * to clean.
+ */
+public class CheckpointsCleaner implements Serializable{
+       private final AtomicInteger numberOfCheckpointsToClean;
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointsCleaner.class);
+
+       public CheckpointsCleaner() {
+               this.numberOfCheckpointsToClean = new AtomicInteger(0);
+       }
+
+       int getNumberOfCheckpointsToClean() {
+               return numberOfCheckpointsToClean.get();
+       }
+
+       public void cleanCheckpoint(Runnable cleanAction, Runnable 
postCleanAction, Executor executor) {
+               numberOfCheckpointsToClean.incrementAndGet();
+               executor.execute(() -> {
+                       try {
+                               cleanAction.run();
+                       } finally {
+                               numberOfCheckpointsToClean.decrementAndGet();
+                               postCleanAction.run();
+                       }
+               });
+       }
+
+       public void cleanStates(Runnable postCleanAction, Map<OperatorID, 
OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, 
CheckpointStorageLocation targetLocation, Executor executor){
+               numberOfCheckpointsToClean.incrementAndGet();
+               executor.execute(() -> {
+                       // discard the private states.
+                       // unregistered shared states are still considered 
private at this point.
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                               targetLocation.disposeOnFailure();

Review comment:
       Yes that is what I asked in 
[this](https://github.com/apache/flink/pull/13040#issuecomment-688845131) 
comment if I needed to move the cleaning logic from `PendingCheckpoint` to 
`CheckpointsCleaner` but, as you were off, I took the choice to move it to have 
all cleaning logic at the same place. Anyway, I could move it back to 
`PendingCheckpoint`. 
   
   For that, we could indeed use `cleanCheckpoint`  for doing the 
`cleanStates()` job and call                          `operatorStates.clear(); 
CheckpointCoordinator::scheduleTriggerRequest();` as a `postCleanAction` 
callback. That way we have only `cleanCheckpoint()` in `CheckpointsCleaner`. Is 
that good for you ? (I would like it to be the final refactoring :) )

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -108,6 +110,11 @@
        @Nullable
        private transient volatile CompletedCheckpointStats.DiscardCallback 
discardCallback;
 
+       private final CheckpointsCleaningRunner cleanCallback;
+
+       private final SerializableRunnable checkpointCleaningFinishedCallback;

Review comment:
       sure, thx for pointing out !

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();

Review comment:
       I was unaware of this class. Thanks for pointing out ! Yes the public 
API of this class fits the ITest needs.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();
+               ZooKeeperCompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+               //pause the executor to pause checkpoints cleaning, to allow 
assertions
+               executor.pause();
+
+               int nbCheckpointsToInject = 3;
+               for (int i = 1; i <= nbCheckpointsToInject; i++) {
+                       // add checkpoints to clean
+                       TestCompletedCheckpoint completedCheckpoint = new 
TestCompletedCheckpoint(new JobID(), i,
+                               i, Collections.emptyMap(), 
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                               checkpointsCleaner::cleanCheckpoint);
+                       checkpointStore.addCheckpoint(completedCheckpoint);
+               }
+
+               Thread.sleep(100L); // give time to submit checkpoints for 
cleaning
+
+               int nbCheckpointsSubmittedForCleaningByCheckpointStore = 
nbCheckpointsToInject - maxCheckpointsToRetain;
+               
assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, 
checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       agree. I don't like absolute sleeps either but I did not go into so much 
thinking about that, I first wanted that the ITest behavior is validated before 
dealing with details.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.StateUtil;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Delegate class responsible for checkpoints cleaning and counting the number 
of checkpoints yet
+ * to clean.
+ */
+public class CheckpointsCleaner implements Serializable{
+       private final AtomicInteger numberOfCheckpointsToClean;
+       private static final Logger LOG = 
LoggerFactory.getLogger(CheckpointsCleaner.class);
+
+       public CheckpointsCleaner() {
+               this.numberOfCheckpointsToClean = new AtomicInteger(0);
+       }
+
+       int getNumberOfCheckpointsToClean() {
+               return numberOfCheckpointsToClean.get();
+       }
+
+       public void cleanCheckpoint(Runnable cleanAction, Runnable 
postCleanAction, Executor executor) {
+               numberOfCheckpointsToClean.incrementAndGet();
+               executor.execute(() -> {
+                       try {
+                               cleanAction.run();
+                       } finally {
+                               numberOfCheckpointsToClean.decrementAndGet();
+                               postCleanAction.run();
+                       }
+               });
+       }
+
+       public void cleanStates(Runnable postCleanAction, Map<OperatorID, 
OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, 
CheckpointStorageLocation targetLocation, Executor executor){
+               numberOfCheckpointsToClean.incrementAndGet();
+               executor.execute(() -> {
+                       // discard the private states.
+                       // unregistered shared states are still considered 
private at this point.
+                       try {
+                               
StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
+                               targetLocation.disposeOnFailure();

Review comment:
       Yes that is what I asked in 
[this](https://github.com/apache/flink/pull/13040#issuecomment-688845131) 
comment if I needed to move the cleaning logic from `PendingCheckpoint` to 
`CheckpointsCleaner` but, as you were off, I took the choice to move it to have 
all cleaning logic at the same place. Anyway, I could move it back to 
`PendingCheckpoint`. 
   
   For that, we could indeed use `cleanCheckpoint`  for doing the 
`cleanStates()` job and call                          `operatorStates.clear(); 
CheckpointCoordinator::scheduleTriggerRequest();` as a `postCleanAction` 
callback. That way we have only `cleanCheckpoint()` in `CheckpointsCleaner`. 
   
   => Is that good for you ? (I would like it to be the final refactoring :) )

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();
+               ZooKeeperCompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+               //pause the executor to pause checkpoints cleaning, to allow 
assertions
+               executor.pause();
+
+               int nbCheckpointsToInject = 3;
+               for (int i = 1; i <= nbCheckpointsToInject; i++) {
+                       // add checkpoints to clean
+                       TestCompletedCheckpoint completedCheckpoint = new 
TestCompletedCheckpoint(new JobID(), i,
+                               i, Collections.emptyMap(), 
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                               checkpointsCleaner::cleanCheckpoint);
+                       checkpointStore.addCheckpoint(completedCheckpoint);
+               }
+
+               Thread.sleep(100L); // give time to submit checkpoints for 
cleaning
+
+               int nbCheckpointsSubmittedForCleaningByCheckpointStore = 
nbCheckpointsToInject - maxCheckpointsToRetain;
+               
assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, 
checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       Some thing troubles me though: using the thing we want to test 
(`checkpointsCleaner.getNumberOfCheckpointsToClean()`) as a condition for a 
waiting loop seems not a good idea to me as if the tested element fails, we 
could have an infinite loop in place of a failing ITest. Maybe more trying to 
synchronise the ITest and checkpointsStore submit somehow (even by raising the 
visibility of some internals for the purpose of test). 
   
   WDYT ?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
##########
@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws 
Exception {
                recoveredTestCheckpoint.awaitDiscard();
        }
 
+       /**
+        * FLINK-17073 tests that there is no request triggered when there are 
too many checkpoints
+        * waiting to clean and that it resumes when the number of waiting 
checkpoints as gone below
+        * the threshold.
+        *
+        */
+       @Test
+       public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() 
throws Exception{
+               ManualClock clock = new ManualClock();
+               clock.advanceTime(1, TimeUnit.DAYS);
+               int maxCleaningCheckpoints = 1;
+               CheckpointsCleaner checkpointsCleaner = new 
CheckpointsCleaner();
+               CheckpointRequestDecider checkpointRequestDecider =  new 
CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new 
AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean);
+
+               final int maxCheckpointsToRetain = 1;
+               Executors.PausableThreadPoolExecutor executor = 
Executors.pausableExecutor();
+               ZooKeeperCompletedCheckpointStore checkpointStore = 
createCompletedCheckpoints(maxCheckpointsToRetain, executor);
+
+               //pause the executor to pause checkpoints cleaning, to allow 
assertions
+               executor.pause();
+
+               int nbCheckpointsToInject = 3;
+               for (int i = 1; i <= nbCheckpointsToInject; i++) {
+                       // add checkpoints to clean
+                       TestCompletedCheckpoint completedCheckpoint = new 
TestCompletedCheckpoint(new JobID(), i,
+                               i, Collections.emptyMap(), 
CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE),
+                               checkpointsCleaner::cleanCheckpoint);
+                       checkpointStore.addCheckpoint(completedCheckpoint);
+               }
+
+               Thread.sleep(100L); // give time to submit checkpoints for 
cleaning
+
+               int nbCheckpointsSubmittedForCleaningByCheckpointStore = 
nbCheckpointsToInject - maxCheckpointsToRetain;
+               
assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, 
checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
       Some thing troubles me though: using the thing we want to test 
(`checkpointsCleaner.getNumberOfCheckpointsToClean()`) as a condition for a 
waiting loop seems not a good idea to me as if the tested element fails, we 
could have an infinite loop in place of a failing ITest. Maybe more trying to 
synchronise the ITest and checkpointsStore.submitCheckpointForCleaning somehow 
(even by raising the visibility of some internals for the purpose of test). 
   
   WDYT ?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -210,6 +221,21 @@ public void 
registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
        //  Discard and Dispose
        // 
------------------------------------------------------------------------
 
+       /**
+        * Asynchronously call a discard on the ioExecutor
+        * (FixedThreadPool of configurable size of default 4*CPU cores)
+        * and count the number of checkpoints that are waiting to clean.
+        */
+       void 
asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, 
Exception> discardCallback, Executor executor){
+               cleanCallback.accept(() -> {
+                       try {
+                               discardCallback.accept(this);

Review comment:
       This is the code you gave at the end of [your 
comment](https://github.com/apache/flink/pull/13040#issuecomment-686639977) I 
just reused it. But anyway no problem about trying to improve it.
   
   Regarding your proposed refactoring: I understand that in place of passing 
callbacks `CompletedCheckpoint::discardOnSubsume` and 
`CompletedCheckpoint::discardOnShutdown`, as they both call 
`CompletedCheckpoint.doDiscard()` you prefer calling it directly. But it would 
require to evaluate the conditions inside ZooKeeperStore:
   
   - for `CompletedCheckpoint::discardOnSubsume`: means moving condition `if 
(props.discardOnSubsumed()) `
   
   - for `CompletedCheckpoint::discardOnShutdown`: means moving conditions `if 
(jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
                                jobStatus == JobStatus.CANCELED && 
props.discardOnJobCancelled() ||
                                jobStatus == JobStatus.FAILED && 
props.discardOnJobFailed() ||
                                jobStatus == JobStatus.SUSPENDED && 
props.discardOnJobSuspended()) `
   => I'm not sure it's worth it, violating the integrity of 
CompletedCheckpoint object for the sake a readable 
`asyncDiscardCheckpointAndCountCheckpoint` method.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java
##########
@@ -210,6 +221,21 @@ public void 
registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg
        //  Discard and Dispose
        // 
------------------------------------------------------------------------
 
+       /**
+        * Asynchronously call a discard on the ioExecutor
+        * (FixedThreadPool of configurable size of default 4*CPU cores)
+        * and count the number of checkpoints that are waiting to clean.
+        */
+       void 
asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, 
Exception> discardCallback, Executor executor){
+               cleanCallback.accept(() -> {
+                       try {
+                               discardCallback.accept(this);

Review comment:
       This is the code you gave at the end of [your 
comment](https://github.com/apache/flink/pull/13040#issuecomment-686639977) I 
just reused it. But anyway no problem about trying to improve it.
   
   Regarding your proposed refactoring: I understand that in place of passing 
callbacks `CompletedCheckpoint::discardOnSubsume` and 
`CompletedCheckpoint::discardOnShutdown`, as they both call 
`CompletedCheckpoint.doDiscard()` you prefer calling it directly. But it would 
require to evaluate the conditions inside ZooKeeperStore:
   
   - for `CompletedCheckpoint::discardOnSubsume`: means moving condition `if 
(props.discardOnSubsumed()) `
   
   - for `CompletedCheckpoint::discardOnShutdown`: means moving conditions `if 
(jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() ||
                                jobStatus == JobStatus.CANCELED && 
props.discardOnJobCancelled() ||
                                jobStatus == JobStatus.FAILED && 
props.discardOnJobFailed() ||
                                jobStatus == JobStatus.SUSPENDED && 
props.discardOnJobSuspended()) `
   
   => I'm not sure it's worth it, violating the integrity of 
CompletedCheckpoint object for the sake a readable 
`asyncDiscardCheckpointAndCountCheckpoint` method.
   WDYT?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to