fredia commented on code in PR #25090: URL: https://github.com/apache/flink/pull/25090#discussion_r1677754547
########## flink-tests/src/test/java/org/apache/flink/test/checkpointing/SnapshotFileMergingCompatibilityITCase.java: ########## @@ -319,9 +333,106 @@ private static void verifyCheckpointExistOrWaitDeleted( Thread.sleep(500L); waited += 500L; // Or timeout - assertThat(waited).isLessThan(DELETE_TIMEOUT_MILLS); + if (waited >= DELETE_TIMEOUT_MILLS) { + assertThat(fs.exists(checkpointDir)).isFalse(); + assertThat(fs.listStatus(sharedFile)).isNullOrEmpty(); + assertThat(fs.listStatus(taskOwnedFile)).isNullOrEmpty(); + } } } } } + + /** + * Traverse the checkpoint metadata and verify all the state handle is disposed. + * + * @param metadata the metadata to traverse. + * @return true if all corresponding files are deleted. + */ + private static boolean verifyCheckpointDisposed(CheckpointMetadata metadata) { + AtomicBoolean disposed = new AtomicBoolean(true); + for (OperatorState operatorState : metadata.getOperatorStates()) { + for (OperatorSubtaskState subtaskState : operatorState.getStates()) { + // Check keyed state handle + List<KeyedStateHandle> keyedStateHandles = + new ArrayList<>(subtaskState.getManagedKeyedState()); + for (KeyedStateHandle keyedStateHandle : keyedStateHandles) { + assertThat(keyedStateHandle) + .isInstanceOf(IncrementalRemoteKeyedStateHandle.class); + ((IncrementalRemoteKeyedStateHandle) keyedStateHandle) + .streamSubHandles() + .forEach( + handle -> { + try { + if (handle instanceof FileStateHandle) { Review Comment: Should `SegmentFileStateHandle` also be checked here? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org