ZanderXu commented on code in PR #16927:
URL: https://github.com/apache/flink/pull/16927#discussion_r959323071


##########
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java:
##########
@@ -360,4 +391,37 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
         }
         return isClosed;
     }
+
+    /**
+     * If the last block of the file that the previous execution was writing to is not in COMPLETE
+     * state, HDFS will perform block recovery, which blocks truncate. Thus we have to wait for block
+     * recovery to ensure the truncate is successful.
+     */
+    private static boolean waitForBlockRecovery(final FileSystem fs, final Path path)

Review Comment:
   I have one question: why not check this with `isFileClosed(path)`? For example:
   ```java
   final Deadline deadline = Deadline.now().plus(Duration.ofMillis(LEASE_TIMEOUT));
   boolean isClosed = dfs.isFileClosed(path);
   while (!isClosed && deadline.hasTimeLeft()) {
       try {
           Thread.sleep(500L);
       } catch (InterruptedException e1) {
           LOG.warn("Interrupted when waiting for block recovery for file {}.", path, e1);
           break;
       }
       isClosed = dfs.isFileClosed(path);
   }
   return isClosed;
   ```
   
   If the path points to a very large file, `LocatedBlocks blocks = dfs.getClient().getLocatedBlocks(absolutePath, 0, Long.MAX_VALUE);` will be expensive, since it fetches the locations of every block in the file.


