echauchot commented on a change in pull request #13040: URL: https://github.com/apache/flink/pull/13040#discussion_r492043783
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ##########

@@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.StateUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Delegate class responsible for checkpoints cleaning and counting the number of checkpoints yet + * to clean. + */ +public class CheckpointsCleaner implements Serializable{ + private final AtomicInteger numberOfCheckpointsToClean; + private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class); + + public CheckpointsCleaner() { + this.numberOfCheckpointsToClean = new AtomicInteger(0); + } + + int getNumberOfCheckpointsToClean() { + return numberOfCheckpointsToClean.get(); + } + + public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) { + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + try { + cleanAction.run(); + } finally { + numberOfCheckpointsToClean.decrementAndGet(); + postCleanAction.run(); + } + }); + } + + public void cleanStates(Runnable postCleanAction, Map<OperatorID, OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor){ + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + // discard the private states. + // unregistered shared states are still considered private at this point. + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + targetLocation.disposeOnFailure();

Review comment:
Yes, that is what I asked in [this](https://github.com/apache/flink/pull/13040#issuecomment-688845131) comment: whether I needed to move the cleaning logic from `PendingCheckpoint` to `CheckpointsCleaner`. As you were away, I chose to move it so that all the cleaning logic lives in the same place. Anyway, I can move it back to `PendingCheckpoint`. For that, we could indeed use `cleanCheckpoint` to do the `cleanStates()` job and pass `operatorStates.clear(); CheckpointCoordinator::scheduleTriggerRequest();` as the `postCleanAction` callback. That way `CheckpointsCleaner` only exposes `cleanCheckpoint()`. Is that good for you? (I would like it to be the final refactoring :) )
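To make this concrete, here is roughly what I have in mind for the call site in `PendingCheckpoint` (just a sketch: the method name and the fields `checkpointsCleaner`, `operatorStates`, `targetLocation`, `postCheckpointDisposeAction` and `LOG` are placeholders for whatever `PendingCheckpoint` actually holds, not code from this PR):

```java
// Sketch only: the state disposal becomes the cleanAction and the remaining
// bookkeeping becomes the postCleanAction, so CheckpointsCleaner only needs
// to expose cleanCheckpoint(). All field names below are assumptions.
private void disposeStatesAsync(Executor executor) {
    checkpointsCleaner.cleanCheckpoint(
        () -> {
            // cleanAction: discard the private states; unregistered shared
            // states are still considered private at this point.
            try {
                StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values());
                targetLocation.disposeOnFailure();
            } catch (Throwable t) {
                LOG.warn("Could not properly dispose the pending checkpoint.", t);
            }
        },
        () -> {
            // postCleanAction: clear the bookkeeping and let the coordinator
            // schedule the next trigger request.
            operatorStates.clear();
            postCheckpointDisposeAction.run(); // e.g. CheckpointCoordinator::scheduleTriggerRequest
        },
        executor);
}
```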
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ##########

@@ -108,6 +110,11 @@ @Nullable private transient volatile CompletedCheckpointStats.DiscardCallback discardCallback; + private final CheckpointsCleaningRunner cleanCallback; + + private final SerializableRunnable checkpointCleaningFinishedCallback;

Review comment:
Sure, thanks for pointing that out!

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########

@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception { recoveredTestCheckpoint.awaitDiscard(); } + /** + * FLINK-17073 tests that there is no request triggered when there are too many checkpoints + * waiting to clean and that it resumes when the number of waiting checkpoints as gone below + * the threshold. + * + */ + @Test + public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{ + ManualClock clock = new ManualClock(); + clock.advanceTime(1, TimeUnit.DAYS); + int maxCleaningCheckpoints = 1; + CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean); + + final int maxCheckpointsToRetain = 1; + Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor();

Review comment:
I was unaware of this class, thanks for pointing it out! Yes, its public API fits the needs of the ITest.

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########

@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception { recoveredTestCheckpoint.awaitDiscard(); } + /** + * FLINK-17073 tests that there is no request triggered when there are too many checkpoints + * waiting to clean and that it resumes when the number of waiting checkpoints as gone below + * the threshold.
+ * + */ + @Test + public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{ + ManualClock clock = new ManualClock(); + clock.advanceTime(1, TimeUnit.DAYS); + int maxCleaningCheckpoints = 1; + CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean); + + final int maxCheckpointsToRetain = 1; + Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor(); + ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor); + + //pause the executor to pause checkpoints cleaning, to allow assertions + executor.pause(); + + int nbCheckpointsToInject = 3; + for (int i = 1; i <= nbCheckpointsToInject; i++) { + // add checkpoints to clean + TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i, + i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + checkpointsCleaner::cleanCheckpoint); + checkpointStore.addCheckpoint(completedCheckpoint); + } + + Thread.sleep(100L); // give time to submit checkpoints for cleaning + + int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain; + assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
Agreed. I don't like absolute sleeps either, but I have not given this much thought yet: I first wanted the ITest behavior to be validated before dealing with such details.

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointsCleaner.java ##########

@@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.checkpoint; + +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.state.CheckpointStorageLocation; +import org.apache.flink.runtime.state.StateUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Serializable; +import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Delegate class responsible for checkpoints cleaning and counting the number of checkpoints yet + * to clean.
+ */ +public class CheckpointsCleaner implements Serializable{ + private final AtomicInteger numberOfCheckpointsToClean; + private static final Logger LOG = LoggerFactory.getLogger(CheckpointsCleaner.class); + + public CheckpointsCleaner() { + this.numberOfCheckpointsToClean = new AtomicInteger(0); + } + + int getNumberOfCheckpointsToClean() { + return numberOfCheckpointsToClean.get(); + } + + public void cleanCheckpoint(Runnable cleanAction, Runnable postCleanAction, Executor executor) { + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + try { + cleanAction.run(); + } finally { + numberOfCheckpointsToClean.decrementAndGet(); + postCleanAction.run(); + } + }); + } + + public void cleanStates(Runnable postCleanAction, Map<OperatorID, OperatorState> operatorStates, PendingCheckpoint pendingCheckpoint, CheckpointStorageLocation targetLocation, Executor executor){ + numberOfCheckpointsToClean.incrementAndGet(); + executor.execute(() -> { + // discard the private states. + // unregistered shared states are still considered private at this point. + try { + StateUtil.bestEffortDiscardAllStateObjects(operatorStates.values()); + targetLocation.disposeOnFailure();

Review comment:
Yes, that is what I asked in [this](https://github.com/apache/flink/pull/13040#issuecomment-688845131) comment: whether I needed to move the cleaning logic from `PendingCheckpoint` to `CheckpointsCleaner`. As you were away, I chose to move it so that all the cleaning logic lives in the same place. Anyway, I can move it back to `PendingCheckpoint`. For that, we could indeed use `cleanCheckpoint` to do the `cleanStates()` job and pass `operatorStates.clear(); CheckpointCoordinator::scheduleTriggerRequest();` as the `postCleanAction` callback. That way `CheckpointsCleaner` only exposes `cleanCheckpoint()`. => Is that good for you? (I would like it to be the final refactoring :) )

########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########

@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception { recoveredTestCheckpoint.awaitDiscard(); } + /** + * FLINK-17073 tests that there is no request triggered when there are too many checkpoints + * waiting to clean and that it resumes when the number of waiting checkpoints as gone below + * the threshold.
+ * + */ + @Test + public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{ + ManualClock clock = new ManualClock(); + clock.advanceTime(1, TimeUnit.DAYS); + int maxCleaningCheckpoints = 1; + CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean); + + final int maxCheckpointsToRetain = 1; + Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor(); + ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor); + + //pause the executor to pause checkpoints cleaning, to allow assertions + executor.pause(); + + int nbCheckpointsToInject = 3; + for (int i = 1; i <= nbCheckpointsToInject; i++) { + // add checkpoints to clean + TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i, + i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + checkpointsCleaner::cleanCheckpoint); + checkpointStore.addCheckpoint(completedCheckpoint); + } + + Thread.sleep(100L); // give time to submit checkpoints for cleaning + + int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain; + assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
Something troubles me though: using the very thing we want to test (`checkpointsCleaner.getNumberOfCheckpointsToClean()`) as the condition of a waiting loop does not seem like a good idea to me, because if the tested element fails we could end up with an infinite loop instead of a failing ITest. Maybe we should rather synchronise the ITest with the checkpoint store's submission somehow (even by raising the visibility of some internals for the purpose of the test). WDYT?
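If we do go with a waiting loop, we could at least bound it with a deadline so that a broken cleaner shows up as a failing test rather than a hanging build. A minimal sketch, assuming a hand-rolled helper (the name `TestWaitUtil` is made up, not an existing Flink utility):

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical test helper: poll a condition, but fail after a deadline
// instead of looping forever if the tested code misbehaves.
final class TestWaitUtil {
    private TestWaitUtil() {}

    static void waitUntil(String description, BooleanSupplier condition, Duration timeout)
            throws InterruptedException {
        final long deadlineNanos = System.nanoTime() + timeout.toNanos();
        while (!condition.getAsBoolean()) {
            if (System.nanoTime() > deadlineNanos) {
                throw new AssertionError("Condition not met within " + timeout + ": " + description);
            }
            Thread.sleep(10L);
        }
    }
}

// Possible usage in the ITest, replacing the absolute Thread.sleep(100L):
//
// TestWaitUtil.waitUntil(
//     "checkpoints submitted for cleaning",
//     () -> checkpointsCleaner.getNumberOfCheckpointsToClean()
//             == nbCheckpointsSubmittedForCleaningByCheckpointStore,
//     Duration.ofSeconds(10));
```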
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ##########

@@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception { recoveredTestCheckpoint.awaitDiscard(); } + /** + * FLINK-17073 tests that there is no request triggered when there are too many checkpoints + * waiting to clean and that it resumes when the number of waiting checkpoints as gone below + * the threshold. + * + */ + @Test + public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{ + ManualClock clock = new ManualClock(); + clock.advanceTime(1, TimeUnit.DAYS); + int maxCleaningCheckpoints = 1; + CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean); + + final int maxCheckpointsToRetain = 1; + Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor(); + ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor); + + //pause the executor to pause checkpoints cleaning, to allow assertions + executor.pause(); + + int nbCheckpointsToInject = 3; + for (int i = 1; i <= nbCheckpointsToInject; i++) { + // add checkpoints to clean + TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i, + i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + checkpointsCleaner::cleanCheckpoint); + checkpointStore.addCheckpoint(completedCheckpoint); + } + + Thread.sleep(100L); // give time to submit checkpoints for cleaning + + int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain; + assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean());

Review comment:
Something troubles me though: using the very thing we want to test (`checkpointsCleaner.getNumberOfCheckpointsToClean()`) as the condition of a waiting loop does not seem like a good idea to me, because if the tested element fails we could end up with an infinite loop instead of a failing ITest. Maybe we should rather synchronise the ITest with `checkpointsStore.submitCheckpointForCleaning` somehow (even by raising the visibility of some internals for the purpose of the test). WDYT?

########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ##########

@@ -210,6 +221,21 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg // Discard and Dispose // ------------------------------------------------------------------------ + /** + * Asynchronously call a discard on the ioExecutor + * (FixedThreadPool of configurable size of default 4*CPU cores) + * and count the number of checkpoints that are waiting to clean. + */ + void asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback, Executor executor){ + cleanCallback.accept(() -> { + try { + discardCallback.accept(this);

Review comment:
This is the code you gave at the end of [your comment](https://github.com/apache/flink/pull/13040#issuecomment-686639977); I just reused it. But no problem, we can try to improve it. Regarding your proposed refactoring: I understand that, instead of passing the callbacks `CompletedCheckpoint::discardOnSubsume` and `CompletedCheckpoint::discardOnShutdown`, which both call `CompletedCheckpoint.doDiscard()`, you would prefer to call `doDiscard()` directly. But that would require evaluating the conditions inside the ZooKeeper store:
- for `CompletedCheckpoint::discardOnSubsume`: it means moving the condition `if (props.discardOnSubsumed())`
- for `CompletedCheckpoint::discardOnShutdown`: it means moving the conditions `if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() || jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() || jobStatus == JobStatus.FAILED && props.discardOnJobFailed() || jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended())`

=> I'm not sure it is worth violating the encapsulation of the `CompletedCheckpoint` object for the sake of a more readable `asyncDiscardCheckpointAndCountCheckpoint` method.
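To make the trade-off explicit, here is roughly what the store-side alternative would look like as I understand it (only a sketch: the helper name is made up, and it assumes the store can see the checkpoint's `CheckpointProperties` and a sufficiently visible `doDiscard()`, which is exactly the encapsulation concern above):

```java
// Hypothetical helper inside the ZooKeeper store (illustration only): the
// discard conditions of CompletedCheckpoint::discardOnShutdown would be
// evaluated here, and doDiscard() invoked through the counting wrapper.
private void discardOnShutdownIfNeeded(CompletedCheckpoint checkpoint, JobStatus jobStatus, Executor executor) {
    CheckpointProperties props = checkpoint.getProperties();
    boolean shouldDiscard =
            (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished())
                    || (jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled())
                    || (jobStatus == JobStatus.FAILED && props.discardOnJobFailed())
                    || (jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended());
    if (shouldDiscard) {
        // doDiscard() would have to become visible to the store for this to
        // compile, which is the integrity/encapsulation concern raised above.
        checkpoint.asyncDiscardCheckpointAndCountCheckpoint(CompletedCheckpoint::doDiscard, executor);
    }
}
```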
########## File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpoint.java ##########

@@ -210,6 +221,21 @@ public void registerSharedStatesAfterRestored(SharedStateRegistry sharedStateReg // Discard and Dispose // ------------------------------------------------------------------------ + /** + * Asynchronously call a discard on the ioExecutor + * (FixedThreadPool of configurable size of default 4*CPU cores) + * and count the number of checkpoints that are waiting to clean. + */ + void asyncDiscardCheckpointAndCountCheckpoint(ThrowingConsumer<CompletedCheckpoint, Exception> discardCallback, Executor executor){ + cleanCallback.accept(() -> { + try { + discardCallback.accept(this);

Review comment:
This is the code you gave at the end of [your comment](https://github.com/apache/flink/pull/13040#issuecomment-686639977); I just reused it. But no problem, we can try to improve it. Regarding your proposed refactoring: I understand that, instead of passing the callbacks `CompletedCheckpoint::discardOnSubsume` and `CompletedCheckpoint::discardOnShutdown`, which both call `CompletedCheckpoint.doDiscard()`, you would prefer to call `doDiscard()` directly. But that would require evaluating the conditions inside the ZooKeeper store:
- for `CompletedCheckpoint::discardOnSubsume`: it means moving the condition `if (props.discardOnSubsumed())`
- for `CompletedCheckpoint::discardOnShutdown`: it means moving the conditions `if (jobStatus == JobStatus.FINISHED && props.discardOnJobFinished() || jobStatus == JobStatus.CANCELED && props.discardOnJobCancelled() || jobStatus == JobStatus.FAILED && props.discardOnJobFailed() || jobStatus == JobStatus.SUSPENDED && props.discardOnJobSuspended())`

=> I'm not sure it is worth violating the encapsulation of the `CompletedCheckpoint` object for the sake of a more readable `asyncDiscardCheckpointAndCountCheckpoint` method. WDYT?

----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org