This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new b2c5fc4800 Introduce Compactor configuration for dealing with
consecutive failures (#5726)
b2c5fc4800 is described below
commit b2c5fc4800272c9d669ad53d749602583b2ae724
Author: Dave Marion <[email protected]>
AuthorDate: Fri Jul 18 09:11:55 2025 -0400
Introduce Compactor configuration for dealing with consecutive failures
(#5726)
This change propagates exceptions from the
classloading related code. Prior to this change
some of the exceptions raised by the classloading
code would be caught early and a
RuntimeException would be raised instead.
This change also modifies the Compactor to
conditionally delay execution of the next
compaction job when the Compactors has
been failing to complete consecutive prior
compactions. Four new properties control
the behavior of new delay logic.
This change also modifies the API between
the Compactor and Coordinator when
compactions have failed. The exception class
is now relayed to the coordinator, which is
tracking and periodically logging a summary
of the failures.
New metrics related to outcome of the
compaction on the compactor have been added.
Co-authored-by: Keith Turner <[email protected]>
---
.../accumulo/core/classloader/ClassLoaderUtil.java | 19 +-
.../DefaultContextClassLoaderFactory.java | 11 +-
.../core/client/ClientSideIteratorScanner.java | 2 +-
.../accumulo/core/client/rfile/RFileScanner.java | 2 +-
.../accumulo/core/clientImpl/ClientContext.java | 4 +
.../accumulo/core/clientImpl/OfflineIterator.java | 8 +-
.../apache/accumulo/core/conf/ConfigCheckUtil.java | 3 +-
.../core/conf/ConfigurationTypeHelper.java | 5 +-
.../org/apache/accumulo/core/conf/Property.java | 28 +++
.../core/iteratorsImpl/IteratorConfigUtil.java | 6 +-
.../iteratorsImpl/conf/ColumnToClassMapping.java | 5 +-
.../accumulo/core/metrics/MetricsProducer.java | 42 ++++
.../core/spi/common/ContextClassLoaderFactory.java | 26 ++-
.../accumulo/core/summary/SummarizerFactory.java | 7 +-
.../thrift/CompactionCoordinatorService.java | 137 +++++++++--
core/src/main/thrift/compaction-coordinator.thrift | 1 +
.../classloader/ContextClassLoaderFactoryTest.java | 3 +-
.../core/iteratorsImpl/IteratorConfigUtilTest.java | 13 +-
pom.xml | 2 +-
.../org/apache/accumulo/server/AbstractServer.java | 6 +-
.../org/apache/accumulo/server/ServerContext.java | 3 +
.../accumulo/server/ServiceEnvironmentImpl.java | 6 +-
.../accumulo/server/compaction/FileCompactor.java | 8 +-
.../coordinator/CompactionCoordinator.java | 112 ++++++++-
.../coordinator/CompactionCoordinatorTest.java | 5 +-
.../org/apache/accumulo/compactor/Compactor.java | 151 +++++++++++-
.../apache/accumulo/compactor/CompactorTest.java | 2 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 1 +
.../java/org/apache/accumulo/manager/Manager.java | 1 +
.../accumulo/tserver/ConditionCheckerContext.java | 7 +-
.../org/apache/accumulo/tserver/ScanServer.java | 1 +
.../accumulo/tserver/TabletClientHandler.java | 6 +-
.../org/apache/accumulo/tserver/TabletServer.java | 2 +-
.../accumulo/tserver/tablet/CompactableUtils.java | 6 +-
.../accumulo/tserver/tablet/MinorCompactor.java | 2 +-
.../accumulo/tserver/tablet/ScanDataSource.java | 9 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 4 +-
.../main/java/org/apache/accumulo/shell/Shell.java | 4 +-
.../accumulo/shell/commands/SetIterCommand.java | 3 +-
.../shell/commands/SetScanIterCommand.java | 3 +-
.../shell/commands/SetShellIterCommand.java | 3 +-
.../classloader/vfs/AccumuloVFSClassLoader.java | 30 +--
.../compaction/ClassLoaderContextCompactionIT.java | 260 +++++++++++++++++++++
.../apache/accumulo/test/metrics/MetricsIT.java | 5 +
.../test/performance/scan/CollectTabletStats.java | 2 +-
45 files changed, 861 insertions(+), 105 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java
b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java
index 7af94627b0..31e69e0a20 100644
---
a/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/classloader/ClassLoaderUtil.java
@@ -18,9 +18,12 @@
*/
package org.apache.accumulo.core.classloader;
+import java.io.IOException;
+
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.spi.common.ContextClassLoaderFactory;
+import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import org.apache.accumulo.core.util.ConfigurationImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,11 +77,15 @@ public class ClassLoaderUtil {
}
@SuppressWarnings("deprecation")
- public static ClassLoader getClassLoader(String context) {
+ public static ClassLoader getClassLoader(String context) throws
ContextClassLoaderException {
if (context != null && !context.isEmpty()) {
return FACTORY.getClassLoader(context);
} else {
- return
org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.getClassLoader();
+ try {
+ return
org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader.getClassLoader();
+ } catch (IOException e) {
+ throw new ContextClassLoaderException(context, e);
+ }
}
}
@@ -92,7 +99,7 @@ public class ClassLoaderUtil {
return false;
}
return true;
- } catch (RuntimeException e) {
+ } catch (ContextClassLoaderException e) {
LOG.debug("Context {} is not valid.", context, e);
return false;
}
@@ -103,7 +110,11 @@ public class ClassLoaderUtil {
public static <U> Class<? extends U> loadClass(String context, String
className,
Class<U> extension) throws ClassNotFoundException {
- return getClassLoader(context).loadClass(className).asSubclass(extension);
+ try {
+ return
getClassLoader(context).loadClass(className).asSubclass(extension);
+ } catch (ContextClassLoaderException e) {
+ throw new ClassNotFoundException("Error loading class from context: " +
context, e);
+ }
}
public static <U> Class<? extends U> loadClass(String className, Class<U>
extension)
diff --git
a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
index 1bd5fcb670..ee5634c714 100644
---
a/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
+++
b/core/src/main/java/org/apache/accumulo/core/classloader/DefaultContextClassLoaderFactory.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.classloader;
import static java.util.concurrent.TimeUnit.MINUTES;
+import java.io.IOException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ScheduledFuture;
@@ -97,8 +98,12 @@ public class DefaultContextClassLoaderFactory implements
ContextClassLoaderFacto
@SuppressWarnings("deprecation")
@Override
- public ClassLoader getClassLoader(String contextName) {
- return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader
- .getContextClassLoader(contextName);
+ public ClassLoader getClassLoader(String contextName) throws
ContextClassLoaderException {
+ try {
+ return org.apache.accumulo.start.classloader.vfs.AccumuloVFSClassLoader
+ .getContextClassLoader(contextName);
+ } catch (RuntimeException | IOException e) {
+ throw new ContextClassLoaderException(contextName, e);
+ }
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
index 8a64621951..0209e8deec 100644
---
a/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
+++
b/core/src/main/java/org/apache/accumulo/core/client/ClientSideIteratorScanner.java
@@ -242,7 +242,7 @@ public class ClientSideIteratorScanner extends
ScannerOptions implements Scanner
IteratorBuilder.builder(tm.values()).opts(serverSideIteratorOptions).env(iterEnv).build();
skvi = IteratorConfigUtil.loadIterators(smi, ib);
- } catch (IOException e) {
+ } catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException(e);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
index e34730ff36..9266a53e51 100644
--- a/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
+++ b/core/src/main/java/org/apache/accumulo/core/client/rfile/RFileScanner.java
@@ -320,7 +320,7 @@ class RFileScanner extends ScannerOptions implements
Scanner {
.opts(serverSideIteratorOptions).env(iterEnv).build();
iterator = IteratorConfigUtil.loadIterators(iterator,
iteratorBuilder);
}
- } catch (IOException e) {
+ } catch (IOException | ReflectiveOperationException e) {
throw new RuntimeException(e);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
index c2bcabe01e..e5b059a80f 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ClientContext.java
@@ -163,6 +163,10 @@ public class ClientContext implements AccumuloClient {
}
}
+ protected boolean isClosed() {
+ return closed;
+ }
+
private ScanServerSelector createScanServerSelector() {
String clazz =
ClientProperty.SCAN_SERVER_SELECTOR.getValue(info.getProperties());
try {
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
index d64e018148..7ec99bd9c4 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/OfflineIterator.java
@@ -137,8 +137,8 @@ class OfflineIterator implements Iterator<Entry<Key,Value>>
{
}
}
- private void nextTablet()
- throws TableNotFoundException, AccumuloException, IOException,
AccumuloSecurityException {
+ private void nextTablet() throws TableNotFoundException, AccumuloException,
IOException,
+ AccumuloSecurityException, ReflectiveOperationException {
Range nextRange;
@@ -204,8 +204,8 @@ class OfflineIterator implements Iterator<Entry<Key,Value>>
{
}
private SortedKeyValueIterator<Key,Value> createIterator(KeyExtent extent,
- Collection<StoredTabletFile> absFiles)
- throws TableNotFoundException, AccumuloException, IOException,
AccumuloSecurityException {
+ Collection<StoredTabletFile> absFiles) throws TableNotFoundException,
AccumuloException,
+ IOException, AccumuloSecurityException, ReflectiveOperationException {
// possible race condition here, if table is renamed
String tableName = context.getTableName(tableId);
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java
b/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java
index eba5cc420f..db5cb305a6 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/ConfigCheckUtil.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.conf;
-import java.io.IOException;
import java.util.Map.Entry;
import java.util.Objects;
@@ -160,7 +159,7 @@ public class ConfigCheckUtil {
Class<?> requiredBaseClass) {
try {
ConfigurationTypeHelper.getClassInstance(null, className,
requiredBaseClass);
- } catch (IOException | ReflectiveOperationException e) {
+ } catch (ReflectiveOperationException e) {
fatal(confOption + " has an invalid class name: " + className);
} catch (ClassCastException e) {
fatal(confOption + " must implement " + requiredBaseClass
diff --git
a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java
b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java
index cd286c1cd7..ae0d4088e5 100644
---
a/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java
+++
b/core/src/main/java/org/apache/accumulo/core/conf/ConfigurationTypeHelper.java
@@ -24,7 +24,6 @@ import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.MINUTES;
import static java.util.concurrent.TimeUnit.SECONDS;
-import java.io.IOException;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
@@ -175,7 +174,7 @@ public class ConfigurationTypeHelper {
try {
instance = getClassInstance(context, clazzName, base);
- } catch (RuntimeException | IOException | ReflectiveOperationException e) {
+ } catch (RuntimeException | ReflectiveOperationException e) {
log.error("Failed to load class {} in classloader context {}",
clazzName, context, e);
}
@@ -196,7 +195,7 @@ public class ConfigurationTypeHelper {
* @return a new instance of the class
*/
public static <T> T getClassInstance(String context, String clazzName,
Class<T> base)
- throws IOException, ReflectiveOperationException {
+ throws ReflectiveOperationException {
T instance;
Class<? extends T> clazz = ClassLoaderUtil.loadClass(context, clazzName,
base);
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 625cb0ed1f..46e29f4234 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1561,6 +1561,34 @@ public enum Property {
COMPACTOR_CLIENTPORT("compactor.port.client", "9133", PropertyType.PORT,
"The port used for handling client connections on the compactor
servers.", "2.1.0"),
@Experimental
+ COMPACTOR_FAILURE_BACKOFF_THRESHOLD("compactor.failure.backoff.threshold",
"3",
+ PropertyType.COUNT,
+ "The number of consecutive failures that must occur before the Compactor
starts to back off"
+ + " processing compactions.",
+ "2.1.4"),
+ @Experimental
+ COMPACTOR_FAILURE_BACKOFF_INTERVAL("compactor.failure.backoff.interval", "0",
+ PropertyType.TIMEDURATION,
+ "The time basis for computing the wait time for compaction failure
backoff. A value of zero disables"
+ + " the backoff feature. When a non-zero value is supplied, then
after compactor.failure.backoff.threshold"
+ + " failures have occurred, the compactor will wait
compactor.failure.backoff.interval * the number of"
+ + " failures seconds before executing the next compaction. For
example, if this value is 10s, then after"
+ + " three failures the Compactor will wait 30s before starting the
next compaction. If the compaction fails"
+ + " again, then it will wait 40s before starting the next
compaction.",
+ "2.1.4"),
+ @Experimental
+ COMPACTOR_FAILURE_BACKOFF_RESET("compactor.failure.backoff.reset", "10m",
+ PropertyType.TIMEDURATION,
+ "The maximum amount of time that the compactor will wait before
executing the next compaction. When this"
+ + " time limit has been reached, the failures are cleared.",
+ "2.1.4"),
+ @Experimental
+
COMPACTOR_FAILURE_TERMINATION_THRESHOLD("compactor.failure.termination.threshold",
"0",
+ PropertyType.COUNT,
+ "The number of consecutive failures at which the Compactor exits and the
process terminates. A zero"
+ + " value disables this feature.",
+ "2.1.4"),
+ @Experimental
COMPACTOR_MINTHREADS("compactor.threads.minimum", "4", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests.",
"2.1.0"),
@Experimental
diff --git
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java
index 93c098d0ef..5758411e04 100644
---
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java
+++
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtil.java
@@ -174,7 +174,8 @@ public class IteratorConfigUtil {
*/
public static SortedKeyValueIterator<Key,Value>
convertItersAndLoad(IteratorScope scope,
SortedKeyValueIterator<Key,Value> source, AccumuloConfiguration conf,
- List<IteratorSetting> iterators, IteratorEnvironment env) throws
IOException {
+ List<IteratorSetting> iterators, IteratorEnvironment env)
+ throws IOException, ReflectiveOperationException {
List<IterInfo> ssiList = new ArrayList<>();
Map<String,Map<String,String>> ssio = new HashMap<>();
@@ -194,7 +195,7 @@ public class IteratorConfigUtil {
*/
public static SortedKeyValueIterator<Key,Value>
loadIterators(SortedKeyValueIterator<Key,Value> source, IteratorBuilder
iteratorBuilder)
- throws IOException {
+ throws IOException, ReflectiveOperationException {
SortedKeyValueIterator<Key,Value> prev = source;
final boolean useClassLoader = iteratorBuilder.useAccumuloClassLoader;
Map<String,Class<SortedKeyValueIterator<Key,Value>>> classCache = new
HashMap<>();
@@ -228,6 +229,7 @@ public class IteratorConfigUtil {
} catch (ReflectiveOperationException e) {
log.error("Failed to load iterator {}, for table {}, from context {}",
iterInfo.className,
iteratorBuilder.iteratorEnvironment.getTableId(),
iteratorBuilder.context, e);
+ // This has to be a RuntimeException to be handled properly to fail
the scan
throw new RuntimeException(e);
}
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java
index 1e6ff268b0..419aecc5cb 100644
---
a/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java
+++
b/core/src/main/java/org/apache/accumulo/core/iteratorsImpl/conf/ColumnToClassMapping.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.iteratorsImpl.conf;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -44,12 +43,12 @@ public class ColumnToClassMapping<K> {
}
public ColumnToClassMapping(Map<String,String> objectStrings, Class<?
extends K> c)
- throws ReflectiveOperationException, IOException {
+ throws ReflectiveOperationException {
this(objectStrings, c, null);
}
public ColumnToClassMapping(Map<String,String> objectStrings, Class<?
extends K> c,
- String context) throws ReflectiveOperationException, IOException {
+ String context) throws ReflectiveOperationException {
this();
for (Entry<String,String> entry : objectStrings.entrySet()) {
diff --git
a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
index c98a2f5c61..7ce7a023df 100644
--- a/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
+++ b/core/src/main/java/org/apache/accumulo/core/metrics/MetricsProducer.java
@@ -73,6 +73,42 @@ import io.micrometer.core.instrument.MeterRegistry;
* <td>FunctionCounter</td>
* <td>Number of entries written by all threads performing compactions</td>
* </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_COMPACTIONS_CANCELLED}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of compactions cancelled on a compactor</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_COMPACTIONS_COMPLETED}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of compactions completed on a compactor</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_COMPACTIONS_FAILED}</td>
+ * <td>FunctionCounter</td>
+ * <td>Number of compactions failed on a compactor</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_FAILURES_CONSECUTIVE}</td>
+ * <td>Gauge</td>
+ * <td>Number of consecutive compaction failures on a compactor</td>
+ * </tr>
+ * <tr>
+ * <td>N/A</td>
+ * <td>N/A</td>
+ * <td>{@value #METRICS_COMPACTOR_FAILURES_TERMINATION}</td>
+ * <td>Gauge</td>
+ * <td>Number of Compactors terminated due to consecutive failures. Process
exits after this metric
+ * is incremented, so it's not guaranteed to be seen.</td>
+ * </tr>
* <!-- fate -->
* <tr>
* <td>currentFateOps</td>
@@ -625,6 +661,12 @@ public interface MetricsProducer {
String METRICS_COMPACTOR_MAJC_STUCK = METRICS_COMPACTOR_PREFIX +
"majc.stuck";
String METRICS_COMPACTOR_ENTRIES_READ = METRICS_COMPACTOR_PREFIX +
"entries.read";
String METRICS_COMPACTOR_ENTRIES_WRITTEN = METRICS_COMPACTOR_PREFIX +
"entries.written";
+ String METRICS_COMPACTOR_COMPACTIONS_CANCELLED = METRICS_COMPACTOR_PREFIX +
"majc.cancelled";
+ String METRICS_COMPACTOR_COMPACTIONS_COMPLETED = METRICS_COMPACTOR_PREFIX +
"majc.completed";
+ String METRICS_COMPACTOR_COMPACTIONS_FAILED = METRICS_COMPACTOR_PREFIX +
"majc.failed";
+ String METRICS_COMPACTOR_FAILURES_CONSECUTIVE =
+ METRICS_COMPACTOR_PREFIX + "majc.failures.consecutive";
+ String METRICS_COMPACTOR_FAILURES_TERMINATION = METRICS_COMPACTOR_PREFIX +
"terminated";
String METRICS_FATE_PREFIX = "accumulo.fate.";
String METRICS_FATE_TYPE_IN_PROGRESS = METRICS_FATE_PREFIX +
"ops.in.progress.by.type";
diff --git
a/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java
b/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java
index 3d9c18683f..950b0a59e1 100644
---
a/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java
+++
b/core/src/main/java/org/apache/accumulo/core/spi/common/ContextClassLoaderFactory.java
@@ -47,6 +47,26 @@ package org.apache.accumulo.core.spi.common;
*/
public interface ContextClassLoaderFactory {
+ class ContextClassLoaderException extends Exception {
+
+ private static final long serialVersionUID = 1L;
+ private static final String msg = "Error getting classloader for context:
";
+
+ public ContextClassLoaderException(String context, Throwable cause,
boolean enableSuppression,
+ boolean writableStackTrace) {
+ super(msg + context, cause, enableSuppression, writableStackTrace);
+ }
+
+ public ContextClassLoaderException(String context, Throwable cause) {
+ super(msg + context, cause);
+ }
+
+ public ContextClassLoaderException(String context) {
+ super(msg + context);
+ }
+
+ }
+
/**
* Pass the service environment to allow for additional class loader
configuration
*
@@ -57,8 +77,8 @@ public interface ContextClassLoaderFactory {
/**
* Get the class loader for the given contextName. Callers should not cache
the ClassLoader result
* as it may change if/when the ClassLoader reloads. Implementations should
throw a
- * RuntimeException of some type (such as IllegalArgumentException) if the
provided contextName is
- * not supported or fails to be constructed.
+ * ContextClassLoaderException if the provided contextName is not supported
or fails to be
+ * constructed.
*
* @param contextName the name of the context that represents a class loader
that is managed by
* this factory. Currently, Accumulo will only call this method for
non-null and non-empty
@@ -66,5 +86,5 @@ public interface ContextClassLoaderFactory {
* consulting this plugin.
* @return the class loader for the given contextName
*/
- ClassLoader getClassLoader(String contextName);
+ ClassLoader getClassLoader(String contextName) throws
ContextClassLoaderException;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
index 48b123b3a7..2ab51b6323 100644
--- a/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
+++ b/core/src/main/java/org/apache/accumulo/core/summary/SummarizerFactory.java
@@ -18,8 +18,6 @@
*/
package org.apache.accumulo.core.summary;
-import java.io.IOException;
-
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
import org.apache.accumulo.core.client.summary.Summarizer;
import org.apache.accumulo.core.client.summary.SummarizerConfiguration;
@@ -41,8 +39,7 @@ public class SummarizerFactory {
this.context = ClassLoaderUtil.tableContext(tableConfig);
}
- private Summarizer newSummarizer(String classname)
- throws IOException, ReflectiveOperationException {
+ private Summarizer newSummarizer(String classname) throws
ReflectiveOperationException {
if (classloader != null) {
return
classloader.loadClass(classname).asSubclass(Summarizer.class).getDeclaredConstructor()
.newInstance();
@@ -55,7 +52,7 @@ public class SummarizerFactory {
public Summarizer getSummarizer(SummarizerConfiguration conf) {
try {
return newSummarizer(conf.getClassName());
- } catch (ReflectiveOperationException | IOException e) {
+ } catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
index ac8e15f6e5..a59cec5ae9 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/compaction/thrift/CompactionCoordinatorService.java
@@ -35,7 +35,7 @@ public class CompactionCoordinatorService {
public void
updateCompactionStatus(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId, TCompactionStatusUpdate status, long
timestamp) throws org.apache.thrift.TException;
- public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) throws
org.apache.thrift.TException;
+ public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String
exceptionClassName) throws org.apache.thrift.TException;
public TExternalCompactionList
getRunningCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials) throws
org.apache.thrift.TException;
@@ -53,7 +53,7 @@ public class CompactionCoordinatorService {
public void
updateCompactionStatus(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId, TCompactionStatusUpdate status, long
timestamp, org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler)
throws org.apache.thrift.TException;
- public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException;
+ public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String
exceptionClassName, org.apache.thrift.async.AsyncMethodCallback<Void>
resultHandler) throws org.apache.thrift.TException;
public void
getRunningCompactions(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
org.apache.thrift.async.AsyncMethodCallback<TExternalCompactionList>
resultHandler) throws org.apache.thrift.TException;
@@ -164,19 +164,20 @@ public class CompactionCoordinatorService {
}
@Override
- public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) throws
org.apache.thrift.TException
+ public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String
exceptionClassName) throws org.apache.thrift.TException
{
- send_compactionFailed(tinfo, credentials, externalCompactionId, extent);
+ send_compactionFailed(tinfo, credentials, externalCompactionId, extent,
exceptionClassName);
recv_compactionFailed();
}
- public void
send_compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent) throws
org.apache.thrift.TException
+ public void
send_compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String
exceptionClassName) throws org.apache.thrift.TException
{
compactionFailed_args args = new compactionFailed_args();
args.setTinfo(tinfo);
args.setCredentials(credentials);
args.setExternalCompactionId(externalCompactionId);
args.setExtent(extent);
+ args.setExceptionClassName(exceptionClassName);
sendBase("compactionFailed", args);
}
@@ -423,9 +424,9 @@ public class CompactionCoordinatorService {
}
@Override
- public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException {
+ public void compactionFailed(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String
exceptionClassName, org.apache.thrift.async.AsyncMethodCallback<Void>
resultHandler) throws org.apache.thrift.TException {
checkReady();
- compactionFailed_call method_call = new compactionFailed_call(tinfo,
credentials, externalCompactionId, extent, resultHandler, this,
___protocolFactory, ___transport);
+ compactionFailed_call method_call = new compactionFailed_call(tinfo,
credentials, externalCompactionId, extent, exceptionClassName, resultHandler,
this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
___manager.call(method_call);
}
@@ -435,12 +436,14 @@ public class CompactionCoordinatorService {
private org.apache.accumulo.core.securityImpl.thrift.TCredentials
credentials;
private java.lang.String externalCompactionId;
private org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent;
- public compactionFailed_call(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler,
org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory,
org.apache.thrift.transport.TNonblockingTransport transport) thro [...]
+ private java.lang.String exceptionClassName;
+ public compactionFailed_call(org.apache.accumulo.core.trace.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent, java.lang.String
exceptionClassName, org.apache.thrift.async.AsyncMethodCallback<Void>
resultHandler, org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory,
org.apache.thrift.transport. [...]
super(client, protocolFactory, transport, resultHandler, false);
this.tinfo = tinfo;
this.credentials = credentials;
this.externalCompactionId = externalCompactionId;
this.extent = extent;
+ this.exceptionClassName = exceptionClassName;
}
@Override
@@ -451,6 +454,7 @@ public class CompactionCoordinatorService {
args.setCredentials(credentials);
args.setExternalCompactionId(externalCompactionId);
args.setExtent(extent);
+ args.setExceptionClassName(exceptionClassName);
args.write(prot);
prot.writeMessageEnd();
}
@@ -715,7 +719,7 @@ public class CompactionCoordinatorService {
@Override
public compactionFailed_result getResult(I iface, compactionFailed_args
args) throws org.apache.thrift.TException {
compactionFailed_result result = new compactionFailed_result();
- iface.compactionFailed(args.tinfo, args.credentials,
args.externalCompactionId, args.extent);
+ iface.compactionFailed(args.tinfo, args.credentials,
args.externalCompactionId, args.extent, args.exceptionClassName);
return result;
}
}
@@ -1088,7 +1092,7 @@ public class CompactionCoordinatorService {
@Override
public void start(I iface, compactionFailed_args args,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException {
- iface.compactionFailed(args.tinfo, args.credentials,
args.externalCompactionId, args.extent,resultHandler);
+ iface.compactionFailed(args.tinfo, args.credentials,
args.externalCompactionId, args.extent, args.exceptionClassName,resultHandler);
}
}
@@ -4652,6 +4656,7 @@ public class CompactionCoordinatorService {
private static final org.apache.thrift.protocol.TField
CREDENTIALS_FIELD_DESC = new org.apache.thrift.protocol.TField("credentials",
org.apache.thrift.protocol.TType.STRUCT, (short)2);
private static final org.apache.thrift.protocol.TField
EXTERNAL_COMPACTION_ID_FIELD_DESC = new
org.apache.thrift.protocol.TField("externalCompactionId",
org.apache.thrift.protocol.TType.STRING, (short)3);
private static final org.apache.thrift.protocol.TField EXTENT_FIELD_DESC =
new org.apache.thrift.protocol.TField("extent",
org.apache.thrift.protocol.TType.STRUCT, (short)4);
+ private static final org.apache.thrift.protocol.TField
EXCEPTION_CLASS_NAME_FIELD_DESC = new
org.apache.thrift.protocol.TField("exceptionClassName",
org.apache.thrift.protocol.TType.STRING, (short)5);
private static final org.apache.thrift.scheme.SchemeFactory
STANDARD_SCHEME_FACTORY = new compactionFailed_argsStandardSchemeFactory();
private static final org.apache.thrift.scheme.SchemeFactory
TUPLE_SCHEME_FACTORY = new compactionFailed_argsTupleSchemeFactory();
@@ -4660,13 +4665,15 @@ public class CompactionCoordinatorService {
public @org.apache.thrift.annotation.Nullable
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials; //
required
public @org.apache.thrift.annotation.Nullable java.lang.String
externalCompactionId; // required
public @org.apache.thrift.annotation.Nullable
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent; // required
+ public @org.apache.thrift.annotation.Nullable java.lang.String
exceptionClassName; // required
/** The set of fields this struct contains, along with convenience methods
for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
TINFO((short)1, "tinfo"),
CREDENTIALS((short)2, "credentials"),
EXTERNAL_COMPACTION_ID((short)3, "externalCompactionId"),
- EXTENT((short)4, "extent");
+ EXTENT((short)4, "extent"),
+ EXCEPTION_CLASS_NAME((short)5, "exceptionClassName");
private static final java.util.Map<java.lang.String, _Fields> byName =
new java.util.HashMap<java.lang.String, _Fields>();
@@ -4690,6 +4697,8 @@ public class CompactionCoordinatorService {
return EXTERNAL_COMPACTION_ID;
case 4: // EXTENT
return EXTENT;
+ case 5: // EXCEPTION_CLASS_NAME
+ return EXCEPTION_CLASS_NAME;
default:
return null;
}
@@ -4744,6 +4753,8 @@ public class CompactionCoordinatorService {
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
tmpMap.put(_Fields.EXTENT, new
org.apache.thrift.meta_data.FieldMetaData("extent",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent.class)));
+ tmpMap.put(_Fields.EXCEPTION_CLASS_NAME, new
org.apache.thrift.meta_data.FieldMetaData("exceptionClassName",
org.apache.thrift.TFieldRequirementType.DEFAULT,
+ new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(compactionFailed_args.class,
metaDataMap);
}
@@ -4755,13 +4766,15 @@ public class CompactionCoordinatorService {
org.apache.accumulo.core.trace.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
java.lang.String externalCompactionId,
- org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent)
+ org.apache.accumulo.core.dataImpl.thrift.TKeyExtent extent,
+ java.lang.String exceptionClassName)
{
this();
this.tinfo = tinfo;
this.credentials = credentials;
this.externalCompactionId = externalCompactionId;
this.extent = extent;
+ this.exceptionClassName = exceptionClassName;
}
/**
@@ -4780,6 +4793,9 @@ public class CompactionCoordinatorService {
if (other.isSetExtent()) {
this.extent = new
org.apache.accumulo.core.dataImpl.thrift.TKeyExtent(other.extent);
}
+ if (other.isSetExceptionClassName()) {
+ this.exceptionClassName = other.exceptionClassName;
+ }
}
@Override
@@ -4793,6 +4809,7 @@ public class CompactionCoordinatorService {
this.credentials = null;
this.externalCompactionId = null;
this.extent = null;
+ this.exceptionClassName = null;
}
@org.apache.thrift.annotation.Nullable
@@ -4895,6 +4912,31 @@ public class CompactionCoordinatorService {
}
}
+ @org.apache.thrift.annotation.Nullable
+ public java.lang.String getExceptionClassName() {
+ return this.exceptionClassName;
+ }
+
+ public compactionFailed_args
setExceptionClassName(@org.apache.thrift.annotation.Nullable java.lang.String
exceptionClassName) {
+ this.exceptionClassName = exceptionClassName;
+ return this;
+ }
+
+ public void unsetExceptionClassName() {
+ this.exceptionClassName = null;
+ }
+
+ /** Returns true if field exceptionClassName is set (has been assigned a
value) and false otherwise */
+ public boolean isSetExceptionClassName() {
+ return this.exceptionClassName != null;
+ }
+
+ public void setExceptionClassNameIsSet(boolean value) {
+ if (!value) {
+ this.exceptionClassName = null;
+ }
+ }
+
@Override
public void setFieldValue(_Fields field,
@org.apache.thrift.annotation.Nullable java.lang.Object value) {
switch (field) {
@@ -4930,6 +4972,14 @@ public class CompactionCoordinatorService {
}
break;
+ case EXCEPTION_CLASS_NAME:
+ if (value == null) {
+ unsetExceptionClassName();
+ } else {
+ setExceptionClassName((java.lang.String)value);
+ }
+ break;
+
}
}
@@ -4949,6 +4999,9 @@ public class CompactionCoordinatorService {
case EXTENT:
return getExtent();
+ case EXCEPTION_CLASS_NAME:
+ return getExceptionClassName();
+
}
throw new java.lang.IllegalStateException();
}
@@ -4969,6 +5022,8 @@ public class CompactionCoordinatorService {
return isSetExternalCompactionId();
case EXTENT:
return isSetExtent();
+ case EXCEPTION_CLASS_NAME:
+ return isSetExceptionClassName();
}
throw new java.lang.IllegalStateException();
}
@@ -5022,6 +5077,15 @@ public class CompactionCoordinatorService {
return false;
}
+ boolean this_present_exceptionClassName = true &&
this.isSetExceptionClassName();
+ boolean that_present_exceptionClassName = true &&
that.isSetExceptionClassName();
+ if (this_present_exceptionClassName || that_present_exceptionClassName) {
+ if (!(this_present_exceptionClassName &&
that_present_exceptionClassName))
+ return false;
+ if (!this.exceptionClassName.equals(that.exceptionClassName))
+ return false;
+ }
+
return true;
}
@@ -5045,6 +5109,10 @@ public class CompactionCoordinatorService {
if (isSetExtent())
hashCode = hashCode * 8191 + extent.hashCode();
+ hashCode = hashCode * 8191 + ((isSetExceptionClassName()) ? 131071 :
524287);
+ if (isSetExceptionClassName())
+ hashCode = hashCode * 8191 + exceptionClassName.hashCode();
+
return hashCode;
}
@@ -5096,6 +5164,16 @@ public class CompactionCoordinatorService {
return lastComparison;
}
}
+ lastComparison = java.lang.Boolean.compare(isSetExceptionClassName(),
other.isSetExceptionClassName());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (isSetExceptionClassName()) {
+ lastComparison =
org.apache.thrift.TBaseHelper.compareTo(this.exceptionClassName,
other.exceptionClassName);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
return 0;
}
@@ -5151,6 +5229,14 @@ public class CompactionCoordinatorService {
sb.append(this.extent);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("exceptionClassName:");
+ if (this.exceptionClassName == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.exceptionClassName);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -5240,6 +5326,14 @@ public class CompactionCoordinatorService {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
break;
+ case 5: // EXCEPTION_CLASS_NAME
+ if (schemeField.type == org.apache.thrift.protocol.TType.STRING)
{
+ struct.exceptionClassName = iprot.readString();
+ struct.setExceptionClassNameIsSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
}
@@ -5276,6 +5370,11 @@ public class CompactionCoordinatorService {
struct.extent.write(oprot);
oprot.writeFieldEnd();
}
+ if (struct.exceptionClassName != null) {
+ oprot.writeFieldBegin(EXCEPTION_CLASS_NAME_FIELD_DESC);
+ oprot.writeString(struct.exceptionClassName);
+ oprot.writeFieldEnd();
+ }
oprot.writeFieldStop();
oprot.writeStructEnd();
}
@@ -5307,7 +5406,10 @@ public class CompactionCoordinatorService {
if (struct.isSetExtent()) {
optionals.set(3);
}
- oprot.writeBitSet(optionals, 4);
+ if (struct.isSetExceptionClassName()) {
+ optionals.set(4);
+ }
+ oprot.writeBitSet(optionals, 5);
if (struct.isSetTinfo()) {
struct.tinfo.write(oprot);
}
@@ -5320,12 +5422,15 @@ public class CompactionCoordinatorService {
if (struct.isSetExtent()) {
struct.extent.write(oprot);
}
+ if (struct.isSetExceptionClassName()) {
+ oprot.writeString(struct.exceptionClassName);
+ }
}
@Override
public void read(org.apache.thrift.protocol.TProtocol prot,
compactionFailed_args struct) throws org.apache.thrift.TException {
org.apache.thrift.protocol.TTupleProtocol iprot =
(org.apache.thrift.protocol.TTupleProtocol) prot;
- java.util.BitSet incoming = iprot.readBitSet(4);
+ java.util.BitSet incoming = iprot.readBitSet(5);
if (incoming.get(0)) {
struct.tinfo = new org.apache.accumulo.core.trace.thrift.TInfo();
struct.tinfo.read(iprot);
@@ -5345,6 +5450,10 @@ public class CompactionCoordinatorService {
struct.extent.read(iprot);
struct.setExtentIsSet(true);
}
+ if (incoming.get(4)) {
+ struct.exceptionClassName = iprot.readString();
+ struct.setExceptionClassNameIsSet(true);
+ }
}
}
diff --git a/core/src/main/thrift/compaction-coordinator.thrift
b/core/src/main/thrift/compaction-coordinator.thrift
index cbf3ac0d1f..ae6a7cce6b 100644
--- a/core/src/main/thrift/compaction-coordinator.thrift
+++ b/core/src/main/thrift/compaction-coordinator.thrift
@@ -112,6 +112,7 @@ service CompactionCoordinatorService {
2:security.TCredentials credentials
3:string externalCompactionId
4:data.TKeyExtent extent
+ 5:string exceptionClassName
)
/*
diff --git
a/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java
b/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java
index 83b3821640..84ee2f0284 100644
---
a/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/classloader/ContextClassLoaderFactoryTest.java
@@ -28,6 +28,7 @@ import java.util.Objects;
import org.apache.accumulo.core.WithTestNames;
import org.apache.accumulo.core.conf.ConfigurationCopy;
import org.apache.accumulo.core.conf.Property;
+import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import org.apache.commons.io.FileUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -64,7 +65,7 @@ public class ContextClassLoaderFactoryTest extends
WithTestNames {
}
@Test
- public void differentContexts() {
+ public void differentContexts() throws ContextClassLoaderException {
ConfigurationCopy cc = new ConfigurationCopy();
cc.set(Property.GENERAL_CONTEXT_CLASSLOADER_FACTORY.getKey(),
diff --git
a/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java
b/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java
index 251a3441b3..3042a9a0a5 100644
---
a/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java
+++
b/core/src/test/java/org/apache/accumulo/core/iteratorsImpl/IteratorConfigUtilTest.java
@@ -136,14 +136,15 @@ public class IteratorConfigUtilTest {
}
private SortedKeyValueIterator<Key,Value> createIter(IteratorScope scope,
- SortedMapIterator source, AccumuloConfiguration conf) throws IOException
{
+ SortedMapIterator source, AccumuloConfiguration conf)
+ throws IOException, ReflectiveOperationException {
var ibEnv = IteratorConfigUtil.loadIterConf(scope, EMPTY_ITERS, new
HashMap<>(), conf);
var iteratorBuilder =
ibEnv.env(ClientIteratorEnvironment.DEFAULT).useClassLoader(null).build();
return IteratorConfigUtil.loadIterators(source, iteratorBuilder);
}
@Test
- public void test1() throws IOException {
+ public void test1() throws IOException, ReflectiveOperationException {
ConfigurationCopy conf = new ConfigurationCopy();
// create an iterator that adds 1 and then squares
@@ -177,7 +178,7 @@ public class IteratorConfigUtilTest {
}
@Test
- public void test4() throws IOException {
+ public void test4() throws IOException, ReflectiveOperationException {
// try loading for a different scope
AccumuloConfiguration conf = new ConfigurationCopy();
@@ -209,7 +210,7 @@ public class IteratorConfigUtilTest {
}
@Test
- public void test3() throws IOException {
+ public void test3() throws IOException, ReflectiveOperationException {
// change the load order, so it squares and then adds
ConfigurationCopy conf = new ConfigurationCopy();
@@ -245,7 +246,7 @@ public class IteratorConfigUtilTest {
}
@Test
- public void test2() throws IOException {
+ public void test2() throws IOException, ReflectiveOperationException {
ConfigurationCopy conf = new ConfigurationCopy();
@@ -284,7 +285,7 @@ public class IteratorConfigUtilTest {
}
@Test
- public void test5() throws IOException {
+ public void test5() throws IOException, ReflectiveOperationException {
ConfigurationCopy conf = new ConfigurationCopy();
// create an iterator that ages off
diff --git a/pom.xml b/pom.xml
index fdcab6190e..9fd30c1fc0 100644
--- a/pom.xml
+++ b/pom.xml
@@ -116,7 +116,7 @@
<accumulo.release.version>${project.version}</accumulo.release.version>
<!-- avoid error shutting down built-in ForkJoinPool.commonPool() during
exec:java tasks -->
<exec.cleanupDaemonThreads>false</exec.cleanupDaemonThreads>
- <extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens
java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED --add-opens
java.management/java.lang.management=ALL-UNNAMED --add-opens
java.management/sun.management=ALL-UNNAMED --add-opens
java.base/java.security=ALL-UNNAMED --add-opens
java.base/java.lang.reflect=ALL-UNNAMED --add-opens
java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.ut [...]
+ <extraTestArgs>--add-opens java.base/java.lang=ALL-UNNAMED --add-opens
java.base/java.util=ALL-UNNAMED --add-opens java.base/java.io=ALL-UNNAMED
--add-opens java.base/java.net=ALL-UNNAMED --add-opens
java.management/java.lang.management=ALL-UNNAMED --add-opens
java.management/sun.management=ALL-UNNAMED --add-opens
java.base/java.security=ALL-UNNAMED --add-opens
java.base/java.lang.reflect=ALL-UNNAMED --add-opens
java.base/java.util.concurrent=ALL-UNNAMED --add-opens java.base/java.ut [...]
<failsafe.excludedGroups />
<failsafe.failIfNoSpecifiedTests>false</failsafe.failIfNoSpecifiedTests>
<failsafe.forkCount>1</failsafe.forkCount>
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
index cd5de7c442..b2122caafd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/AbstractServer.java
@@ -322,7 +322,11 @@ public abstract class AbstractServer
}
@Override
- public void close() {}
+ public void close() {
+ if (context != null) {
+ context.close();
+ }
+ }
protected void waitForUpgrade() throws InterruptedException {
while (AccumuloDataVersion.getCurrentVersion(getContext()) <
AccumuloDataVersion.get()) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index a2a1f93fd4..30543aac24 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -84,6 +84,8 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* Provides a server context for Accumulo server components that operate with
the system credentials
* and have access to the system files and configuration.
@@ -461,6 +463,7 @@ public class ServerContext extends ClientContext {
@Override
public void close() {
+ Preconditions.checkState(!isClosed(), "ServerContext.close was already
called.");
getMetricsInfo().close();
super.close();
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java
index 0f68a11d48..46c6fdb7bd 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/ServiceEnvironmentImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.server;
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.classloader.ClassLoaderUtil;
@@ -62,14 +61,13 @@ public class ServiceEnvironmentImpl implements
ServiceEnvironment {
}
@Override
- public <T> T instantiate(String className, Class<T> base)
- throws ReflectiveOperationException, IOException {
+ public <T> T instantiate(String className, Class<T> base) throws
ReflectiveOperationException {
return ConfigurationTypeHelper.getClassInstance(null, className, base);
}
@Override
public <T> T instantiate(TableId tableId, String className, Class<T> base)
- throws ReflectiveOperationException, IOException {
+ throws ReflectiveOperationException {
String ctx =
ClassLoaderUtil.tableContext(context.getTableConfiguration(tableId));
return ConfigurationTypeHelper.getClassInstance(ctx, className, base);
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 1f655dd304..c9352473db 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -312,8 +312,8 @@ public class FileCompactor implements
Callable<CompactionStats> {
}
@Override
- public CompactionStats call()
- throws IOException, CompactionCanceledException, InterruptedException {
+ public CompactionStats call() throws IOException,
CompactionCanceledException,
+ InterruptedException, ReflectiveOperationException {
FileSKVWriter mfw = null;
@@ -539,7 +539,8 @@ public class FileCompactor implements
Callable<CompactionStats> {
private void compactLocalityGroup(String lgName, Set<ByteSequence>
columnFamilies,
boolean inclusive, FileSKVWriter mfw, CompactionStats majCStats,
- EnumSet<FilePrefix> dropCacheFilePrefixes) throws IOException,
CompactionCanceledException {
+ EnumSet<FilePrefix> dropCacheFilePrefixes)
+ throws IOException, CompactionCanceledException,
ReflectiveOperationException {
ArrayList<FileSKVIterator> readers = new
ArrayList<>(filesToCompact.size());
Span compactSpan = TraceUtil.startSpan(this.getClass(), "compact");
try (Scope span = compactSpan.makeCurrent()) {
@@ -562,7 +563,6 @@ public class FileCompactor implements
Callable<CompactionStats> {
SortedKeyValueIterator<Key,Value> itr =
iterEnv.getTopLevelIterator(IteratorConfigUtil
.convertItersAndLoad(env.getIteratorScope(), cfsi, acuTableConf,
iterators, iterEnv));
-
itr.seek(extent.toDataRange(), columnFamilies, inclusive);
if (inclusive) {
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index add5d60756..873ce2abd9 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -55,6 +55,7 @@ import
org.apache.accumulo.core.compaction.thrift.TNextCompactionJob;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.NamespaceId;
+import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.zookeeper.ServiceLock;
@@ -95,6 +96,7 @@ import org.apache.thrift.transport.TTransportException;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
@@ -130,6 +132,37 @@ public class CompactionCoordinator extends AbstractServer
implements
/* Map of queue name to last time compactor called to get a compaction job */
private static final Map<String,Long> TIME_COMPACTOR_LAST_CHECKED = new
ConcurrentHashMap<>();
+ static class FailureCounts {
+ long failures;
+ long successes;
+
+ FailureCounts(long failures, long successes) {
+ this.failures = failures;
+ this.successes = successes;
+ }
+
+ static FailureCounts incrementFailure(Object key, FailureCounts counts) {
+ if (counts == null) {
+ return new FailureCounts(1, 0);
+ }
+ counts.failures++;
+ return counts;
+ }
+
+ static FailureCounts incrementSuccess(Object key, FailureCounts counts) {
+ if (counts == null) {
+ return new FailureCounts(0, 1);
+ }
+ counts.successes++;
+ return counts;
+ }
+ }
+
+ private final ConcurrentHashMap<String,FailureCounts> failingQueues = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<String,FailureCounts> failingCompactors =
+ new ConcurrentHashMap<>();
+ private final ConcurrentHashMap<TableId,FailureCounts> failingTables = new
ConcurrentHashMap<>();
+
private final GarbageCollectionLogger gcLogger = new
GarbageCollectionLogger();
protected AuditedSecurityOperation security;
protected final AccumuloConfiguration aconf;
@@ -312,6 +345,7 @@ public class CompactionCoordinator extends AbstractServer
implements
tserverSet.startListeningForTabletServerChanges();
startDeadCompactionDetector();
+ startFailureSummaryLogging();
LOG.info("Starting loop to check tservers for compaction summaries");
while (!isShutdownRequested()) {
@@ -352,6 +386,7 @@ public class CompactionCoordinator extends AbstractServer
implements
if (coordinatorAddress.server != null) {
coordinatorAddress.server.stop();
}
+ super.close();
getShutdownComplete().set(true);
LOG.info("stop requested. exiting ... ");
try {
@@ -531,6 +566,7 @@ public class CompactionCoordinator extends AbstractServer
implements
prioTserver = QUEUE_SUMMARIES.getNextTserver(queue);
continue;
}
+
// It is possible that by the time this added that the tablet has
already canceled the
// compaction or the compactor that made this request is dead. In
these cases the compaction
// is not actually running.
@@ -602,6 +638,7 @@ public class CompactionCoordinator extends AbstractServer
implements
LOG.debug("Compaction completed, id: {}, stats: {}, extent: {}",
externalCompactionId, stats,
extent);
final var ecid = ExternalCompactionId.of(externalCompactionId);
+ captureSuccess(ecid, extent);
compactionFinalizer.commitCompaction(ecid, extent, stats.fileSize,
stats.entriesWritten);
// It's possible that RUNNING might not have an entry for this ecid in the
case
// of a coordinator restart when the Coordinator can't find the TServer
for the
@@ -611,18 +648,89 @@ public class CompactionCoordinator extends AbstractServer
implements
@Override
public void compactionFailed(TInfo tinfo, TCredentials credentials, String
externalCompactionId,
- TKeyExtent extent) throws ThriftSecurityException {
+ TKeyExtent extent, String exceptionClassName) throws
ThriftSecurityException {
// do not expect users to call this directly, expect other tservers to
call this method
if (!security.canPerformSystemActions(credentials)) {
throw new AccumuloSecurityException(credentials.getPrincipal(),
SecurityErrorCode.PERMISSION_DENIED).asThriftException();
}
KeyExtent fromThriftExtent = KeyExtent.fromThrift(extent);
- LOG.info("Compaction failed: id: {}, extent: {}", externalCompactionId,
fromThriftExtent);
+ LOG.info("Compaction failed: id: {}, extent: {}, compactor exception:{}",
externalCompactionId,
+ fromThriftExtent, exceptionClassName);
final var ecid = ExternalCompactionId.of(externalCompactionId);
+ if (exceptionClassName != null) {
+ captureFailure(ecid, fromThriftExtent);
+ }
compactionFailed(Map.of(ecid, KeyExtent.fromThrift(extent)));
}
+ private void captureFailure(ExternalCompactionId ecid, KeyExtent extent) {
+ var rc = RUNNING_CACHE.get(ecid);
+ if (rc != null) {
+ final String queue = rc.getQueueName();
+ failingQueues.compute(queue, FailureCounts::incrementFailure);
+ final String compactor = rc.getCompactorAddress();
+ failingCompactors.compute(compactor, FailureCounts::incrementFailure);
+ }
+ failingTables.compute(extent.tableId(), FailureCounts::incrementFailure);
+ }
+
+ protected void startFailureSummaryLogging() {
+ ScheduledFuture<?> future = getContext().getScheduledExecutor()
+ .scheduleWithFixedDelay(this::printStats, 0, 5, TimeUnit.MINUTES);
+ ThreadPools.watchNonCriticalScheduledTask(future);
+ }
+
+ private <T> void printStats(String logPrefix,
ConcurrentHashMap<T,FailureCounts> failureCounts,
+ boolean logSuccessAtTrace) {
+ for (var key : failureCounts.keySet()) {
+ failureCounts.compute(key, (k, counts) -> {
+ if (counts != null) {
+ Level level;
+ if (counts.failures > 0) {
+ level = Level.WARN;
+ } else if (logSuccessAtTrace) {
+ level = Level.TRACE;
+ } else {
+ level = Level.DEBUG;
+ }
+
+ LOG.atLevel(level).log("{} {} failures:{} successes:{} since last
time this was logged ",
+ logPrefix, k, counts.failures, counts.successes);
+ }
+
+ // clear the counts so they can start building up for the next logging
if this key is ever
+ // used again
+ return null;
+ });
+ }
+ }
+
+ private void printStats() {
+
+ // Remove down compactors from failing list
+ Map<String,List<HostAndPort>> allCompactors =
+ ExternalCompactionUtil.getCompactorAddrs(getContext());
+ Set<String> allCompactorAddrs = new HashSet<>();
+ allCompactors.values().forEach(l -> l.forEach(c ->
allCompactorAddrs.add(c.toString())));
+ failingCompactors.keySet().retainAll(allCompactorAddrs);
+
+ printStats("Queue", failingQueues, false);
+ printStats("Table", failingTables, false);
+ printStats("Compactor", failingCompactors, true);
+ }
+
+ private void captureSuccess(ExternalCompactionId ecid, KeyExtent extent) {
+ var rc = RUNNING_CACHE.get(ecid);
+ if (rc != null) {
+ final String queue = rc.getQueueName();
+ failingQueues.compute(queue, FailureCounts::incrementSuccess);
+ final String compactor = rc.getCompactorAddress();
+ failingCompactors.compute(compactor, FailureCounts::incrementSuccess);
+ }
+ failingTables.compute(extent.tableId(), FailureCounts::incrementSuccess);
+ }
+
void compactionFailed(Map<ExternalCompactionId,KeyExtent> compactions) {
compactionFinalizer.failCompactions(compactions);
compactions.forEach((k, v) -> recordCompletion(k));
diff --git
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
index 8b320aac34..a38bcb61a0 100644
---
a/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
+++
b/server/compaction-coordinator/src/test/java/org/apache/accumulo/coordinator/CompactionCoordinatorTest.java
@@ -117,6 +117,9 @@ public class CompactionCoordinatorTest {
@Override
protected void startDeadCompactionDetector() {}
+ @Override
+ protected void startFailureSummaryLogging() {}
+
@Override
protected long getTServerCheckInterval() {
gracefulShutdown(null);
@@ -188,7 +191,7 @@ public class CompactionCoordinatorTest {
@Override
public void compactionFailed(TInfo tinfo, TCredentials credentials, String
externalCompactionId,
- TKeyExtent extent) throws ThriftSecurityException {}
+ TKeyExtent extent, String exceptionClassName) throws
ThriftSecurityException {}
void setMetadataCompactionIds(Set<ExternalCompactionId> mci) {
metadataCompactionIds = mci;
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index ce18a0ef9e..a317525b50 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -28,6 +28,7 @@ import java.security.SecureRandom;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -38,6 +39,7 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;
@@ -124,6 +126,7 @@ import com.beust.jcommander.Parameter;
import com.google.common.base.Preconditions;
import io.micrometer.core.instrument.FunctionCounter;
+import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.LongTaskTimer;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
@@ -145,6 +148,51 @@ public class Compactor extends AbstractServer
private static final SecureRandom random = new SecureRandom();
+ private static class ConsecutiveErrorHistory extends
HashMap<TableId,HashMap<String,AtomicLong>> {
+
+ private static final long serialVersionUID = 1L;
+
+ public long getTotalFailures() {
+ long total = 0;
+ for (TableId tid : keySet()) {
+ total += getTotalTableFailures(tid);
+ }
+ return total;
+ }
+
+ public long getTotalTableFailures(TableId tid) {
+ long total = 0;
+ for (AtomicLong failures : get(tid).values()) {
+ total += failures.get();
+ }
+ return total;
+ }
+
+ /**
+ * Add error for table
+ *
+ * @param tid table id
+ * @param error exception
+ */
+ public void addError(TableId tid, Throwable error) {
+ computeIfAbsent(tid, t -> new HashMap<String,AtomicLong>())
+ .computeIfAbsent(error.toString(), e -> new
AtomicLong(0)).incrementAndGet();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder buf = new StringBuilder();
+ for (TableId tid : keySet()) {
+ buf.append("\nTable: ").append(tid);
+ for (Entry<String,AtomicLong> error : get(tid).entrySet()) {
+ buf.append("\n\tException: ").append(error.getKey()).append(",
count: ")
+ .append(error.getValue().get());
+ }
+ }
+ return buf.toString();
+ }
+ }
+
public static class CompactorServerOpts extends ServerOpts {
@Parameter(required = true, names = {"-q", "--queue"}, description =
"compaction queue name")
private String queueName = null;
@@ -171,6 +219,11 @@ public class Compactor extends AbstractServer
private ServerAddress compactorAddress = null;
private final AtomicBoolean compactionRunning = new AtomicBoolean(false);
+ private final ConsecutiveErrorHistory errorHistory = new
ConsecutiveErrorHistory();
+ private final AtomicLong completed = new AtomicLong(0);
+ private final AtomicLong cancelled = new AtomicLong(0);
+ private final AtomicLong failed = new AtomicLong(0);
+ private final AtomicLong terminated = new AtomicLong(0);
protected Compactor(CompactorServerOpts opts, String[] args) {
super("compactor", opts, args);
@@ -186,6 +239,26 @@ public class Compactor extends AbstractServer
return FileCompactor.getTotalEntriesWritten();
}
+ private double getConsecutiveFailures() {
+ return errorHistory.getTotalFailures();
+ }
+
+ private double getCancellations() {
+ return cancelled.get();
+ }
+
+ private double getCompletions() {
+ return completed.get();
+ }
+
+ private double getFailures() {
+ return failed.get();
+ }
+
+ private double getTerminated() {
+ return terminated.get();
+ }
+
@Override
public void registerMetrics(MeterRegistry registry) {
super.registerMetrics(registry);
@@ -196,6 +269,22 @@ public class Compactor extends AbstractServer
.builder(METRICS_COMPACTOR_ENTRIES_WRITTEN, this,
Compactor::getTotalEntriesWritten)
.description("Number of entries written by all compactions that have
run on this compactor")
.register(registry);
+ FunctionCounter
+ .builder(METRICS_COMPACTOR_COMPACTIONS_CANCELLED, this,
Compactor::getCancellations)
+ .description("Number compactions that have been cancelled on this
compactor")
+ .register(registry);
+ FunctionCounter
+ .builder(METRICS_COMPACTOR_COMPACTIONS_COMPLETED, this,
Compactor::getCompletions)
+ .description("Number compactions that have succeeded on this
compactor").register(registry);
+ FunctionCounter.builder(METRICS_COMPACTOR_COMPACTIONS_FAILED, this,
Compactor::getFailures)
+ .description("Number compactions that have failed on this
compactor").register(registry);
+ FunctionCounter.builder(METRICS_COMPACTOR_FAILURES_TERMINATION, this,
Compactor::getTerminated)
+ .description("Will report 1 if the Compactor terminates due to
consecutive failure, else 0")
+ .register(registry);
+ Gauge.builder(METRICS_COMPACTOR_FAILURES_CONSECUTIVE, this,
Compactor::getConsecutiveFailures)
+ .description(
+ "Number of consecutive compaction failures. Resets to zero on a
successful compaction")
+ .register(registry);
LongTaskTimer timer = LongTaskTimer.builder(METRICS_COMPACTOR_MAJC_STUCK)
.description("Number and duration of stuck major
compactions").register(registry);
CompactionWatcher.setTimer(timer);
@@ -395,16 +484,17 @@ public class Compactor extends AbstractServer
* Notify the CompactionCoordinator the job failed
*
* @param job current compaction job
+ * @param exception cause of failure
* @throws RetriesExceededException thrown when retries have been exceeded
*/
- protected void updateCompactionFailed(TExternalCompactionJob job)
+ protected void updateCompactionFailed(TExternalCompactionJob job, Throwable
exception)
throws RetriesExceededException {
RetryableThriftCall<String> thriftCall =
new RetryableThriftCall<>(1000, RetryableThriftCall.MAX_WAIT_TIME, 25,
() -> {
Client coordinatorClient = getCoordinatorClient();
try {
coordinatorClient.compactionFailed(TraceUtil.traceInfo(),
getContext().rpcCreds(),
- job.getExternalCompactionId(), job.extent);
+ job.getExternalCompactionId(), job.extent,
exception.getClass().getName());
return "";
} finally {
ThriftUtil.returnClient(coordinatorClient, getContext());
@@ -666,6 +756,48 @@ public class Compactor extends AbstractServer
clientAddress, queueName);
}
+ private void performFailureProcessing(ConsecutiveErrorHistory errorHistory)
+ throws InterruptedException {
+ // consecutive failure processing
+ final long totalFailures = errorHistory.getTotalFailures();
+ if (totalFailures > 0) {
+ LOG.warn("This Compactor has had {} consecutive failures. Failures: {}",
totalFailures,
+ errorHistory.toString()); // ErrorHistory.toString not invoked
without .toString
+ final long failureThreshold =
+
getConfiguration().getCount(Property.COMPACTOR_FAILURE_TERMINATION_THRESHOLD);
+ if (failureThreshold > 0 && totalFailures >= failureThreshold) {
+ LOG.error(
+ "Consecutive failures ({}) has met or exceeded failure threshold
({}), exiting...",
+ totalFailures, failureThreshold);
+ terminated.incrementAndGet();
+ throw new InterruptedException(
+ "Consecutive failures has exceeded failure threshold, exiting...");
+ }
+ if (totalFailures
+ >=
getConfiguration().getCount(Property.COMPACTOR_FAILURE_BACKOFF_THRESHOLD)) {
+ final long interval =
+
getConfiguration().getTimeInMillis(Property.COMPACTOR_FAILURE_BACKOFF_INTERVAL);
+ if (interval > 0) {
+ final long max =
+
getConfiguration().getTimeInMillis(Property.COMPACTOR_FAILURE_BACKOFF_RESET);
+ final long backoffMS = Math.min(max, interval * totalFailures);
+ LOG.warn(
+ "Not starting next compaction for {}ms due to consecutive
failures. Check the log and address any issues.",
+ backoffMS);
+ if (backoffMS == max) {
+ errorHistory.clear();
+ }
+ Thread.sleep(backoffMS);
+ } else if (interval == 0) {
+ LOG.info(
+ "This Compactor has had {} consecutive failures and failure
backoff is disabled.",
+ totalFailures);
+ errorHistory.clear();
+ }
+ }
+ }
+ }
+
@Override
public void run() {
@@ -712,6 +844,8 @@ public class Compactor extends AbstractServer
err.set(null);
JOB_HOLDER.reset();
+ performFailureProcessing(errorHistory);
+
TExternalCompactionJob job;
try {
TNextCompactionJob next = getNextJob(getNextId());
@@ -809,14 +943,15 @@ public class Compactor extends AbstractServer
new TCompactionStatusUpdate(TCompactionState.CANCELLED,
"Compaction cancelled",
-1, -1, -1, fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
- updateCompactionFailed(job);
+ updateCompactionFailed(job, null);
+ cancelled.incrementAndGet();
} catch (RetriesExceededException e) {
LOG.error("Error updating coordinator with compaction
cancellation.", e);
} finally {
currentCompactionId.set(null);
}
} else if (err.get() != null) {
- KeyExtent fromThriftExtent =
KeyExtent.fromThrift(job.getExtent());
+ final KeyExtent fromThriftExtent =
KeyExtent.fromThrift(job.getExtent());
try {
LOG.info("Updating coordinator with compaction failure: id:
{}, extent: {}",
job.getExternalCompactionId(), fromThriftExtent);
@@ -824,7 +959,9 @@ public class Compactor extends AbstractServer
TCompactionState.FAILED, "Compaction failed due to: " +
err.get().getMessage(),
-1, -1, -1, fcr.getCompactionAge().toNanos());
updateCompactionState(job, update);
- updateCompactionFailed(job);
+ updateCompactionFailed(job, err.get());
+ failed.incrementAndGet();
+ errorHistory.addError(fromThriftExtent.tableId(), err.get());
} catch (RetriesExceededException e) {
LOG.error("Error updating coordinator with compaction failure:
id: {}, extent: {}",
job.getExternalCompactionId(), fromThriftExtent, e);
@@ -835,6 +972,9 @@ public class Compactor extends AbstractServer
try {
LOG.trace("Updating coordinator with compaction completion.");
updateCompactionCompleted(job, JOB_HOLDER.getStats());
+ completed.incrementAndGet();
+ // job completed successfully, clear the error history
+ errorHistory.clear();
} catch (RetriesExceededException e) {
LOG.error(
"Error updating coordinator with compaction completion,
cancelling compaction.",
@@ -896,6 +1036,7 @@ public class Compactor extends AbstractServer
}
gcLogger.logGCInfo(getConfiguration());
+ super.close();
getShutdownComplete().set(true);
LOG.info("stop requested. exiting ... ");
try {
diff --git
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
index 3489e8fef8..228b6911a5 100644
---
a/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
+++
b/server/compactor/src/test/java/org/apache/accumulo/compactor/CompactorTest.java
@@ -272,7 +272,7 @@ public class CompactorTest {
}
@Override
- protected void updateCompactionFailed(TExternalCompactionJob job)
+ protected void updateCompactionFailed(TExternalCompactionJob job,
Throwable exception)
throws RetriesExceededException {
failedCalled = true;
}
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index a7cfca69b8..902642a6c0 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -344,6 +344,7 @@ public class SimpleGarbageCollector extends AbstractServer
gracefulShutdown(getContext().rpcCreds());
}
}
+ super.close();
getShutdownComplete().set(true);
log.info("stop requested. exiting ... ");
try {
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 36f0282102..90346bdea0 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -1507,6 +1507,7 @@ public class Manager extends AbstractServer implements
LiveTServerSet.Listener,
throw new IllegalStateException("Exception waiting on watcher", e);
}
}
+ super.close();
getShutdownComplete().set(true);
log.info("stop requested. exiting ... ");
try {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
index f395c4134a..cb98c24d35 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ConditionCheckerContext.java
@@ -86,7 +86,7 @@ public class ConditionCheckerContext {
}
SortedKeyValueIterator<Key,Value>
buildIterator(SortedKeyValueIterator<Key,Value> systemIter,
- TCondition tc) throws IOException {
+ TCondition tc) throws IOException, ReflectiveOperationException {
ArrayByteSequence key = new ArrayByteSequence(tc.iterators);
MergedIterConfig mic = mergedIterCache.get(key);
@@ -111,7 +111,7 @@ public class ConditionCheckerContext {
}
boolean checkConditions(SortedKeyValueIterator<Key,Value> systemIter,
- ServerConditionalMutation scm) throws IOException {
+ ServerConditionalMutation scm) throws IOException,
ReflectiveOperationException {
boolean add = true;
for (TCondition tc : scm.getConditions()) {
@@ -157,7 +157,8 @@ public class ConditionCheckerContext {
this.results = results;
}
- public void check(SortedKeyValueIterator<Key,Value> systemIter) throws
IOException {
+ public void check(SortedKeyValueIterator<Key,Value> systemIter)
+ throws IOException, ReflectiveOperationException {
checkArgument(!checked, "check() method should only be called once");
checked = true;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 78ddee2b85..ec89e668af 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -463,6 +463,7 @@ public class ScanServer extends AbstractServer
}
gcLogger.logGCInfo(getConfiguration());
+ super.close();
getShutdownComplete().set(true);
LOG.info("stop requested. exiting ... ");
try {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
index 97dad09224..571acb5a3f 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletClientHandler.java
@@ -751,7 +751,7 @@ public class TabletClientHandler implements
TabletClientService.Iface {
private void checkConditions(Map<KeyExtent,List<ServerConditionalMutation>>
updates,
ArrayList<TCMResult> results, ConditionalSession cs, List<String>
symbols)
- throws IOException {
+ throws IOException, ReflectiveOperationException {
Iterator<Entry<KeyExtent,List<ServerConditionalMutation>>> iter =
updates.entrySet().iterator();
final CompressedIterators compressedIters = new
CompressedIterators(symbols);
@@ -905,7 +905,7 @@ public class TabletClientHandler implements
TabletClientService.Iface {
private Map<KeyExtent,List<ServerConditionalMutation>>
conditionalUpdate(ConditionalSession cs,
Map<KeyExtent,List<ServerConditionalMutation>> updates,
ArrayList<TCMResult> results,
- List<String> symbols) throws IOException {
+ List<String> symbols) throws IOException, ReflectiveOperationException {
// sort each list of mutations, this is done to avoid deadlock and doing
seeks in order is
// more efficient and detect duplicate rows.
ConditionalMutationSet.sortConditionalMutations(updates);
@@ -1023,7 +1023,7 @@ public class TabletClientHandler implements
TabletClientService.Iface {
}
return results;
- } catch (IOException ioe) {
+ } catch (IOException | ReflectiveOperationException ioe) {
throw new TException(ioe);
} catch (Exception e) {
log.warn("Exception returned for conditionalUpdate {}", e);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 0cf5c2ec47..e66f6ae2d4 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -1044,7 +1044,7 @@ public class TabletServer extends AbstractServer
}
gcLogger.logGCInfo(getConfiguration());
-
+ super.close();
getShutdownComplete().set(true);
log.info("TServerInfo: stop requested. exiting ... ");
try {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 56a2006456..be4672107a 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -278,7 +278,7 @@ public class CompactableUtils {
String context = ClassLoaderUtil.tableContext(tableConfig);
try {
return ConfigurationTypeHelper.getClassInstance(context, className,
baseClass);
- } catch (IOException | ReflectiveOperationException e) {
+ } catch (ReflectiveOperationException e) {
throw new RuntimeException(e);
}
}
@@ -559,8 +559,8 @@ public class CompactableUtils {
*/
static CompactionStats compact(Tablet tablet, CompactionJob job,
CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
- Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
- throws IOException, CompactionCanceledException, InterruptedException {
+ Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile
tmpFileName) throws IOException,
+ CompactionCanceledException, InterruptedException,
ReflectiveOperationException {
TableConfiguration tableConf = tablet.getTableConfiguration();
AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index f265056c6a..93f73dbd2e 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -115,7 +115,7 @@ public class MinorCompactor extends FileCompactor {
}
return ret;
- } catch (IOException | UnsatisfiedLinkError e) {
+ } catch (IOException | ReflectiveOperationException |
UnsatisfiedLinkError e) {
log.warn("MinC failed ({}) to create {} retrying ...",
e.getMessage(), outputFileName);
ProblemReports.getInstance(tabletServer.getContext()).report(
new ProblemReport(getExtent().tableId(), ProblemType.FILE_WRITE,
outputFileName, e));
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
index 85f355f7c9..0f30f7673e 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanDataSource.java
@@ -125,12 +125,17 @@ class ScanDataSource implements DataSource {
@Override
public SortedKeyValueIterator<Key,Value> iterator() throws IOException {
if (iter == null) {
- iter = createIterator();
+ try {
+ iter = createIterator();
+ } catch (ReflectiveOperationException e) {
+ throw new IOException("Error creating iterator", e);
+ }
}
return iter;
}
- private SortedKeyValueIterator<Key,Value> createIterator() throws
IOException {
+ private SortedKeyValueIterator<Key,Value> createIterator()
+ throws IOException, ReflectiveOperationException {
Map<TabletFile,DataFileValue> files;
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 8e296d400d..71f4efca1c 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -457,7 +457,7 @@ public class Tablet extends TabletBase {
}
public void checkConditions(ConditionChecker checker, Authorizations
authorizations,
- AtomicBoolean iFlag) throws IOException {
+ AtomicBoolean iFlag) throws IOException, ReflectiveOperationException {
ScanParameters scanParams = new ScanParameters(-1, authorizations,
Collections.emptySet(), null,
null, false, null, -1, null);
@@ -469,7 +469,7 @@ public class Tablet extends TabletBase {
try {
SortedKeyValueIterator<Key,Value> iter = new
SourceSwitchingIterator(dataSource);
checker.check(iter);
- } catch (IOException | RuntimeException e) {
+ } catch (IOException | RuntimeException | ReflectiveOperationException e) {
sawException = true;
throw e;
} finally {
diff --git a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
index 5e7a84a02f..0eb3214bda 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/Shell.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/Shell.java
@@ -67,6 +67,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.thrift.TConstraintViolationSummary;
+import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import
org.apache.accumulo.core.tabletserver.thrift.ConstraintViolationException;
import org.apache.accumulo.core.util.BadArgumentException;
import org.apache.accumulo.core.util.format.DefaultFormatter;
@@ -469,7 +470,8 @@ public class Shell extends ShellOptions implements
KeywordExecutable {
}
public ClassLoader getClassLoader(final CommandLine cl, final Shell
shellState)
- throws AccumuloException, TableNotFoundException,
AccumuloSecurityException {
+ throws AccumuloException, TableNotFoundException,
AccumuloSecurityException,
+ ContextClassLoaderException {
boolean tables =
cl.hasOption(OptUtil.tableOpt().getOpt()) ||
!shellState.getTableName().isEmpty();
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
index 6e4eed888f..f9e8be1f56 100644
--- a/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
+++ b/shell/src/main/java/org/apache/accumulo/shell/commands/SetIterCommand.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.iterators.user.RegExFilter;
import org.apache.accumulo.core.iterators.user.ReqVisFilter;
import org.apache.accumulo.core.iterators.user.VersioningIterator;
import org.apache.accumulo.core.metadata.MetadataTable;
+import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.Shell.Command;
import org.apache.accumulo.shell.ShellCommandException;
@@ -60,7 +61,7 @@ public class SetIterCommand extends Command {
@Override
public int execute(final String fullCommand, final CommandLine cl, final
Shell shellState)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException, IOException,
- ShellCommandException {
+ ShellCommandException, ContextClassLoaderException {
boolean tables =
cl.hasOption(OptUtil.tableOpt().getOpt()) ||
!shellState.getTableName().isEmpty();
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java
index 2cb2295a8d..e5c67291ac 100644
---
a/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java
+++
b/shell/src/main/java/org/apache/accumulo/shell/commands/SetScanIterCommand.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.security.Authorizations;
+import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.ShellCommandException;
import org.apache.commons.cli.CommandLine;
@@ -43,7 +44,7 @@ public class SetScanIterCommand extends SetIterCommand {
@Override
public int execute(final String fullCommand, final CommandLine cl, final
Shell shellState)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException, IOException,
- ShellCommandException {
+ ShellCommandException, ContextClassLoaderException {
Shell.log.warn("Deprecated, use {}", new SetShellIterCommand().getName());
return super.execute(fullCommand, cl, shellState);
}
diff --git
a/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java
b/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java
index f10c13c869..336ece806f 100644
---
a/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java
+++
b/shell/src/main/java/org/apache/accumulo/shell/commands/SetShellIterCommand.java
@@ -27,6 +27,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.TableNotFoundException;
+import
org.apache.accumulo.core.spi.common.ContextClassLoaderFactory.ContextClassLoaderException;
import org.apache.accumulo.shell.Shell;
import org.apache.accumulo.shell.ShellCommandException;
import org.apache.commons.cli.CommandLine;
@@ -38,7 +39,7 @@ public class SetShellIterCommand extends SetIterCommand {
@Override
public int execute(final String fullCommand, final CommandLine cl, final
Shell shellState)
throws AccumuloException, AccumuloSecurityException,
TableNotFoundException, IOException,
- ShellCommandException {
+ ShellCommandException, ContextClassLoaderException {
return super.execute(fullCommand, cl, shellState);
}
diff --git
a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java
b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java
index c75b931b87..f1f17795f9 100644
---
a/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java
+++
b/start/src/main/java/org/apache/accumulo/start/classloader/vfs/AccumuloVFSClassLoader.java
@@ -194,12 +194,8 @@ public class AccumuloVFSClassLoader {
return new AccumuloReloadingVFSClassLoader(dynamicCPath, generateVfs(),
wrapper, 1000, true);
}
- public static ClassLoader getClassLoader() {
- try {
- return getClassLoader_Internal();
- } catch (IOException e) {
- throw new UncheckedIOException(e);
- }
+ public static ClassLoader getClassLoader() throws IOException {
+ return getClassLoader_Internal();
}
private static ClassLoader getClassLoader_Internal() throws IOException {
@@ -417,19 +413,25 @@ public class AccumuloVFSClassLoader {
}
}
- public static ClassLoader getContextClassLoader(String contextName) {
- try {
- return getContextManager().getClassLoader(contextName);
- } catch (IOException e) {
- throw new UncheckedIOException(
- "Error getting context class loader for context: " + contextName, e);
- }
+ public static ClassLoader getContextClassLoader(String contextName) throws
IOException {
+ return getContextManager().getClassLoader(contextName);
}
public static synchronized ContextManager getContextManager() throws
IOException {
if (contextManager == null) {
getClassLoader();
- contextManager = new ContextManager(generateVfs(),
AccumuloVFSClassLoader::getClassLoader);
+ try {
+ contextManager = new ContextManager(generateVfs(), () -> {
+ try {
+ return getClassLoader();
+ } catch (IOException e) {
+ // throw runtime, then unwrap it.
+ throw new UncheckedIOException(e);
+ }
+ });
+ } catch (UncheckedIOException uioe) {
+ throw uioe.getCause();
+ }
}
return contextManager;
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ClassLoaderContextCompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ClassLoaderContextCompactionIT.java
new file mode 100644
index 0000000000..15a539bf5c
--- /dev/null
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ClassLoaderContextCompactionIT.java
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.test.compaction;
+
+import static org.apache.accumulo.core.conf.Property.TABLE_FILE_MAX;
+import static org.apache.accumulo.core.conf.Property.TABLE_MAJC_RATIO;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.QUEUE1;
+import static
org.apache.accumulo.test.compaction.ExternalCompactionTestUtils.createTable;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.accumulo.compactor.Compactor;
+import org.apache.accumulo.coordinator.CompactionCoordinator;
+import org.apache.accumulo.core.client.Accumulo;
+import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.IteratorSetting;
+import org.apache.accumulo.core.client.admin.CompactionConfig;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
+import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
+import org.apache.accumulo.core.metadata.schema.TabletsMetadata;
+import org.apache.accumulo.core.metrics.MetricsProducer;
+import org.apache.accumulo.core.spi.metrics.LoggingMeterRegistryFactory;
+import org.apache.accumulo.core.util.HostAndPort;
+import org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.functional.ReadWriteIT;
+import org.apache.accumulo.test.metrics.TestStatsDRegistryFactory;
+import org.apache.accumulo.test.metrics.TestStatsDSink;
+import org.apache.accumulo.test.metrics.TestStatsDSink.Metric;
+import org.apache.accumulo.test.util.Wait;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ClassLoaderContextCompactionIT extends AccumuloClusterHarness {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(ClassLoaderContextCompactionIT.class);
+ private static TestStatsDSink sink;
+
+ @BeforeAll
+ public static void before() throws Exception {
+ sink = new TestStatsDSink();
+ }
+
+ @AfterAll
+ public static void after() throws Exception {
+ sink.close();
+ }
+
+ @Override
+ public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration
coreSite) {
+ ExternalCompactionTestUtils.configureMiniCluster(cfg, coreSite);
+ // After 1 failure start backing off by 5s.
+ // After 3 failures, terminate the Compactor
+ cfg.setProperty(Property.COMPACTOR_FAILURE_BACKOFF_THRESHOLD, "1");
+ cfg.setProperty(Property.COMPACTOR_FAILURE_BACKOFF_INTERVAL, "5s");
+ cfg.setProperty(Property.COMPACTOR_FAILURE_BACKOFF_RESET, "10m");
+ cfg.setProperty(Property.COMPACTOR_FAILURE_TERMINATION_THRESHOLD, "3");
+ cfg.setNumCompactors(2);
+ // Tell the server processes to use a StatsDMeterRegistry and the simple
logging registry
+ // that will be configured to push all metrics to the sink we started.
+ cfg.setProperty(Property.GENERAL_MICROMETER_ENABLED, "true");
+ cfg.setProperty(Property.GENERAL_MICROMETER_JVM_METRICS_ENABLED, "true");
+ cfg.setProperty("general.custom.metrics.opts.logging.step", "1s");
+ String clazzList = LoggingMeterRegistryFactory.class.getName() + ","
+ + TestStatsDRegistryFactory.class.getName();
+ cfg.setProperty(Property.GENERAL_MICROMETER_FACTORY, clazzList);
+ Map<String,String> sysProps =
Map.of(TestStatsDRegistryFactory.SERVER_HOST, "127.0.0.1",
+ TestStatsDRegistryFactory.SERVER_PORT,
Integer.toString(sink.getPort()));
+ cfg.setSystemProperties(sysProps);
+ }
+
+ @Test
+ public void testClassLoaderContextErrorKillsCompactor() throws Exception {
+
+ final AtomicBoolean shutdownTailer = new AtomicBoolean(false);
+ final AtomicLong cancellations = new AtomicLong(0);
+ final AtomicLong completions = new AtomicLong(0);
+ final AtomicLong failures = new AtomicLong(0);
+ final AtomicLong consecutive = new AtomicLong(0);
+ final AtomicLong terminations = new AtomicLong(0);
+
+ final Thread thread = Threads.createNonCriticalThread("metric-tailer", ()
-> {
+ while (!shutdownTailer.get()) {
+ List<String> statsDMetrics = sink.getLines();
+ for (String s : statsDMetrics) {
+ if (shutdownTailer.get()) {
+ break;
+ }
+ if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_COMPACTIONS_CANCELLED)) {
+ Metric m = TestStatsDSink.parseStatsDMetric(s);
+ LOG.info("{}", m);
+ cancellations.set(Long.parseLong(m.getValue()));
+ } else if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_COMPACTIONS_COMPLETED)) {
+ Metric m = TestStatsDSink.parseStatsDMetric(s);
+ LOG.info("{}", m);
+ completions.set(Long.parseLong(m.getValue()));
+ } else if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_COMPACTIONS_FAILED)) {
+ Metric m = TestStatsDSink.parseStatsDMetric(s);
+ LOG.info("{}", m);
+ failures.set(Long.parseLong(m.getValue()));
+ } else if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_FAILURES_TERMINATION)) {
+ Metric m = TestStatsDSink.parseStatsDMetric(s);
+ LOG.info("{}", m);
+ terminations.set(Long.parseLong(m.getValue()));
+ } else if
(s.startsWith(MetricsProducer.METRICS_COMPACTOR_FAILURES_CONSECUTIVE)) {
+ Metric m = TestStatsDSink.parseStatsDMetric(s);
+ LOG.info("{}", m);
+ consecutive.set(Long.parseLong(m.getValue()));
+ }
+
+ }
+ }
+ });
+ thread.start();
+
+ final String table1 = this.getUniqueNames(1)[0];
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+
getCluster().getClusterControl().startCoordinator(CompactionCoordinator.class);
+ getCluster().getClusterControl().startCompactors(Compactor.class, 1,
QUEUE1);
+ Wait.waitFor(
+ () -> ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext)
client) == 1);
+ List<HostAndPort> compactors =
+ ExternalCompactionUtil.getCompactorAddrs((ClientContext)
client).get(QUEUE1);
+ assertEquals(1, compactors.size());
+ final HostAndPort compactorAddr = compactors.get(0);
+ createTable(client, table1, "cs1");
+ client.tableOperations().setProperty(table1, TABLE_FILE_MAX.getKey(),
"1001");
+ client.tableOperations().setProperty(table1, TABLE_MAJC_RATIO.getKey(),
"1001");
+ TableId tid =
TableId.of(client.tableOperations().tableIdMap().get(table1));
+
+ ReadWriteIT.ingest(client, 1000, 1, 1, 0, "colf", table1, 20);
+
+ Ample ample = ((ClientContext) client).getAmple();
+ try (
+ TabletsMetadata tms =
ample.readTablets().forTable(tid).fetch(ColumnType.FILES).build()) {
+ TabletMetadata tm = tms.iterator().next();
+ assertEquals(50, tm.getFiles().size());
+ }
+
+ final MiniAccumuloClusterImpl cluster = (MiniAccumuloClusterImpl)
getCluster();
+ final FileSystem fs = cluster.getFileSystem();
+
+ // Create the context directory in HDFS
+ final org.apache.hadoop.fs.Path contextDir = fs.makeQualified(new
org.apache.hadoop.fs.Path(
+ cluster.getConfig().getAccumuloDir().toString(), "classpath"));
+ assertTrue(fs.mkdirs(contextDir));
+
+ // Copy the FooFilter.jar to the context dir
+ final org.apache.hadoop.fs.Path src = new org.apache.hadoop.fs.Path(
+ System.getProperty("java.io.tmpdir") +
"/classes/org/apache/accumulo/test/FooFilter.jar");
+ final org.apache.hadoop.fs.Path dst = new
org.apache.hadoop.fs.Path(contextDir, "Test.jar");
+ fs.copyFromLocalFile(src, dst);
+ assertTrue(fs.exists(dst));
+
+ // Define a classloader context that references Test.jar
+ @SuppressWarnings("removal")
+ final Property p = Property.VFS_CONTEXT_CLASSPATH_PROPERTY;
+ client.instanceOperations().setProperty(p.getKey() + "undefined",
dst.toUri().toString());
+
+ // Force the classloader to look in the context jar first, don't
delegate to the parent first
+
client.instanceOperations().setProperty("general.vfs.context.classpath.undefined.delegation",
+ "post");
+
+ // Set the context on the table
+ client.tableOperations().setProperty(table1,
Property.TABLE_CLASSLOADER_CONTEXT.getKey(),
+ "undefined");
+
+ final IteratorSetting cfg =
+ new IteratorSetting(101, "FooFilter",
"org.apache.accumulo.test.FooFilter");
+ client.tableOperations().attachIterator(table1, cfg,
EnumSet.of(IteratorScope.majc));
+
+ // delete Test.jar, so that the classloader will fail
+ assertTrue(fs.delete(dst, false));
+
+ assertEquals(0, cancellations.get());
+ assertEquals(0, completions.get());
+ assertEquals(0, failures.get());
+ assertEquals(0, terminations.get());
+ assertEquals(0, consecutive.get());
+
+ // Start a compaction. The missing jar should cause a failure
+ client.tableOperations().compact(table1, new
CompactionConfig().setWait(false));
+ Wait.waitFor(
+ () -> ExternalCompactionUtil.getRunningCompaction(compactorAddr,
(ClientContext) client)
+ == null);
+ assertEquals(1, ExternalCompactionUtil.countCompactors(QUEUE1,
(ClientContext) client));
+ Wait.waitFor(() -> failures.get() == 1);
+ Wait.waitFor(() -> consecutive.get() == 1);
+
+ Wait.waitFor(() -> failures.get() == 0);
+ client.tableOperations().compact(table1, new
CompactionConfig().setWait(false));
+ Wait.waitFor(
+ () -> ExternalCompactionUtil.getRunningCompaction(compactorAddr,
(ClientContext) client)
+ == null);
+ assertEquals(1, ExternalCompactionUtil.countCompactors(QUEUE1,
(ClientContext) client));
+ Wait.waitFor(() -> failures.get() == 1);
+ Wait.waitFor(() -> consecutive.get() == 2);
+
+ Wait.waitFor(() -> failures.get() == 0);
+ client.tableOperations().compact(table1, new
CompactionConfig().setWait(false));
+ Wait.waitFor(
+ () -> ExternalCompactionUtil.getRunningCompaction(compactorAddr,
(ClientContext) client)
+ == null);
+ assertEquals(1, ExternalCompactionUtil.countCompactors(QUEUE1,
(ClientContext) client));
+ Wait.waitFor(() -> failures.get() == 1);
+ Wait.waitFor(() -> consecutive.get() == 3);
+
+ // Three failures have occurred, Compactor should shut down.
+ Wait.waitFor(
+ () -> ExternalCompactionUtil.countCompactors(QUEUE1, (ClientContext)
client) == 0);
+ Wait.waitFor(() -> terminations.get() == 1);
+ assertEquals(0, cancellations.get());
+ assertEquals(0, completions.get());
+
+ } finally {
+ shutdownTailer.set(true);
+ thread.join();
+ getCluster().getClusterControl().stopAllServers(ServerType.COMPACTOR);
+
getCluster().getClusterControl().stopAllServers(ServerType.COMPACTION_COORDINATOR);
+ }
+
+ }
+
+}
diff --git a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
index b7cefe3c8b..026f392a01 100644
--- a/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/metrics/MetricsIT.java
@@ -103,6 +103,11 @@ public class MetricsIT extends ConfigurableMacBase
implements MetricsProducer {
// @formatter:off
Set<String> unexpectedMetrics =
Set.of(METRICS_COMPACTOR_MAJC_STUCK,
+ METRICS_COMPACTOR_COMPACTIONS_CANCELLED,
+ METRICS_COMPACTOR_COMPACTIONS_COMPLETED,
+ METRICS_COMPACTOR_COMPACTIONS_FAILED,
+ METRICS_COMPACTOR_FAILURES_CONSECUTIVE,
+ METRICS_COMPACTOR_FAILURES_TERMINATION,
METRICS_REPLICATION_QUEUE,
METRICS_SCAN_YIELDS,
METRICS_UPDATE_ERRORS);
diff --git
a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
index bba4b355bc..62e034bbef 100644
---
a/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
+++
b/test/src/main/java/org/apache/accumulo/test/performance/scan/CollectTabletStats.java
@@ -429,7 +429,7 @@ public class CollectTabletStats {
Collection<SortedKeyValueIterator<Key,Value>> mapfiles, Authorizations
authorizations,
byte[] defaultLabels, HashSet<Column> columnSet, List<IterInfo> ssiList,
Map<String,Map<String,String>> ssio, boolean useTableIterators,
TableConfiguration conf,
- ServerContext context) throws IOException {
+ ServerContext context) throws IOException, ReflectiveOperationException {
SortedMapIterator smi = new SortedMapIterator(new TreeMap<>());