This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch PHOENIX-7562-feature
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/PHOENIX-7562-feature by this
push:
new 77b679ce91 Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery
in ReplicationLogReplay
77b679ce91 is described below
commit 77b679ce91cf48abb57f034d3f5aa16f6a6c8f7f
Author: Himanshu Gwalani <[email protected]>
AuthorDate: Fri Sep 19 02:50:27 2025 +0530
Phoenix-7672 Handle Unclosed file via HDFS Lease Recovery in
ReplicationLogReplay
---
phoenix-client-parent/pom.xml | 6 +
phoenix-core-server/pom.xml | 8 +
.../replication/reader/RecoverLeaseFSUtils.java | 333 +++++++++++++++++++++
.../reader/ReplicationLogProcessor.java | 82 ++++-
phoenix-core/pom.xml | 5 +
.../reader/RecoverLeaseFSUtilsTest.java | 169 +++++++++++
.../reader/ReplicationLogProcessorTest.java | 7 +-
phoenix-mapreduce-byo-shaded-hbase/pom.xml | 6 +
8 files changed, 598 insertions(+), 18 deletions(-)
diff --git a/phoenix-client-parent/pom.xml b/phoenix-client-parent/pom.xml
index 0b3f15b8df..3cef52db14 100644
--- a/phoenix-client-parent/pom.xml
+++ b/phoenix-client-parent/pom.xml
@@ -112,6 +112,12 @@
<exclude>hbase-webapps/**</exclude>
</excludes>
</filter>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>csv-bulk-load-config.properties</exclude>
+ </excludes>
+ </filter>
<!-- Phoenix specific -->
</filters>
<transformers>
diff --git a/phoenix-core-server/pom.xml b/phoenix-core-server/pom.xml
index 42fb746f65..7f74c6eda1 100644
--- a/phoenix-core-server/pom.xml
+++ b/phoenix-core-server/pom.xml
@@ -59,6 +59,14 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ </dependency>
<!-- HBase dependencies -->
<dependency>
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
new file mode 100644
index 0000000000..23adb8509d
--- /dev/null
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtils.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods for recovering file lease for hdfs.
+ */
+public class RecoverLeaseFSUtils {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RecoverLeaseFSUtils.class);
+
+ /**
+ * The lease recovery timeout in milliseconds.
+ *
+ * Default is 15 minutes. It's huge, but the idea is that if we have a
major issue, HDFS
+ * usually needs 10 minutes before marking the nodes as dead. So we're
putting ourselves
+ * beyond that limit 'to be safe'.
+ */
+ public static final String
REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND =
+ "phoenix.replication.replay.lease.recovery.timeout.millis";
+
+ public static final long
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND = 900000;
+
+ /**
+ * The first pause before retrying the lease recovery.
+ * This setting should be a little above what the cluster dfs heartbeat is
set to.
+ */
+ public static final String
REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+ "phoenix.replication.replay.lease.recovery.first.pause.mills";
+
+ public static final long
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND =
+ 4000;
+
+ /**
+ * The pause between subsequent retries of the lease recovery.
+ */
+ public static final String
REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND =
+ "phoenix.replication.replay.lease.recovery.pause.mills";
+
+ public static final long
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND = 1000;
+
+ /**
+ * The dfs timeout during lease recovery in milliseconds.
+ *
+ * This should be set to how long it'll take for us to timeout against
primary datanode if it
+ * is dead. We set it to 64 seconds, 4 seconds more than the default
READ_TIMEOUT in HDFS, the
+ * default value for DFS_CLIENT_SOCKET_TIMEOUT_KEY. If recovery is still
failing after this
+ * timeout, then further recovery will take linear backoff with this base,
to avoid endless
+ * preemptions when this value is not properly configured.
+ */
+ public static final String
REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+ "phoenix.replication.replay.lease.recovery.dfs.timeout.mills";
+
+ public static final long
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND =
+ 64 * 1000;
+
+
+ private static Class<?> leaseRecoverableClazz = null;
+ private static Method recoverLeaseMethod = null;
+ public static final String LEASE_RECOVERABLE_CLASS_NAME =
+ LeaseRecoverable.class.getCanonicalName();
+ // LEASE_RECOVERABLE_CLASS_NAME resolves to
+ // "org.apache.hadoop.fs.LeaseRecoverable".
+ static {
+ LOG.debug("Initialize RecoverLeaseFSUtils");
+ initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+ }
+
+ /**
+ * Initialize reflection classes and methods. If LeaseRecoverable class is
not found, look for
+ * DistributedFileSystem#recoverLease method.
+ */
+ static void initializeRecoverLeaseMethod(String className) {
+ try {
+ leaseRecoverableClazz = Class.forName(className);
+ recoverLeaseMethod =
leaseRecoverableClazz.getMethod("recoverLease", Path.class);
+ LOG.debug("set recoverLeaseMethod to " + className +
".recoverLease()");
+ } catch (ClassNotFoundException e) {
+ LOG.debug("LeaseRecoverable interface not in the classpath, "
+ + "this means Hadoop 3.3.5 or below.");
+ try {
+ recoverLeaseMethod =
DistributedFileSystem.class.getMethod("recoverLease",
+ Path.class);
+ } catch (NoSuchMethodException ex) {
+ LOG.error("Cannot find recoverLease method in
DistributedFileSystem class. "
+ + "It should never happen. Abort.", ex);
+ throw new RuntimeException(ex);
+ }
+ } catch (NoSuchMethodException e) {
+ LOG.error("Cannot find recoverLease method in LeaseRecoverable
class. "
+ + "It should never happen. Abort.", e);
+ throw new RuntimeException(e);
+ }
+ }
+
+ private RecoverLeaseFSUtils() {
+ }
+
+ public static void recoverFileLease(FileSystem fs, Path p, Configuration
conf)
+ throws IOException {
+ recoverFileLease(fs, p, conf, null);
+ }
+
+ /**
+ * Recover the lease from Hadoop file system, retrying multiple times.
+ */
+ public static void recoverFileLease(FileSystem fs, Path p, Configuration
conf,
+ CancelableProgressable reporter)
throws IOException {
+ if (fs instanceof FilterFileSystem) {
+ fs = ((FilterFileSystem) fs).getRawFileSystem();
+ }
+
+ // lease recovery not needed for local file system case.
+ if (isLeaseRecoverable(fs)) {
+ recoverDFSFileLease(fs, p, conf, reporter);
+ }
+ }
+
+ public static boolean isLeaseRecoverable(FileSystem fs) {
+ // return true if HDFS.
+ if (fs instanceof DistributedFileSystem) {
+ return true;
+ }
+ // return true if the file system implements LeaseRecoverable
interface.
+ if (leaseRecoverableClazz != null) {
+ return leaseRecoverableClazz.isAssignableFrom(fs.getClass());
+ }
+ // return false if the file system is not HDFS and does not implement
LeaseRecoverable.
+ return false;
+ }
+
+ /**
+ * Run the dfs recover lease. recoverLease is asynchronous. It returns:
-false when it starts
+ * the lease recovery (i.e. lease recovery not *yet* done) - true when the
lease recovery
+ * has succeeded or the file is closed. But, we have to be careful. Each
time we call
+ * recoverLease, it starts the recover lease process over from the
beginning.
+ * We could put ourselves in a situation where we are doing nothing but
starting a recovery,
+ * interrupting it to start again, and so on.
+ * The findings over in HBASE-8354 have it that the namenode will try to
recover the lease on
+ * the file's primary node. If all is well, it should return near
immediately. But, as is
+ * common, it is the very primary node that has crashed and so the
namenode will be stuck
+ * waiting on a socket timeout before it will ask another datanode to
start the recovery.
+ * It does not help if we call recoverLease in the meantime and in
particular, after the
+ * socket timeout, a recoverLease invocation will cause us to start over
from square one
+ * (possibly waiting on socket timeout against primary node). So, in the
below, we do the
+ * following: 1. Call recoverLease. 2. If it returns true, break. 3. If it
returns false,
+ * wait a few seconds and then call it again. 4. If it returns true,
break. 5. If it returns
+ * false, wait for what we think the datanode socket timeout is
(configurable) and then try
+ * again. 6. If it returns true, break. 7. If it returns false, repeat
starting at step 5.
+ * above. If HDFS-4525 is available, call it every second, and we might be
able to exit early.
+ */
+ private static boolean recoverDFSFileLease(final FileSystem dfs, final
Path p,
+ final Configuration conf,
+ final CancelableProgressable
reporter)
+ throws IOException {
+ LOG.info("Recover lease on dfs file " + p);
+
+ long startWaiting = EnvironmentEdgeManager.currentTime();
+ long recoveryTimeout =
conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+ DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND)
+ startWaiting;
+ long firstPause =
conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND,
+
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND);
+ long subsequentPauseBase = conf.getLong(
+ REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
+
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND);
+
+ Method isFileClosedMeth = null;
+ // whether we need to look for isFileClosed method
+ boolean findIsFileClosedMeth = true;
+ boolean recovered = false;
+ // We break the loop if we succeed the lease recovery, timeout, or we
throw an exception.
+ for (int nbAttempt = 0; !recovered; nbAttempt++) {
+ recovered = recoverLease(dfs, nbAttempt, p, startWaiting);
+ if (recovered) {
+ break;
+ }
+ checkIfCancelled(reporter);
+ if (checkIfTimedout(conf, recoveryTimeout, nbAttempt, p,
startWaiting)) {
+ break;
+ }
+ try {
+ // On the first time through wait the short 'firstPause'.
+ if (nbAttempt == 0) {
+ Thread.sleep(firstPause);
+ } else {
+ // Cycle here until (subsequentPause * nbAttempt) elapses.
While spinning,
+ // check isFileClosed if available (should be in hadoop
2.0.5...
+ // not in hadoop 1 though).
+ long localStartWaiting =
EnvironmentEdgeManager.currentTime();
+ while (
+ (EnvironmentEdgeManager.currentTime() -
localStartWaiting)
+ < subsequentPauseBase * nbAttempt
+ ) {
+ Thread.sleep(conf.getLong(
+
REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND,
+
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND));
+ if (findIsFileClosedMeth) {
+ try {
+ isFileClosedMeth =
dfs.getClass().getMethod("isFileClosed",
+ new Class[] { Path.class });
+ } catch (NoSuchMethodException nsme) {
+ LOG.debug("isFileClosed not available");
+ } finally {
+ findIsFileClosedMeth = false;
+ }
+ }
+ if (isFileClosedMeth != null && isFileClosed(dfs,
isFileClosedMeth, p)) {
+ recovered = true;
+ break;
+ }
+ checkIfCancelled(reporter);
+ }
+ }
+ } catch (InterruptedException ie) {
+ InterruptedIOException iioe = new InterruptedIOException();
+ iioe.initCause(ie);
+ throw iioe;
+ }
+ }
+ return recovered;
+ }
+
+ private static boolean checkIfTimedout(final Configuration conf, final
long recoveryTimeout,
+ final int nbAttempt, final Path p,
+ final long startWaiting) {
+ if (recoveryTimeout < EnvironmentEdgeManager.currentTime()) {
+ LOG.warn("Cannot recoverLease after trying for "
+ +
conf.getLong(REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND,
+
DEFAULT_REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND)
+ + "ms " +
REPLICATION_REPLAY_LEASE_RECOVERY_TIMEOUT_MILLISECOND
+ + " continuing, but may be DATALOSS!!!; "
+ + getLogMessageDetail(nbAttempt, p, startWaiting));
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Try to recover the lease.
+ * @return True if dfs#recoverLease returned true.
+ */
+ private static boolean recoverLease(final FileSystem dfs, final int
nbAttempt, final Path p,
+ final long startWaiting) throws
FileNotFoundException {
+ boolean recovered = false;
+ try {
+ recovered = (Boolean) recoverLeaseMethod.invoke(dfs, p);
+ LOG.info((recovered ? "Recovered lease, " : "Failed to recover
lease, ")
+ + getLogMessageDetail(nbAttempt, p, startWaiting));
+ } catch (InvocationTargetException ite) {
+ final Throwable e = ite.getCause();
+ if (e instanceof LeaseExpiredException
+ && e.getMessage().contains("File does not exist")) {
+ // This exception comes out instead of FNFE, fix it
+ throw new FileNotFoundException("The given replication log
wasn't found at " + p);
+ } else if (e instanceof FileNotFoundException) {
+ throw (FileNotFoundException) e;
+ }
+ LOG.warn(getLogMessageDetail(nbAttempt, p, startWaiting), e);
+ } catch (IllegalAccessException e) {
+ LOG.error("Failed to call recoverLease on {}. Abort.", dfs, e);
+ throw new RuntimeException(e);
+ }
+ return recovered;
+ }
+
+ /**
+ * Returns Detail to append to any log message around lease recovering.
+ */
+ private static String getLogMessageDetail(final int nbAttempt, final Path
p,
+ final long startWaiting) {
+ return "attempt=" + nbAttempt + " on file=" + p + " after "
+ + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms";
+ }
+
+ /**
+ * Call HDFS-4525 isFileClosed if it is available.
+ * @return True if file is closed.
+ */
+ private static boolean isFileClosed(final FileSystem dfs, final Method m,
final Path p) {
+ try {
+ return (Boolean) m.invoke(dfs, p);
+ } catch (SecurityException e) {
+ LOG.warn("No access", e);
+ } catch (Exception e) {
+ LOG.warn("Failed invocation for " + p.toString(), e);
+ }
+ return false;
+ }
+
+ private static void checkIfCancelled(final CancelableProgressable reporter)
+ throws InterruptedIOException {
+ if (reporter == null) {
+ return;
+ }
+ if (!reporter.progress()) {
+ throw new InterruptedIOException("Operation cancelled");
+ }
+ }
+
+}
diff --git
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
index 73dbfdbdf9..4a47779d70 100644
---
a/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
+++
b/phoenix-core-server/src/main/java/org/apache/phoenix/replication/reader/ReplicationLogProcessor.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -31,6 +32,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LeaseRecoverable;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -40,6 +42,7 @@ import org.apache.hadoop.hbase.client.AsyncTable;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.phoenix.replication.log.InvalidLogTrailerException;
import org.apache.phoenix.replication.log.LogFile;
import org.apache.phoenix.replication.log.LogFileReader;
import org.apache.phoenix.replication.log.LogFileReaderContext;
@@ -168,7 +171,7 @@ public class ReplicationLogProcessor implements Closeable {
*/
public static ReplicationLogProcessor get(Configuration conf, String
haGroupName) {
return INSTANCES.computeIfAbsent(haGroupName,
- k -> new ReplicationLogProcessor(conf, haGroupName));
+ k -> new ReplicationLogProcessor(conf, haGroupName));
}
/**
@@ -194,9 +197,9 @@ public class ReplicationLogProcessor implements Closeable {
decorateConf();
this.metrics = createMetricsSource();
this.executorService = Executors.newFixedThreadPool(threadPoolSize,
- new ThreadFactoryBuilder()
- .setNameFormat("Phoenix-Replication-Log-Processor-" +
haGroupName + "-%d")
- .build());
+ new ThreadFactoryBuilder()
+ .setNameFormat("Phoenix-Replication-Log-Processor-" +
haGroupName + "-%d")
+ .build());
}
/**
@@ -233,7 +236,15 @@ public class ReplicationLogProcessor implements Closeable {
try {
// Create the LogFileReader for given path
- logFileReader = createLogFileReader(fs, filePath);
+ Optional<LogFileReader> logFileReaderOptional =
createLogFileReader(fs, filePath);
+
+ if (!logFileReaderOptional.isPresent()) {
+ // This is an empty file, assume processed successfully and
return
+ LOG.warn("Found empty file to process {}", filePath);
+ return;
+ }
+
+ logFileReader = logFileReaderOptional.get();
for (LogFile.Record record : logFileReader) {
final TableName tableName =
TableName.valueOf(record.getHBaseTableName());
@@ -287,23 +298,60 @@ public class ReplicationLogProcessor implements Closeable
{
* @return A configured LogFileReader instance
* @throws IOException if the file doesn't exist or initialization fails
*/
- protected LogFileReader createLogFileReader(FileSystem fs, Path filePath)
throws IOException {
+ protected Optional<LogFileReader> createLogFileReader(FileSystem fs, Path
filePath)
+ throws IOException {
// Ensure that file exists. If we face exception while checking the
path itself,
// method would throw same exception back to the caller
if (!fs.exists(filePath)) {
throw new IOException("Log file does not exist: " + filePath);
}
LogFileReader logFileReader = new LogFileReader();
- try {
- LogFileReaderContext logFileReaderContext = new
LogFileReaderContext(conf)
- .setFileSystem(fs).setFilePath(filePath);
+ LogFileReaderContext logFileReaderContext = new
LogFileReaderContext(conf)
+ .setFileSystem(fs).setFilePath(filePath);
+ boolean isClosed = isFileClosed(fs, filePath);
+ if (isClosed) {
+ // As file is closed, ensure that the file has a valid header and
trailer
logFileReader.init(logFileReaderContext);
- } catch (IOException exception) {
- LOG.error("Failed to initialize new LogFileReader for path {}",
- filePath, exception);
- throw exception;
+ return Optional.of(logFileReader);
+ } else {
+ LOG.warn("Found un-closed file {}. Starting lease recovery.",
filePath);
+ recoverLease(fs, filePath);
+ if (fs.getFileStatus(filePath).getLen() <= 0) {
+ // Found empty file, returning an empty Optional instead of a reader
+ return Optional.empty();
+ }
+ try {
+ // Lease recovery has been attempted; try to create a reader that
validates both header and trailer
+ logFileReader.init(logFileReaderContext);
+ return Optional.of(logFileReader);
+ } catch (InvalidLogTrailerException invalidLogTrailerException) {
+ // If trailer is missing or corrupt, create reader without
trailer validation
+ LOG.warn("Invalid Trailer for file {}",
+ filePath, invalidLogTrailerException);
+ logFileReaderContext.setValidateTrailer(false);
+ logFileReader.init(logFileReaderContext);
+ return Optional.of(logFileReader);
+ } catch (IOException exception) {
+ LOG.error("Failed to initialize new LogFileReader for path {}",
+ filePath, exception);
+ throw exception;
+ }
+ }
+ }
+
+ protected void recoverLease(final FileSystem fs, final Path filePath)
throws IOException {
+ RecoverLeaseFSUtils.recoverFileLease(fs, filePath, conf);
+ }
+
+ protected boolean isFileClosed(final FileSystem fs, final Path filePath)
throws IOException {
+ boolean isClosed;
+ try {
+ isClosed = ((LeaseRecoverable) fs).isFileClosed(filePath);
+ } catch (ClassCastException classCastException) {
+ // If filesystem is not of type LeaseRecoverable, assume file is
always closed
+ isClosed = true;
}
- return logFileReader;
+ return isClosed;
}
/**
@@ -353,7 +401,8 @@ public class ReplicationLogProcessor implements Closeable {
// Update current operations for next retry
currentOperations = result.getFailedMutations();
- lastError = new IOException("Failed to apply the mutations",
result.getException());
+ lastError = new IOException("Failed to apply the mutations",
+ result.getException());
} catch (IOException e) {
lastError = e;
}
@@ -427,7 +476,8 @@ public class ReplicationLogProcessor implements Closeable {
// Add failed mutations to retry list
failedOperations.put(tableName,
tableMutationMap.get(tableName));
getMetrics().incrementFailedMutationsCount(tableMutationMap.get(tableName).size());
- LOG.debug("Failed to apply mutations for table {}: {}",
tableName, e.getMessage());
+ LOG.debug("Failed to apply mutations for table {}: {}",
tableName,
+ e.getMessage());
lastException = e;
}
}
diff --git a/phoenix-core/pom.xml b/phoenix-core/pom.xml
index c049ab4331..1287d72f5f 100644
--- a/phoenix-core/pom.xml
+++ b/phoenix-core/pom.xml
@@ -244,6 +244,11 @@
<artifactId>hbase-compression-aircompressor</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hbase</groupId>
+ <artifactId>hbase-asyncfs</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- HBase Adjacent Dependencies -->
<dependency>
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
new file mode 100644
index 0000000000..6df8577095
--- /dev/null
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/RecoverLeaseFSUtilsTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.replication.reader;
+
+import static
org.apache.phoenix.replication.reader.RecoverLeaseFSUtils.LEASE_RECOVERABLE_CLASS_NAME;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.phoenix.end2end.ParallelStatsDisabledIT;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.mockito.Mockito;
+
+/**
+ * Test our recoverLease loop against mocked up filesystem.
+ */
+public class RecoverLeaseFSUtilsTest extends ParallelStatsDisabledIT {
+
+ @ClassRule
+ public static TemporaryFolder testFolder = new TemporaryFolder();
+
+ private static Configuration conf;
+ private static FileSystem localFs;
+ private static Path FILE;
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ conf = getUtility().getConfiguration();
+ localFs = FileSystem.getLocal(conf);
+
conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_FIRST_PAUSE_MILLISECOND,
10);
+
conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_PAUSE_MILLISECOND,
10);
+ FILE = new Path(testFolder.newFile("file.txt").toURI());
+ }
+
+ @AfterClass
+ public static void cleanUp() throws IOException {
+ localFs.delete(new Path(testFolder.getRoot().toURI()), true);
+ }
+
+ /**
+ * Test recover lease eventually succeeding.
+ */
+ @Test
+ public void testRecoverLease() throws IOException {
+ long startTime = EnvironmentEdgeManager.currentTime();
+
conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
1000);
+ CancelableProgressable reporter =
Mockito.mock(CancelableProgressable.class);
+ Mockito.when(reporter.progress()).thenReturn(true);
+ DistributedFileSystem dfs = Mockito.mock(DistributedFileSystem.class);
+ // Fail four times and pass on the fifth.
+
Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(false)
+ .thenReturn(false).thenReturn(true);
+ RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+ Mockito.verify(dfs, Mockito.times(5)).recoverLease(FILE);
+ // Make sure we waited at least 3 * the configured lease-recovery DFS timeout
(the first two
+ // invocations will happen pretty fast... then we fall into the longer
wait loop).
+ assertTrue((EnvironmentEdgeManager.currentTime() - startTime)
+ > (3 *
conf.getLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
61000)));
+ }
+
+ /**
+ * Test that we can use reflection to access LeaseRecoverable methods.
+ */
+ @Test
+ public void testLeaseRecoverable() throws IOException {
+ try {
+ // set LeaseRecoverable to FakeLeaseRecoverable for testing
+
RecoverLeaseFSUtils.initializeRecoverLeaseMethod(FakeLeaseRecoverable.class.getName());
+ RecoverableFileSystem mockFS =
Mockito.mock(RecoverableFileSystem.class);
+ Mockito.when(mockFS.recoverLease(FILE)).thenReturn(true);
+ RecoverLeaseFSUtils.recoverFileLease(mockFS, FILE, conf);
+ Mockito.verify(mockFS, Mockito.times(1)).recoverLease(FILE);
+
assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(Mockito.mock(RecoverableFileSystem.class)));
+ } finally {
+
RecoverLeaseFSUtils.initializeRecoverLeaseMethod(LEASE_RECOVERABLE_CLASS_NAME);
+ }
+ }
+
+ /**
+ * Test that isFileClosed makes us recover lease faster.
+ */
+ @Test
+ public void testIsFileClosed() throws IOException {
+ // Make this time long so it is plain we broke out because of the
isFileClosed invocation.
+
conf.setLong(RecoverLeaseFSUtils.REPLICATION_REPLAY_LEASE_RECOVERY_DFS_TIMEOUT_MILLISECOND,
100000);
+ CancelableProgressable reporter =
Mockito.mock(CancelableProgressable.class);
+ Mockito.when(reporter.progress()).thenReturn(true);
+ IsFileClosedDistributedFileSystem dfs =
Mockito.mock(IsFileClosedDistributedFileSystem.class);
+ // Now make it so we fail the first two times -- the two fast
invocations, then we fall into
+ // the long loop during which we will call isFileClosed.... the next
invocation should
+ // therefore return true if we are to break the loop.
+
Mockito.when(dfs.recoverLease(FILE)).thenReturn(false).thenReturn(false).thenReturn(true);
+ Mockito.when(dfs.isFileClosed(FILE)).thenReturn(true);
+ RecoverLeaseFSUtils.recoverFileLease(dfs, FILE, conf, reporter);
+ Mockito.verify(dfs, Mockito.times(2)).recoverLease(FILE);
+ Mockito.verify(dfs, Mockito.times(1)).isFileClosed(FILE);
+ }
+
+ /**
+ * Test isLeaseRecoverable for both distributed and local FS
+ */
+ @Test
+ public void testIsLeaseRecoverable() {
+ assertTrue(RecoverLeaseFSUtils.isLeaseRecoverable(new
DistributedFileSystem()));
+ assertFalse(RecoverLeaseFSUtils.isLeaseRecoverable(new
LocalFileSystem()));
+ }
+
+ private interface FakeLeaseRecoverable {
+ @SuppressWarnings("unused")
+ boolean recoverLease(Path p) throws IOException;
+
+ @SuppressWarnings("unused")
+ boolean isFileClosed(Path p) throws IOException;
+ }
+
+ private static abstract class RecoverableFileSystem extends FileSystem
+ implements FakeLeaseRecoverable {
+ @Override
+ public boolean recoverLease(Path p) throws IOException {
+ return true;
+ }
+
+ @Override
+ public boolean isFileClosed(Path p) throws IOException {
+ return true;
+ }
+ }
+
+ /**
+ * Version of DFS that has HDFS-4525 in it.
+ */
+ private static class IsFileClosedDistributedFileSystem extends
DistributedFileSystem {
+ /**
+ * Close status of a file. Copied over from HDFS-4525
+ * @return true if file is already closed
+ **/
+ @Override
+ public boolean isFileClosed(Path f) throws IOException {
+ return false;
+ }
+ }
+}
diff --git
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
index 2bbc18a35d..7d1ce01705 100644
---
a/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
+++
b/phoenix-core/src/test/java/org/apache/phoenix/replication/reader/ReplicationLogProcessorTest.java
@@ -35,6 +35,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ExecutorService;
@@ -138,10 +139,12 @@ public class ReplicationLogProcessorTest extends
ParallelStatsDisabledIT {
// Test createLogFileReader with valid file - should succeed
ReplicationLogProcessor replicationLogProcessor = new
ReplicationLogProcessor(conf, testHAGroupName);
- LogFileReader reader =
replicationLogProcessor.createLogFileReader(localFs, validFilePath);
+ Optional<LogFileReader> optionalLogFileReader =
replicationLogProcessor.createLogFileReader(localFs, validFilePath);
// Verify reader is created successfully
- assertNotNull("Reader should not be null for valid file", reader);
+ assertNotNull("Reader should not be null for valid file",
optionalLogFileReader);
+ assertTrue("Reader should be present for valid file",
optionalLogFileReader.isPresent());
+ LogFileReader reader = optionalLogFileReader.get();
assertNotNull("Reader context should not be null",
reader.getContext());
assertEquals("File path should match", validFilePath,
reader.getContext().getFilePath());
assertEquals("File system should match", localFs,
reader.getContext().getFileSystem());
diff --git a/phoenix-mapreduce-byo-shaded-hbase/pom.xml
b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
index d98b42160d..077d91691c 100644
--- a/phoenix-mapreduce-byo-shaded-hbase/pom.xml
+++ b/phoenix-mapreduce-byo-shaded-hbase/pom.xml
@@ -120,6 +120,12 @@
</excludes>
</filter>
<!-- Phoenix specific -->
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>csv-bulk-load-config.properties</exclude>
+ </excludes>
+ </filter>
</filters>
<transformers>
<transformer