This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 990c6edebb interrupts compactions and adds IT to verify (#5395)
990c6edebb is described below
commit 990c6edebb001d5212c354f9e0698cba7713c330
Author: Keith Turner <[email protected]>
AuthorDate: Thu Mar 13 12:16:05 2025 -0400
interrupts compactions and adds IT to verify (#5395)
When a tablet is closed while a compaction is running, Thread.interrupt()
is now called on the thread running the compaction.
---
.../accumulo/server/compaction/CompactionInfo.java | 2 +-
.../accumulo/server/compaction/FileCompactor.java | 49 +++++++--
.../accumulo/tserver/tablet/CompactableUtils.java | 4 +-
.../accumulo/tserver/tablet/MinorCompactor.java | 3 +-
.../accumulo/test/functional/CompactionIT.java | 109 +++++++++++++++++++++
.../accumulo/test/functional/SlowIterator.java | 29 +++++-
6 files changed, 181 insertions(+), 15 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
index b505c38cb9..35bf574fb1 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/CompactionInfo.java
@@ -65,7 +65,7 @@ public class CompactionInfo {
}
public Thread getThread() {
- return compactor.thread;
+ return compactor.getThread();
}
public String getOutputFile() {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
index 2645962887..667c486f6d 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/compaction/FileCompactor.java
@@ -79,6 +79,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Collections2;
import io.opentelemetry.api.trace.Span;
@@ -141,13 +142,48 @@ public class FileCompactor implements
Callable<CompactionStats> {
// a unique id to identify a compactor
private final long compactorID = nextCompactorID.getAndIncrement();
- protected volatile Thread thread;
+ private volatile Thread thread;
private final ServerContext context;
private final AtomicBoolean interruptFlag = new AtomicBoolean(false);
- public void interrupt() {
+ public synchronized void interrupt() {
interruptFlag.set(true);
+
+ if (thread != null) {
+ // Never want to interrupt the thread after clearThread was called as
the thread could have
+ // moved on to something completely different than the compaction. This
method and clearThread
+ // being synchronized and clearThread setting thread to null prevent
this.
+ thread.interrupt();
+ }
+ }
+
+ private class ThreadClearer implements AutoCloseable {
+ @Override
+ public void close() throws InterruptedException {
+ clearThread();
+ }
+ }
+
+ private synchronized ThreadClearer setThread() {
+ thread = Thread.currentThread();
+ return new ThreadClearer();
+ }
+
+ private synchronized void clearThread() throws InterruptedException {
+ Preconditions.checkState(thread == Thread.currentThread());
+ thread = null;
+ // If the thread was interrupted during compaction do not want to allow
the thread to continue
+ // w/ the interrupt status set as this could impact code unrelated to the
compaction. For
+ // internal compactions the thread will execute metadata update code after
the compaction and
+ // would not want the interrupt status set for that.
+ if (Thread.interrupted()) {
+ throw new InterruptedException();
+ }
+ }
+
+ Thread getThread() {
+ return thread;
}
public long getCompactorID() {
@@ -272,7 +308,8 @@ public class FileCompactor implements
Callable<CompactionStats> {
}
@Override
- public CompactionStats call() throws IOException,
CompactionCanceledException {
+ public CompactionStats call()
+ throws IOException, CompactionCanceledException, InterruptedException {
FileSKVWriter mfw = null;
@@ -290,8 +327,9 @@ public class FileCompactor implements
Callable<CompactionStats> {
String newThreadName =
"MajC compacting " + extent + " started " + threadStartDate + " file:
" + outputFile;
Thread.currentThread().setName(newThreadName);
- thread = Thread.currentThread();
- try {
+ // Use try w/ resources for clearing the thread instead of finally because
clearing may throw an
+ // exception. Java's handling of exceptions thrown in finally blocks is
not good.
+ try (var ignored = setThread()) {
FileOperations fileFactory = FileOperations.getInstance();
FileSystem ns = this.fs.getFileSystemByPath(outputFile.getPath());
@@ -374,7 +412,6 @@ public class FileCompactor implements
Callable<CompactionStats> {
} finally {
Thread.currentThread().setName(oldThreadName);
if (remove) {
- thread = null;
runningCompactions.remove(this);
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
index 1a670c81d1..56a2006456 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactableUtils.java
@@ -560,7 +560,7 @@ public class CompactableUtils {
static CompactionStats compact(Tablet tablet, CompactionJob job,
CompactableImpl.CompactionInfo cInfo, CompactionEnv cenv,
Map<StoredTabletFile,DataFileValue> compactFiles, TabletFile tmpFileName)
- throws IOException, CompactionCanceledException {
+ throws IOException, CompactionCanceledException, InterruptedException {
TableConfiguration tableConf = tablet.getTableConfiguration();
AccumuloConfiguration compactionConfig = getCompactionConfig(tableConf,
@@ -576,7 +576,7 @@ public class CompactableUtils {
}
};
final ScheduledFuture<?> future =
tablet.getContext().getScheduledExecutor()
- .scheduleWithFixedDelay(compactionCancellerTask, 10, 10,
TimeUnit.SECONDS);
+ .scheduleWithFixedDelay(compactionCancellerTask, 3, 3,
TimeUnit.SECONDS);
try {
return compactor.call();
} finally {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
index facc5024b3..71258af11d 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/MinorCompactor.java
@@ -137,7 +137,7 @@ public class MinorCompactor extends FileCompactor {
log.warn("MinC failed ({}) to create {} retrying ...",
e.getMessage(), outputFileName, e);
reportedProblem = true;
retryCounter++;
- } catch (CompactionCanceledException e) {
+ } catch (CompactionCanceledException | InterruptedException e) {
throw new IllegalStateException(e);
}
@@ -161,7 +161,6 @@ public class MinorCompactor extends FileCompactor {
} while (true);
} finally {
- thread = null;
runningCompactions.remove(this);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index 56e57bac33..fe79b5e8ef 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -37,11 +37,15 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NoSuchElementException;
import java.util.Objects;
+import java.util.SortedSet;
+import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
@@ -86,11 +90,14 @@ import
org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
import org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.harness.AccumuloClusterHarness;
+import org.apache.accumulo.minicluster.ServerType;
+import org.apache.accumulo.miniclusterImpl.MiniAccumuloClusterImpl;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.compaction.CompactionExecutorIT;
import org.apache.accumulo.test.compaction.ExternalCompaction_1_IT.FSelector;
+import org.apache.accumulo.test.util.Wait;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -797,6 +804,108 @@ public class CompactionIT extends AccumuloClusterHarness {
}
}
+ @Test
+ public void testMigrationCancelCompaction() throws Exception {
+
+ // This test creates 40 tablets w/ slow iterator, causes 40 compactions to
start, and then
+ // starts a new tablet server. Some of the tablets should migrate to the
new tserver and cancel
+ // their compaction. Because the test uses a slow iterator, if close
blocks on compaction then
the test should time out. Two tables are used to have different iterator
settings in order to
+ // test the two different ways compactions can be canceled. Compactions can
be canceled by thread
+ // interrupt or by a check that is done after a compaction iterator
returns a key value.
+
+ final String[] tables = this.getUniqueNames(2);
+ try (AccumuloClient client =
Accumulo.newClient().from(getClientProps()).build()) {
+ client.instanceOperations().setProperty(
+ Property.TSERV_COMPACTION_SERVICE_DEFAULT_EXECUTORS.getKey(),
+ "[{'name':'any','numThreads':20}]".replaceAll("'", "\""));
+
+ SortedSet<Text> splits = IntStream.range(1, 20).mapToObj(i ->
String.format("%06d", i * 1000))
+ .map(Text::new).collect(Collectors.toCollection(TreeSet::new));
+
+ // This iterator is intended to cover the case of a compaction being
canceled by thread
+ // interrupt.
+ IteratorSetting setting1 = new IteratorSetting(50, "sleepy",
SlowIterator.class);
+ setting1.addOption("sleepTime", "300000");
+ setting1.addOption("seekSleepTime", "3000");
+ SlowIterator.sleepUninterruptibly(setting1, false);
+
+ client.tableOperations().create(tables[0], new
NewTableConfiguration().withSplits(splits)
+ .attachIterator(setting1, EnumSet.of(IteratorScope.majc)));
+
+ // This iterator is intended to cover the case of compaction being
canceled by the check after
+ // a key value is returned. The iterator is configured to ignore
interrupts.
+ IteratorSetting setting2 = new IteratorSetting(50, "sleepy",
SlowIterator.class);
+ setting2.addOption("sleepTime", "2000");
+ setting2.addOption("seekSleepTime", "2000");
+ SlowIterator.sleepUninterruptibly(setting2, true);
+
+ client.tableOperations().create(tables[1], new
NewTableConfiguration().withSplits(splits)
+ .attachIterator(setting2, EnumSet.of(IteratorScope.majc)));
+
+ // write files to each tablet, should cause compactions to start
+ for (var table : tables) {
+ for (int round = 0; round < 5; round++) {
+ try (var writer = client.createBatchWriter(table)) {
+ for (int i = 0; i < 20_000; i++) {
+ Mutation m = new Mutation(String.format("%06d", i));
+ m.put("f", "q", "v");
+ writer.addMutation(m);
+ }
+ }
+ client.tableOperations().flush(table, null, null, true);
+ }
+ }
+
+ assertEquals(2, client.instanceOperations().getTabletServers().size());
+
+ var ctx = (ClientContext) client;
+ var tableId1 = ctx.getTableId(tables[0]);
+ var tableId2 = ctx.getTableId(tables[1]);
+
+ Wait.waitFor(() -> {
+ var runningCompactions =
client.instanceOperations().getActiveCompactions().stream()
+ .map(ac -> ac.getTablet().getTable())
+ .filter(tid -> tid.equals(tableId1) ||
tid.equals(tableId2)).count();
+ log.debug("Running compactions {}", runningCompactions);
+ return runningCompactions == 40;
+ });
+
+ ((MiniAccumuloClusterImpl) getCluster()).getConfig().setNumTservers(3);
+ getCluster().getClusterControl().start(ServerType.TABLET_SERVER,
"localhost");
+
+ Wait.waitFor(() -> {
+ var servers = client.instanceOperations().getTabletServers().size();
+ log.debug("Server count {}", servers);
+ return 3 == servers;
+ });
+
+ Wait.waitFor(() -> {
+ try (var tablets =
ctx.getAmple().readTablets().forLevel(Ample.DataLevel.USER)
+ .fetch(ColumnType.LOCATION, ColumnType.PREV_ROW).build()) {
+ Map<String,Long> counts = new HashMap<>();
+ for (var tablet : tablets) {
+ if (!tablet.getTableId().equals(tableId1) &&
!tablet.getTableId().equals(tableId2)) {
+ continue;
+ }
+
+ if (tablet.getLocation() != null
+ && tablet.getLocation().getType() ==
TabletMetadata.LocationType.CURRENT) {
+ counts.merge(tablet.getLocation().getHostPort(), 1L, Long::sum);
+ }
+ }
+
+ var total = counts.values().stream().mapToLong(l -> l).sum();
+ var min = counts.values().stream().mapToLong(l -> l).min().orElse(0);
+ var max = counts.values().stream().mapToLong(l ->
l).max().orElse(100);
+ var serversSeen = counts.keySet();
+ log.debug("total:{} min:{} max:{} serversSeen:{}", total, min, max,
serversSeen);
+ return total == 40 && min == 12 && max == 14 && serversSeen.size()
== 3;
+ }
+ });
+ }
+ }
+
/**
* Counts the number of tablets and files in a table.
*/
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
index 714ace03bf..d9c8ddca12 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SlowIterator.java
@@ -18,8 +18,6 @@
*/
package org.apache.accumulo.test.functional;
-import static
org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly;
-
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
@@ -33,14 +31,17 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.IteratorEnvironment;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iterators.WrappingIterator;
+import org.apache.accumulo.core.util.UtilWaitThread;
public class SlowIterator extends WrappingIterator {
private static final String SLEEP_TIME = "sleepTime";
private static final String SEEK_SLEEP_TIME = "seekSleepTime";
+ private static final String SLEEP_UNINTERRUPTIBLY = "sleepUninterruptibly";
private long sleepTime = 0;
private long seekSleepTime = 0;
+ private boolean sleepUninterruptibly = true;
public static void setSleepTime(IteratorSetting is, long millis) {
is.addOption(SLEEP_TIME, Long.toString(millis));
@@ -50,6 +51,22 @@ public class SlowIterator extends WrappingIterator {
is.addOption(SEEK_SLEEP_TIME, Long.toString(t));
}
+ public static void sleepUninterruptibly(IteratorSetting is, boolean b) {
+ is.addOption(SLEEP_UNINTERRUPTIBLY, Boolean.toString(b));
+ }
+
+ /**
+  * Pauses the calling thread for the requested number of milliseconds, honoring the
+  * sleepUninterruptibly option.
+  *
+  * @param time how long to sleep, in milliseconds
+  * @throws IOException wrapping the InterruptedException when an interruptible sleep is
+  *         interrupted
+  */
+ private void sleep(long time) throws IOException {
+   if (sleepUninterruptibly) {
+     UtilWaitThread.sleepUninterruptibly(time, TimeUnit.MILLISECONDS);
+   } else {
+     try {
+       // Sleep for the requested duration. seek() passes seekSleepTime, so the sleepTime field
+       // must not be used here.
+       Thread.sleep(time);
+     } catch (InterruptedException e) {
+       throw new IOException(e);
+     }
+   }
+ }
+
@Override
public SortedKeyValueIterator<Key,Value> deepCopy(IteratorEnvironment env) {
throw new UnsupportedOperationException();
@@ -57,14 +74,14 @@ public class SlowIterator extends WrappingIterator {
@Override
public void next() throws IOException {
- sleepUninterruptibly(sleepTime, TimeUnit.MILLISECONDS);
+ sleep(sleepTime);
super.next();
}
@Override
public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
throws IOException {
- sleepUninterruptibly(seekSleepTime, TimeUnit.MILLISECONDS);
+ sleep(seekSleepTime);
super.seek(range, columnFamilies, inclusive);
}
@@ -79,6 +96,10 @@ public class SlowIterator extends WrappingIterator {
if (options.containsKey(SEEK_SLEEP_TIME)) {
seekSleepTime = Long.parseLong(options.get(SEEK_SLEEP_TIME));
}
+
+ if (options.containsKey(SLEEP_UNINTERRUPTIBLY)) {
+ sleepUninterruptibly =
Boolean.parseBoolean(options.get(SLEEP_UNINTERRUPTIBLY));
+ }
}
}