Zakelly commented on code in PR #25090: URL: https://github.com/apache/flink/pull/25090#discussion_r1678670379
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ########## @@ -319,9 +333,106 @@ private static void verifyCheckpointExistOrWaitDeleted( Thread.sleep(500L); waited += 500L; // Or timeout - assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS); + if (waited >= DELETE_TIMEOUT_MILLS) { + assertThat(fs.exists(checkpointDir)).isFalse(); + assertThat(fs.listStatus(sharedFile)).isNullOrEmpty(); + assertThat(fs.listStatus(taskOwnedFile)).isNullOrEmpty(); + } } } } } + + /** + * Traverse the checkpoint metadata and verify all the state handle is disposed. + * + * @param metadata the metadata to traverse. + * @return true if all corresponding files are deleted. + */ + private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) { + AtomicBoolean disposed = new AtomicBoolean(true); + for (OperatorState operatorState : metadata.getOperatorStates()) { + for (OperatorSubtaskState subtaskState : operatorState.getStates()) { + // Check keyed state handle + List<KeyedStateHandle> keyedStateHandles = + new ArrayList<>(subtaskState.getManagedKeyedState()); + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + assertThat(keyedStateHandle) + .isInstanceOf(IncrementalRemoteKeyedStateHandle.class); + ((IncrementalRemoteKeyedStateHandle) keyedStateHandle) + .streamSubHandles() + .forEach( + handle -> { + try { + if (handle instanceof FileStateHandle) { Review Comment: We already check whether the managed directories are deleted as a whole in lines 382~405, which I think is enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org