link3280 opened a new pull request #16927: URL: https://github.com/apache/flink/pull/16927
## What is the purpose of the change

In the case of HDFS, upon job recovery, StreamingFileSink does not wait for lease recovery to complete before truncating a file: it currently tries to truncate the file after a timeout, regardless of whether the lease has been revoked. This may lead to an IOException, because the file length seen at that point could still lag behind the actual length and the checkpointed length. Worse, the job may fall into an endless restart loop, because a new invocation of `recoverLease` interrupts the previous one (see [HBase's RecoverLeaseFSUtils](https://github.com/apache/hbase/blob/a9a1b9524daa9e33541c655620b9c07d5a93d533/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java#L68)).

Moreover, we should wait for block recoveries that may be triggered by truncate calls (as mentioned in the [Hadoop FileSystem Javadoc](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#truncate)) before appending to the recovered files.

This PR fixes the problem, but with two hard-coded timeout thresholds, since making these timeouts configurable requires interface changes. If the lease recovery or the block recovery fails, an IOException is thrown, which triggers a restart of the job.

## Brief change log

- Wait for lease recoveries to complete before truncating in-progress files.
- Wait for possible block recoveries to complete before appending to recovered files.

## Verifying this change

I tested it manually on an HDFS cluster with 3 nodes, but it may require further testing.

This change added tests and can be verified as follows:

1. Start a Flink job writing recoverable HDFS files.
2. Manually kill a DataNode that StreamingFileSink is writing to (to trigger lease recovery and block recovery).
3. Restart the job from the latest successful checkpoint. The files should be properly recovered.
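The wait sequence described in the purpose and change log (recover the lease, wait, truncate, then wait for any block recovery before appending) can be sketched as below. This is a minimal, self-contained illustration under stated assumptions, not the code in this PR: `LeaseAwareFs` is a hypothetical interface whose three methods mirror the Hadoop `DistributedFileSystem` calls `recoverLease`, `isFileClosed` and `truncate` (with plain string paths), and `waitUntil`, `recoverAndTruncate` and `FakeFs` are illustrative names. It also assumes that polling `isFileClosed` is an acceptable signal that both lease recovery and a truncate-triggered block recovery have finished.

```java
import java.io.IOException;
import java.time.Duration;

public class RecoverableFileSketch {

    /** Hypothetical subset of HDFS client calls, mirroring DistributedFileSystem (paths as strings). */
    interface LeaseAwareFs {
        boolean recoverLease(String path) throws IOException;
        boolean isFileClosed(String path) throws IOException;
        boolean truncate(String path, long newLength) throws IOException;
    }

    /** A boolean check that may throw IOException, so it can wrap HDFS calls. */
    @FunctionalInterface
    interface IoCondition {
        boolean check() throws IOException;
    }

    /** Polls the condition until it holds (returns true) or the timeout elapses (returns false). */
    static boolean waitUntil(IoCondition condition, Duration timeout, Duration pollInterval)
            throws IOException, InterruptedException {
        final long deadline = System.nanoTime() + timeout.toNanos();
        while (!condition.check()) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return true;
    }

    /**
     * Revokes the lease, waits until the file is closed, truncates it to the checkpointed
     * length, then waits for any block recovery that the truncate started.
     */
    static void recoverAndTruncate(
            LeaseAwareFs fs,
            String path,
            long checkpointedLength,
            Duration leaseTimeout,
            Duration blockRecoveryTimeout,
            Duration pollInterval)
            throws IOException, InterruptedException {
        // Trigger lease recovery once and then only poll: a second recoverLease call
        // could interrupt the recovery already in progress.
        boolean closed = fs.recoverLease(path);
        if (!closed && !waitUntil(() -> fs.isFileClosed(path), leaseTimeout, pollInterval)) {
            throw new IOException("Timed out waiting for lease recovery of " + path);
        }
        // truncate() returns false when the last block is adjusted in the background;
        // this sketch assumes the file reports closed again once that recovery finishes.
        boolean truncated = fs.truncate(path, checkpointedLength);
        if (!truncated
                && !waitUntil(() -> fs.isFileClosed(path), blockRecoveryTimeout, pollInterval)) {
            throw new IOException("Timed out waiting for block recovery of " + path);
        }
    }

    /** In-memory stand-in for tests: reports the file closed after a fixed number of polls. */
    static final class FakeFs implements LeaseAwareFs {
        int recoverLeaseCalls = 0;
        int pollsUntilClosed;
        final boolean truncateCompletesImmediately;
        long length;

        FakeFs(long length, int pollsUntilClosed, boolean truncateCompletesImmediately) {
            this.length = length;
            this.pollsUntilClosed = pollsUntilClosed;
            this.truncateCompletesImmediately = truncateCompletesImmediately;
        }

        @Override
        public boolean recoverLease(String path) {
            recoverLeaseCalls++;
            return pollsUntilClosed <= 0;
        }

        @Override
        public boolean isFileClosed(String path) {
            return --pollsUntilClosed <= 0;
        }

        @Override
        public boolean truncate(String path, long newLength) {
            length = newLength;
            if (!truncateCompletesImmediately) {
                pollsUntilClosed = 2; // simulate a short background block recovery
            }
            return truncateCompletesImmediately;
        }
    }
}
```

The design choice here is to call `recoverLease` only once and then poll, so that no later call can interrupt a recovery already in progress. When either wait times out, the `IOException` propagates and fails the task, which in Flink would lead to a job restart as described above; the two timeout parameters stand in for the two hard-coded thresholds mentioned in the PR, whose actual values are not stated here.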
## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
- The S3 file system connector: no

## Documentation

- Does this pull request introduce a new feature? no
- If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)