Zakelly commented on code in PR #25090:
URL: https://github.com/apache/flink/pull/25090#discussion_r1678670379


##########
flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java:
##########
@@ -319,9 +333,106 @@ private static void verifyCheckpointExistOrWaitDeleted(
                     Thread.sleep(500L);
                     waited += 500L;
                     // Or timeout
-                    assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS);
+                    if (waited >= DELETE_TIMEOUT_MILLS) {
+                        assertThat(fs.exists(checkpointDir)).isFalse();
+                        assertThat(fs.listStatus(sharedFile)).isNullOrEmpty();
+                        assertThat(fs.listStatus(taskOwnedFile)).isNullOrEmpty();
+                    }
                 }
             }
         }
     }
+
+    /**
+     * Traverse the checkpoint metadata and verify all the state handle is disposed.
+     *
+     * @param metadata the metadata to traverse.
+     * @return true if all corresponding files are deleted.
+     */
+    private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) {
+        AtomicBoolean disposed = new AtomicBoolean(true);
+        for (OperatorState operatorState : metadata.getOperatorStates()) {
+            for (OperatorSubtaskState subtaskState : operatorState.getStates()) {
+                // Check keyed state handle
+                List<KeyedStateHandle> keyedStateHandles =
+                        new ArrayList<>(subtaskState.getManagedKeyedState());
+                for (KeyedStateHandle keyedStateHandle : keyedStateHandles) {
+                    assertThat(keyedStateHandle)
+                            .isInstanceOf(IncrementalRemoteKeyedStateHandle.class);
+                    ((IncrementalRemoteKeyedStateHandle) keyedStateHandle)
+                            .streamSubHandles()
+                            .forEach(
+                                    handle -> {
+                                        try {
+                                            if (handle instanceof FileStateHandle) {

Review Comment:
   We already check that the managed directories are deleted as a whole in lines 382~405, which I think is enough, so this per-handle check looks redundant.
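
For illustration only, a directory-level check of this kind could look roughly like the sketch below. It uses plain `java.nio.file` rather than Flink's `FileSystem`, so that it is self-contained. The class name `DirectoryDisposalCheck` and the method `isGoneOrEmpty` are hypothetical and do not come from the PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

// Hypothetical helper, not part of the PR: treats a directory as "disposed"
// when it no longer exists or no longer contains any entries.
public class DirectoryDisposalCheck {

    /** Returns true if the directory does not exist or has no entries. */
    public static boolean isGoneOrEmpty(Path dir) throws IOException {
        if (Files.notExists(dir)) {
            return true;
        }
        // Files.list returns a lazily populated stream that must be closed.
        try (Stream<Path> entries = Files.list(dir)) {
            return !entries.findAny().isPresent();
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a managed directory holding one merged file.
        Path shared = Files.createTempDirectory("shared");
        Path mergedFile = Files.createFile(shared.resolve("merged-file"));
        System.out.println(isGoneOrEmpty(shared)); // directory still has a file

        Files.delete(mergedFile);
        System.out.println(isGoneOrEmpty(shared)); // directory exists but is empty

        Files.delete(shared);
        System.out.println(isGoneOrEmpty(shared)); // directory no longer exists
    }
}
```

If every file referenced by the checkpoint lives under one of the managed directories, an empty or missing directory implies that each individual handle's file is gone too, which is why walking the metadata handle by handle adds little.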



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.