Zakelly commented on code in PR #25066: URL: https://github.com/apache/flink/pull/25066#discussion_r1672561489
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ########## @@ -249,48 +249,42 @@ private void verifyStateHandleType(String checkpointPath, boolean fileMergingEna assertThat(hasKeyedState).isTrue(); } - private static void waitUntilNoJobThreads() throws InterruptedException { - SecurityManager securityManager = System.getSecurityManager(); - ThreadGroup group = - (securityManager != null) - ? securityManager.getThreadGroup() - : Thread.currentThread().getThreadGroup(); - - boolean jobThreads = true; - while (jobThreads) { - jobThreads = false; - Thread[] activeThreads = new Thread[group.activeCount() * 2]; - group.enumerate(activeThreads); - for (Thread thread : activeThreads) { - if (thread != null - && thread != Thread.currentThread() - && thread.getName().contains("jobmanager")) { - jobThreads = true; - Thread.sleep(500); - break; - } - } - } - } - - private void verifyCheckpointExist( - String checkpointPath, boolean exist, boolean fileMergingEnabled) throws IOException { + private void verifyCheckpointExistOrWaitDeleted( + String checkpointPath, boolean exist, boolean fileMergingEnabled) throws Exception { org.apache.flink.core.fs.Path checkpointDir = new org.apache.flink.core.fs.Path(checkpointPath); FileSystem fs = checkpointDir.getFileSystem(); - assertThat(fs.exists(checkpointDir)).isEqualTo(exist); org.apache.flink.core.fs.Path baseDir = checkpointDir.getParent(); - assertThat(fs.exists(baseDir)).isTrue(); org.apache.flink.core.fs.Path sharedFile = new org.apache.flink.core.fs.Path(baseDir, CHECKPOINT_SHARED_STATE_DIR); - assertThat(fs.exists(sharedFile)).isTrue(); - assertThat(fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0) - .isEqualTo(exist); org.apache.flink.core.fs.Path taskOwnedFile = new org.apache.flink.core.fs.Path(baseDir, CHECKPOINT_TASK_OWNED_STATE_DIR); + assertThat(fs.exists(baseDir)).isTrue(); + assertThat(fs.exists(sharedFile)).isTrue(); 
assertThat(fs.exists(taskOwnedFile)).isTrue(); - // Since there is no exclusive state, we should consider fileMergingEnabled. - assertThat(fs.exists(taskOwnedFile) && fs.listStatus(taskOwnedFile).length > 0) - .isEqualTo(exist && fileMergingEnabled); + if (exist) { + // should exist, just check + assertThat(fs.exists(checkpointDir)).isTrue(); + assertThat(fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0) + .isTrue(); + // Since there is no exclusive state, we should consider fileMergingEnabled. + assertThat( + fs.listStatus(taskOwnedFile) != null + && fs.listStatus(taskOwnedFile).length > 0) + .isEqualTo(fileMergingEnabled); + } else { + // should be cleaned, since the job io threads may work slow, we wait. + long waited = 0L; + while (fs.exists(checkpointDir) + || (fs.listStatus(sharedFile) != null && fs.listStatus(sharedFile).length > 0) + || (fs.listStatus(taskOwnedFile) != null + && fs.listStatus(taskOwnedFile).length > 0)) { + // We wait + Thread.sleep(500L); + waited += 500L; + // Or timeout + assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS); Review Comment: > If the timeout is reached and the files are not deleted, will this check fail? Yes. The `assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS)` assertion inside the wait loop fails as soon as the accumulated wait reaches the timeout while the files still exist. > If restoring from `file-merging disabled` to `file-merging disabled`, is it okay to skip this check? In that case we are not testing file merging, but we can still verify file existence along the way. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org