gaoyunhaii commented on a change in pull request #13377:
URL: https://github.com/apache/flink/pull/13377#discussion_r492593704
##########
File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##########

@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
 			}
 			isClosed = dfs.isFileClosed(path);
 		}
+		// [FLINK-18592] recover the lease after the lease timeout has passed but the file is still not closed
+		if (!isClosed && !deadline.hasTimeLeft()) {
+			recoverLease(path, dfs);

Review comment:
       I think we might merge this process with the original lease recovery logic.
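For illustration, here is a minimal sketch of what such a merged loop could look like, using only the public `DistributedFileSystem#recoverLease(Path)` and `DistributedFileSystem#isFileClosed(Path)` calls; the class name, method name, and the pause/re-issue constants below are hypothetical and not taken from the PR:

```java
import java.io.IOException;
import java.time.Duration;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

/**
 * Sketch of the merge suggested above: one loop that re-issues recoverLease()
 * with a pause and polls isFileClosed(), instead of a separate recovery step
 * appended after the original wait. Class name, method name, and the pause /
 * re-issue constants are illustrative assumptions, not the PR's code.
 */
final class MergedLeaseRecoverySketch {

	private static final long POLL_INTERVAL_MS = 1_000L;
	private static final long REISSUE_INTERVAL_MS = 60_000L;

	private MergedLeaseRecoverySketch() {}

	static boolean revokeLeaseAndWait(DistributedFileSystem dfs, Path path, Duration timeout)
			throws IOException, InterruptedException {

		final long deadline = System.currentTimeMillis() + timeout.toMillis();
		boolean closed = false;

		while (!closed && System.currentTimeMillis() < deadline) {
			// recoverLease() is asynchronous; true means the file is already closed.
			closed = dfs.recoverLease(path);

			// Poll isFileClosed() for a while before asking the NameNode to restart
			// recovery, so an in-flight recovery is not constantly preempted.
			final long reissueAt = System.currentTimeMillis() + REISSUE_INTERVAL_MS;
			while (!closed
					&& System.currentTimeMillis() < reissueAt
					&& System.currentTimeMillis() < deadline) {
				Thread.sleep(POLL_INTERVAL_MS);
				closed = dfs.isFileClosed(path);
			}
		}
		return closed;
	}
}
```

With a shape like this, the post-deadline `recoverLease(path, dfs)` call in the diff would no longer need to be a separate code path after the polling loop.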
##########
File path: flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
##########

@@ -357,6 +364,126 @@ private static boolean waitUntilLeaseIsRevoked(final FileSystem fs, final Path p
 			}
 			isClosed = dfs.isFileClosed(path);
 		}
+		// [FLINK-18592] recover the lease after the lease timeout has passed but the file is still not closed
+		if (!isClosed && !deadline.hasTimeLeft()) {
+			recoverLease(path, dfs);
+		}
 		return isClosed;
 	}
+
+
+	/*
+	 * Run the dfs recover lease. recoverLease is asynchronous. It returns:
+	 *   - false when it starts the lease recovery (i.e. lease recovery not *yet* done)
+	 *   - true when the lease recovery has succeeded or the file is closed.
+	 *
+	 * But, we have to be careful. Each time we call recoverLease, it starts the recovery process over from the
+	 * beginning. We could put ourselves in a situation where we are doing nothing but starting a recovery,
+	 * interrupting it to start again, and so on.
+	 *
+	 * The namenode will try to recover the lease on the file's primary node. If all is well, it should return near
+	 * immediately. But, as is common, it is the very primary node that has crashed, and so the namenode will be stuck
+	 * waiting on a socket timeout before it asks another datanode to start the recovery. It does not help if we call
+	 * recoverLease in the meantime; in particular, subsequent to the socket timeout, a recoverLease invocation will
+	 * cause us to start over from square one (possibly waiting on a socket timeout against the primary node).
+	 *
+	 * So, in the below, we do the following:
+	 *   1. Call recoverLease.
+	 *   2. If it returns true, break.
+	 *   3. If it returns false, wait a few seconds and then call it again.
+	 *   4. If it returns true, break.
+	 *   5. If it returns false, wait for what we think the datanode socket timeout is (configurable) and then try again.
+	 *   6. If it returns true, break.
+	 *   7. If it returns false, repeat starting at step 5 above. If HDFS-4525 is available, call it every second and we
+	 *      might be able to exit early.
+	 */
+	private static boolean recoverLease(Path path, DistributedFileSystem dfs) throws IOException {
+		LOG.info("Recover lease on dfs file " + path);
+		long startWaiting = System.currentTimeMillis();
+		// Default is 15 minutes. It's huge, but the idea is that if we have a major issue, HDFS
+		// usually needs 10 minutes before marking the nodes as dead. So we're putting ourselves
+		// beyond that limit 'to be safe'.
+		//Configuration conf = dfs.getConf();
+		long recoveryTimeout = HdfsConstants.LEASE_HARDLIMIT_PERIOD / 4;
+		long recoveryTargetTimeout = recoveryTimeout + startWaiting;
+		// This setting should be a little bit above what the cluster dfs heartbeat is set to.
+		long firstPause = 4000L;
+		long pause = 1000L;
+		// This should be set to how long it'll take for us to time out against the primary datanode if it
+		// is dead. We set it to 64 seconds, 4 seconds more than the default READ_TIMEOUT in HDFS, the
+		// default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still failing after this
+		// timeout, then further recovery will use linear backoff with this base, to avoid endless
+		// preemptions when this value is not properly configured.
+		long subsequentPauseBase = HdfsConstants.LEASE_SOFTLIMIT_PERIOD;
+
+		Method isFileClosedMeth = null;
+		// whether we need to look for the isFileClosed method
+		boolean findIsFileClosedMeth = true;
+		boolean recovered = false;
+		// We break the loop if the lease recovery succeeds, we time out, or an exception is thrown.
+		for (int nbAttempt = 0; !recovered; nbAttempt++) {
+			recovered = recoverLease(dfs, nbAttempt, path, startWaiting);
+			if (recovered) {
+				break;
+			}
+			if (recoveryTargetTimeout < System.currentTimeMillis()) {
+				LOG.warn("Cannot recoverLease after trying for "
+					+ recoveryTimeout
+					+ "ms (hbase.lease.recovery.timeout); continuing, but may be DATALOSS!!!; "
+					+ String.format("attempt=%d, on file=%s, after %d ms", nbAttempt, path.toString(), System.currentTimeMillis() - startWaiting));
+				break;
+			}
+			try {
+				// On the first time through, wait the short 'firstPause'.
+				if (nbAttempt == 0) {
+					Thread.sleep(firstPause);
+				} else {
+					// Cycle here until (subsequentPauseBase * nbAttempt) elapses. While spinning, check
+					// isFileClosed if available (should be in hadoop 2.0.5... not in hadoop 1 though).
+					long localStartWaiting = System.currentTimeMillis();
+					while ((System.currentTimeMillis() - localStartWaiting) < subsequentPauseBase * nbAttempt) {
+						Thread.sleep(pause);
+						if (findIsFileClosedMeth) {

Review comment:
       I think Flink has always assumed that the `isFileClosed` method is available, since Flink limits the minimum Hadoop version it supports. Thus we could remove the logic that detects the method via reflection.
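For illustration, a minimal sketch of the inner wait with the reflection removed, assuming `DistributedFileSystem#isFileClosed(Path)` is available on every Hadoop version Flink supports; the class name, method signature, and pause constant below are hypothetical:

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;

/**
 * Sketch of the simplification suggested in the last review comment: call
 * DistributedFileSystem#isFileClosed directly while waiting between recovery
 * attempts, so the reflective lookup (isFileClosedMeth / findIsFileClosedMeth)
 * can be dropped. Class name, method name, and the pause constant are
 * illustrative assumptions.
 */
final class DirectIsFileClosedWaitSketch {

	private static final long PAUSE_MS = 1_000L;

	private DirectIsFileClosedWaitSketch() {}

	/** Waits up to waitMillis, returning early once HDFS reports the file closed. */
	static boolean waitUntilFileIsClosed(DistributedFileSystem dfs, Path path, long waitMillis)
			throws IOException, InterruptedException {

		final long start = System.currentTimeMillis();
		while (System.currentTimeMillis() - start < waitMillis) {
			Thread.sleep(PAUSE_MS);
			// Direct call instead of looking up and invoking the method via reflection.
			if (dfs.isFileClosed(path)) {
				return true;
			}
		}
		return false;
	}
}
```

If the supported Hadoop baseline indeed guarantees `isFileClosed`, the `Method isFileClosedMeth` field and the `findIsFileClosedMeth` flag in the loop above can simply be removed.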