ZanderXu commented on code in PR #16927:
URL: https://github.com/apache/flink/pull/16927#discussion_r959323071
##########
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java:
##########

@@ -360,4 +391,37 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
         }
         return isClosed;
     }
+
+    /**
+     * If the last block of the file that the previous execution was writing to is not in COMPLETE
+     * state, HDFS will perform block recovery, which blocks truncate. Thus we have to wait for
+     * block recovery to ensure the truncate is successful.
+     */
+    private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)

Review Comment:
   I have one question: why not determine this with `isFileClosed(path)`? For example:
   ```java
   final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
   boolean isClosed = dfs.isFileClosed(path);
   while (!isClosed && deadline.hasTimeLeft()) {
       try {
           Thread.sleep(500L);
       } catch (InterruptedException e1) {
           LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e1);
           break;
       }
       isClosed = dfs.isFileClosed(path);
   }
   return isClosed;
   ```
   If the path points to a very large file, `LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE);` will be expensive, because it asks the NameNode to enumerate the locations of every block in the file.
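   For illustration, an alternative way to address the cost concern while still inspecting block state is to request only the block covering the final byte of the file, so the NameNode does not enumerate every block. The following is a minimal, untested sketch, not code from the PR: the class and method names are hypothetical, and it assumes the same `DistributedFileSystem` handle (`dfs`) used above.
   ```java
   import java.io.IOException;

   import org.apache.hadoop.fs.Path;
   import org.apache.hadoop.hdfs.DistributedFileSystem;
   import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

   /** Hypothetical helper, not part of the PR: checks only the file's last block. */
   final class LastBlockCheck {

       /**
        * Returns true if the last block of {@code path} is COMPLETE. Requests only
        * the block covering the final byte, so a large file's full block list is
        * never fetched.
        */
       static boolean isLastBlockComplete(DistributedFileSystem dfs, Path path)
               throws IOException {
           final String src = path.toUri().getPath();
           final long fileLength = dfs.getFileStatus(path).getLen();
           // getLocatedBlocks(src, offset, length) limits the response to the given
           // byte range; the NameNode still reports whether the file's last block
           // is complete in every response.
           final LocatedBlocks blocks =
                   dfs.getClient().getLocatedBlocks(src, Math.max(0L, fileLength - 1), 1L);
           return blocks.isLastBlockComplete();
       }

       private LastBlockCheck() {}
   }
   ```
   By comparison, `isFileClosed(path)` is also a single lightweight NameNode RPC and avoids going through `DFSClient` at all, which appears to be the motivation behind the reviewer's suggestion.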