echauchot commented on a change in pull request #13040: URL: https://github.com/apache/flink/pull/13040#discussion_r494401864
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ########## @@ -283,6 +286,54 @@ public void testConcurrentCheckpointOperations() throws Exception { recoveredTestCheckpoint.awaitDiscard(); } + /** + * FLINK-17073 tests that there is no request triggered when there are too many checkpoints + * waiting to clean and that it resumes when the number of waiting checkpoints as gone below + * the threshold. + * + */ + @Test + public void testChekpointingPausesAndResumeWhenTooManyCheckpoints() throws Exception{ + ManualClock clock = new ManualClock(); + clock.advanceTime(1, TimeUnit.DAYS); + int maxCleaningCheckpoints = 1; + CheckpointsCleaner checkpointsCleaner = new CheckpointsCleaner(); + CheckpointRequestDecider checkpointRequestDecider = new CheckpointRequestDecider(maxCleaningCheckpoints, unused ->{}, clock, 1, new AtomicInteger(0)::get, checkpointsCleaner::getNumberOfCheckpointsToClean); + + final int maxCheckpointsToRetain = 1; + Executors.PausableThreadPoolExecutor executor = Executors.pausableExecutor(); + ZooKeeperCompletedCheckpointStore checkpointStore = createCompletedCheckpoints(maxCheckpointsToRetain, executor); + + //pause the executor to pause checkpoints cleaning, to allow assertions + executor.pause(); + + int nbCheckpointsToInject = 3; + for (int i = 1; i <= nbCheckpointsToInject; i++) { + // add checkpoints to clean + TestCompletedCheckpoint completedCheckpoint = new TestCompletedCheckpoint(new JobID(), i, + i, Collections.emptyMap(), CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE), + checkpointsCleaner::cleanCheckpoint); + checkpointStore.addCheckpoint(completedCheckpoint); + } + + Thread.sleep(100L); // give time to submit checkpoints for cleaning + + int nbCheckpointsSubmittedForCleaningByCheckpointStore = nbCheckpointsToInject - maxCheckpointsToRetain; + assertEquals(nbCheckpointsSubmittedForCleaningByCheckpointStore, checkpointsCleaner.getNumberOfCheckpointsToClean()); Review comment: @rkhachatryan I used waiting loops instead of `countDownLatch` to synchronise the test and the executor. And I simply used the executor queue size as the condition for the waiting loop. That way we no more have the condition I did not like and we no more have the countDownLatch and the new executor class you did not like. Do you find this solution acceptable ? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org