link3280 opened a new pull request #16927:
URL: https://github.com/apache/flink/pull/16927


   ## What is the purpose of the change
   
   In the case of HDFS, upon job recovery, StreamingFileSink does not wait for 
lease recovery to complete before truncating a file (currently it tries to 
truncate the file after a timeout, regardless of whether the lease has been 
revoked). This may lead to an IOException, because the file length could still 
lag behind both the actual length and the checkpointed length. Worse, the job may 
fall into an endless restart loop, because each new invocation of `recoverLease` 
interrupts the previous one (see [HBase's 
RecoverLeaseFSUtils](https://github.com/apache/hbase/blob/a9a1b9524daa9e33541c655620b9c07d5a93d533/hbase-asyncfs/src/main/java/org/apache/hadoop/hbase/util/RecoverLeaseFSUtils.java#L68)).
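   A minimal sketch of "wait for lease recovery before truncating", assuming a 
hypothetical `LeaseClient` interface whose two methods mirror Hadoop's 
`DistributedFileSystem#recoverLease(Path)` and `DistributedFileSystem#isFileClosed(Path)` 
(the path argument is dropped for brevity). It triggers lease recovery once and then 
polls, rather than re-invoking `recoverLease`. The timeout and poll interval are 
caller-supplied here; they are not the thresholds used in this PR.

```java
import java.io.IOException;
import java.time.Duration;

/** Hypothetical abstraction over two HDFS calls; not a Flink or Hadoop type. */
interface LeaseClient {
    /** Mirrors DistributedFileSystem#recoverLease: true if the file is already closed. */
    boolean recoverLease() throws IOException;

    /** Mirrors DistributedFileSystem#isFileClosed. */
    boolean isFileClosed() throws IOException;
}

final class LeaseRecovery {
    private LeaseRecovery() {}

    /**
     * Triggers lease recovery exactly once, then polls until the file is closed.
     * Throws IOException if the timeout expires, so the caller never truncates
     * a file whose lease is still held.
     */
    static void waitForLeaseRecovery(LeaseClient client, Duration timeout, Duration pollInterval)
            throws IOException {
        // A true result means the lease was released and the file is closed.
        if (client.recoverLease()) {
            return;
        }
        long deadline = System.nanoTime() + timeout.toNanos();
        // Poll instead of calling recoverLease() again: a new call would
        // interrupt the recovery that is already in progress.
        while (!client.isFileClosed()) {
            if (System.nanoTime() >= deadline) {
                // Failing loudly triggers a job restart instead of an early truncate.
                throw new IOException("Lease recovery did not complete within " + timeout);
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for lease recovery", e);
            }
        }
    }
}
```

   With a fake `LeaseClient` that reports the file closed only after a few polls, the 
method returns after calling `recoverLease` exactly once; if the file never closes, it 
throws an `IOException`, which fails the task and lets the job restart.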
   
   Moreover, before appending to the recovered files, we should wait for any 
block recoveries triggered by truncate calls to complete (as mentioned in the 
[Hadoop FileSystem 
Javadoc](https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html#truncate)).
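   A minimal sketch of the truncate-then-append step, assuming a hypothetical 
`RecoveredFile` interface. Its `truncate` method follows the contract described in the 
Hadoop `FileSystem#truncate` Javadoc (it returns `false` when a background adjustment of 
the last block has started and clients should wait before further updates). Treating 
`isFileClosed` as the signal that this block recovery has finished is an assumption of 
the sketch.

```java
import java.io.IOException;
import java.time.Duration;

/** Hypothetical stand-in for the HDFS calls on one in-progress file; not a Hadoop or Flink type. */
interface RecoveredFile {
    /** Like FileSystem#truncate: true if the file is immediately usable for append,
     *  false if a background block recovery was started. */
    boolean truncate(long newLength) throws IOException;

    /** Like DistributedFileSystem#isFileClosed. */
    boolean isFileClosed() throws IOException;

    /** Stands in for reopening the file with FileSystem#append and writing to it. */
    void append(byte[] data) throws IOException;
}

final class TruncateThenAppend {
    private TruncateThenAppend() {}

    /** Truncates to the checkpointed length and appends only once the file is usable again. */
    static void resume(RecoveredFile file, long checkpointedLength, byte[] data,
                       Duration timeout, Duration pollInterval) throws IOException {
        if (!file.truncate(checkpointedLength)) {
            // A block recovery is running in the background; appending now could fail.
            long deadline = System.nanoTime() + timeout.toNanos();
            while (!file.isFileClosed()) {
                if (System.nanoTime() >= deadline) {
                    throw new IOException("Block recovery did not complete within " + timeout);
                }
                try {
                    Thread.sleep(pollInterval.toMillis());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IOException("Interrupted while waiting for block recovery", e);
                }
            }
        }
        // Reached only when truncate completed immediately or the recovery finished.
        file.append(data);
    }
}
```

   If `truncate` returns `true`, the append happens immediately; otherwise the append is 
deferred until the file reports closed, and a recovery that never finishes surfaces as an 
`IOException` rather than a failed append.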
   
   This PR fixes the problem, but with two hard-coded timeout thresholds, since 
making these timeouts configurable requires interface changes. If the lease 
recovery or the block recovery fails, an IOException is thrown, which triggers 
a restart of the job.
   
   ## Brief change log
   
   - Wait for lease recoveries to complete before truncating in-progress files.
   - Wait for possible block recoveries to complete before appending to 
recovered files.
   
   ## Verifying this change
   
   I have only tested it manually on a 3-node HDFS cluster, so further testing 
may be needed.
   
   This change added tests and can be verified as follows:
   
   1. Start a Flink job writing recoverable HDFS files.
   2. Manually kill a DataNode that StreamingFileSink is writing to (to 
trigger lease recovery and block recovery).
   3. Restart the job from the latest successful checkpoint. The files should 
be properly recovered.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


-- 
This is an automated message from the Apache Git Service.