Zakelly commented on code in PR #25066:
URL: https://github.com/apache/flink/pull/25066#discussion_r1672561489


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##########
@@ -249,48 +249,42 @@ private void verifyStateHandleType(String checkpointPath, 
boolean fileMergingEna
         assertThat(hasKeyedState).isTrue();
     }
 
-    private static void waitUntilNoJobThreads() throws InterruptedException {
-        SecurityManager securityManager = System.getSecurityManager();
-        ThreadGroup group =
-                (securityManager != null)
-                        ? securityManager.getThreadGroup()
-                        : Thread.currentThread().getThreadGroup();
-
-        boolean jobThreads = true;
-        while (jobThreads) {
-            jobThreads = false;
-            Thread[] activeThreads = new Thread[group.activeCount() * 2];
-            group.enumerate(activeThreads);
-            for (Thread thread : activeThreads) {
-                if (thread != null
-                        && thread != Thread.currentThread()
-                        && thread.getName().contains("jobmanager")) {
-                    jobThreads = true;
-                    Thread.sleep(500);
-                    break;
-                }
-            }
-        }
-    }
-
-    private void verifyCheckpointExist(
-            String checkpointPath, boolean exist, boolean fileMergingEnabled) 
throws IOException {
+    private void verifyCheckpointExistOrWaitDeleted(
+            String checkpointPath, boolean exist, boolean fileMergingEnabled) 
throws Exception {
         org.apache.flink.core.fs.Path checkpointDir =
                 new org.apache.flink.core.fs.Path(checkpointPath);
         FileSystem fs = checkpointDir.getFileSystem();
-        assertThat(fs.exists(checkpointDir)).isEqualTo(exist);
         org.apache.flink.core.fs.Path baseDir = checkpointDir.getParent();
-        assertThat(fs.exists(baseDir)).isTrue();
         org.apache.flink.core.fs.Path sharedFile =
                 new org.apache.flink.core.fs.Path(baseDir, 
CHECKPOINT_SHARED_STATE_DIR);
-        assertThat(fs.exists(sharedFile)).isTrue();
-        assertThat(fs.listStatus(sharedFile) != null && 
fs.listStatus(sharedFile).length > 0)
-                .isEqualTo(exist);
         org.apache.flink.core.fs.Path taskOwnedFile =
                 new org.apache.flink.core.fs.Path(baseDir, 
CHECKPOINT_TASK_OWNED_STATE_DIR);
+        assertThat(fs.exists(baseDir)).isTrue();
+        assertThat(fs.exists(sharedFile)).isTrue();
         assertThat(fs.exists(taskOwnedFile)).isTrue();
-        // Since there is no exclusive state, we should consider 
fileMergingEnabled.
-        assertThat(fs.exists(taskOwnedFile) && 
fs.listStatus(taskOwnedFile).length > 0)
-                .isEqualTo(exist && fileMergingEnabled);
+        if (exist) {
+            // should exist, just check
+            assertThat(fs.exists(checkpointDir)).isTrue();
+            assertThat(fs.listStatus(sharedFile) != null && 
fs.listStatus(sharedFile).length > 0)
+                    .isTrue();
+            // Since there is no exclusive state, we should consider 
fileMergingEnabled.
+            assertThat(
+                            fs.listStatus(taskOwnedFile) != null
+                                    && fs.listStatus(taskOwnedFile).length > 0)
+                    .isEqualTo(fileMergingEnabled);
+        } else {
+            // should be cleaned, since the job io threads may work slow, we 
wait.
+            long waited = 0L;
+            while (fs.exists(checkpointDir)
+                    || (fs.listStatus(sharedFile) != null && 
fs.listStatus(sharedFile).length > 0)
+                    || (fs.listStatus(taskOwnedFile) != null
+                            && fs.listStatus(taskOwnedFile).length > 0)) {
+                // We wait
+                Thread.sleep(500L);
+                waited += 500L;
+                // Or timeout
+                assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS);

Review Comment:
   > If the timeout is reached and the files are not deleted, will this check 
fail? 
   
   Yes. 
   
   > If restoring from `file-merging disabled` to `file-merging disabled`, is 
it okay to skip this check?
   
   In this case we are not testing file merging itself, but we can still check 
file existence along the way.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to