gaoyunhaii commented on a change in pull request #13377:
URL: https://github.com/apache/flink/pull/13377#discussion_r492593704



##########
File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##########
@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
                        }
                        isClosed = dfs.isFileClosed(path);
                }
+               // [FLINK-18592] recover lease after the lease timeout passed but file was still not closed
+               if (!isClosed && !deadline.hasTimeLeft()) {
+                       recoverLease(path, dfs);

Review comment:
       I think we could merge this step with the original lease recovery logic; a rough sketch of one way to do that is below.
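
       A minimal sketch, assuming the wait loop roughly matches the quoted context; `dfs`, `path`, `deadline`, `isClosed` and the new `recoverLease(path, dfs)` helper come from the diff, while the polling interval and exception message are only illustrative:

               // Hypothetical tail of waitUntilLeaseIsRevoked(): keep a single exit path
               // instead of a separate post-loop branch for the [FLINK-18592] case.
               boolean isClosed = dfs.isFileClosed(path);
               while (!isClosed && deadline.hasTimeLeft()) {
                       try {
                               Thread.sleep(500L);
                       } catch (InterruptedException e) {
                               throw new IOException("Interrupted while waiting for the lease to be revoked.", e);
                       }
                       isClosed = dfs.isFileClosed(path);
               }
               // If the soft wait expired without the file being closed, escalate to the
               // retrying recovery helper and report its result directly.
               return isClosed || recoverLease(path, dfs);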

##########
File path: 
flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##########
@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
                        }
                        isClosed = dfs.isFileClosed(path);
                }
+               // [FLINK-18592] recover lease after the lease timeout passed but file was still not closed
+               if (!isClosed && !deadline.hasTimeLeft()) {
+                       recoverLease(path, dfs);
+               }
                return isClosed;
        }
+
+
+       /*
+        * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+        *   - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+        *   - true when the lease recovery has succeeded or the file is closed.
+        *
+        * But, we have to be careful. Each time we call recoverLease, it starts the recover lease process over from the
+        * beginning. We could put ourselves in a situation where we are doing nothing but starting a recovery,
+        * interrupting it to start again, and so on.
+        *
+        * The namenode will try to recover the lease on the file's primary node. If all is well, it should return near
+        * immediately. But, as is common, it is the very primary node that has crashed and so the namenode will be stuck
+        * waiting on a socket timeout before it will ask another datanode to start the recovery. It does not help if we
+        * call recoverLease in the meantime and in particular, subsequent to the socket timeout, a recoverLease
+        * invocation will cause us to start over from square one (possibly waiting on socket timeout against primary
+        * node). So, in the below, we do the following:
+        * 1. Call recoverLease.
+        * 2. If it returns true, break.
+        * 3. If it returns false, wait a few seconds and then call it again.
+        * 4. If it returns true, break.
+        * 5. If it returns false, wait for what we think the datanode socket timeout is (configurable) and then try again.
+        * 6. If it returns true, break.
+        * 7. If it returns false, repeat starting at step 5. above. If HDFS-4525 is available, call it every second and
+        *    we might be able to exit early.
+        */
+       private static boolean recoverLease(Path path, DistributedFileSystem dfs) throws IOException {
+               LOG.info("Recover lease on dfs file " + path);
+               long startWaiting = System.currentTimeMillis();
+               // Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+               // usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+               // beyond that limit 'to be safe'.
+               //Configuration conf = dfs.getConf();
+               long recoveryTimeout = HdfsConstants.LEASE_HARDLIMIT_PERIOD / 4;
+               long recoveryTargetTimeout = recoveryTimeout + startWaiting;
+               // This setting should be a little bit above what the cluster dfs heartbeat is set to.
+               long firstPause = 4000L;
+               long pause = 1000L;
+               // This should be set to how long it'll take for us to time out against the primary datanode if it
+               // is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS, the
+               // default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
+               // timeout, then further recovery will take linear backoff with this base, to avoid endless
+               // preemptions when this value is not properly configured.
+               long subsequentPauseBase = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+
+               Method isFileClosedMeth = null;
+               // whether we need to look for the isFileClosed method
+               boolean findIsFileClosedMeth = true;
+               boolean recovered = false;
+               // We break the loop if we succeed the lease recovery, time out, or throw an exception.
+               for (int nbAttempt = 0; !recovered; nbAttempt++) {
+                       recovered = recoverLease(dfs, nbAttempt, path, startWaiting);
+                       if (recovered) {
+                               break;
+                       }
+                       if (recoveryTargetTimeout < System.currentTimeMillis()) {
+                               LOG.warn("Cannot recoverLease after trying for " +
+                                       recoveryTimeout +
+                                       "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; " +
+                                       String.format("attempt=%d, on file=%s, after %d ms", nbAttempt, path.toString(), System.currentTimeMillis() - startWaiting));
+                               break;
+                       }
+                       try {
+                               // On the first time through wait the short 'firstPause'.
+                               if (nbAttempt == 0) {
+                                       Thread.sleep(firstPause);
+                               } else {
+                                       // Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
+                                       // isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
+                                       long localStartWaiting = System.currentTimeMillis();
+                                       while ((System.currentTimeMillis() - localStartWaiting) < subsequentPauseBase * nbAttempt) {
+                                               Thread.sleep(pause);
+                                               if (findIsFileClosedMeth) {

Review comment:
       I think Flink has always assumed that the `isFileClosed` method is available, since Flink limits the minimum Hadoop version it supports. Thus we could remove the reflection-based logic that detects this method; a simplified sketch is below.
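
       A minimal sketch of the inner wait without the reflective lookup, assuming `DistributedFileSystem#isFileClosed` exists in every Hadoop version Flink supports; `dfs`, `path`, `pause`, `subsequentPauseBase`, `nbAttempt` and `recovered` come from the quoted diff, and the snippet stays inside the existing try/catch for `InterruptedException`:

               // Hypothetical replacement for the findIsFileClosedMeth / Method handling:
               // call isFileClosed() directly while spinning between recovery attempts.
               long localStartWaiting = System.currentTimeMillis();
               while ((System.currentTimeMillis() - localStartWaiting) < subsequentPauseBase * nbAttempt) {
                       Thread.sleep(pause);
                       // if the namenode already reports the file as closed, the lease
                       // recovery has effectively completed and we can stop waiting
                       if (dfs.isFileClosed(path)) {
                               recovered = true;
                               break;
                       }
               }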




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

