This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 8cd904138a fixes race condition w/ split,compaction relative to
offline (#4629)
8cd904138a is described below
commit 8cd904138a85197a03dc71542c6a0410400c8505
Author: Keith Turner <[email protected]>
AuthorDate: Mon Jun 3 16:09:51 2024 -0400
fixes race condition w/ split,compaction relative to offline (#4629)
This commit wraps up #3412 and includes the following changes:
* compactors now cancel running compactions when the table state is no
longer online; this avoids doing work that will never be used
* coordinator will no longer start committing compactions when the table
is offline; this avoids a race condition w/ offline+wait of table
* fixed bug found in tablet refresher where it was not properly handling
concurrent tablet unloads (found by testing concurrent compaction and offline)
* compaction fate operation that drives table compaction will now fail
when table is offline (this change may be needed in older versions)
* reordered when the split fate operation checks for table offline to
avoid a race condition with offline+wait of table
* added multiple ITs to test running split and compaction concurrently
with offline table operation
---
.../org/apache/accumulo/compactor/Compactor.java | 8 ++
.../coordinator/CompactionCoordinator.java | 14 +++
.../manager/tableOps/bulkVer2/TabletRefresher.java | 49 ++++++++-
.../manager/tableOps/compact/CompactionDriver.java | 6 ++
.../split/AllocateDirsAndEnsureOnline.java | 110 +++++++++++++++++++++
.../accumulo/manager/tableOps/split/PreSplit.java | 29 +-----
.../accumulo/manager/tableOps/split/SplitInfo.java | 2 +-
.../apache/accumulo/test/fate/ManagerRepoIT.java | 52 ++++++++++
.../accumulo/test/functional/CompactionIT.java | 94 +++++++++++++++++-
.../apache/accumulo/test/functional/SplitIT.java | 52 +++++++++-
10 files changed, 379 insertions(+), 37 deletions(-)
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 4c5357e35a..d05324da53 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -75,6 +75,7 @@ import org.apache.accumulo.core.lock.ServiceLockData;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptor;
import org.apache.accumulo.core.lock.ServiceLockData.ServiceDescriptors;
import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
+import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -204,6 +205,13 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
return;
}
+ var tableState = getContext().getTableState(extent.tableId());
+ if (tableState != TableState.ONLINE) {
+ LOG.info("Cancelling compaction {} because table state is {}", ecid,
tableState);
+ JOB_HOLDER.cancel(job.getExternalCompactionId());
+ return;
+ }
+
if (job.getKind() == TCompactionKind.USER) {
var cconf =
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index 7642003985..f9872c3687 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -740,6 +740,20 @@ public class CompactionCoordinator
var tabletMeta =
ctx.getAmple().readTablet(extent, ECOMP, SELECTED, LOCATION, FILES,
COMPACTED, OPID);
+ var tableState = manager.getContext().getTableState(extent.tableId());
+ if (tableState != TableState.ONLINE) {
+ // Its important this check is done after the compaction id is set in
the metadata table to
+ // avoid race conditions with the client code that waits for tables to
go offline. That code
+ // looks for compaction ids in the metadata table after setting the
table state. When that
+ // client code sees nothing for a tablet its important that nothing will
changes the tablets
+ // files after that point in time which this check ensure.
+ LOG.debug("Not committing compaction {} for {} because of table state
{}", ecid, extent,
+ tableState);
+ // cleanup metadata table and files related to the compaction
+ compactionsFailed(Map.of(ecid, extent));
+ return;
+ }
+
if (!CommitCompaction.canCommitCompaction(ecid, tabletMeta)) {
return;
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
index cb963e583a..8bff563391 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/TabletRefresher.java
@@ -23,9 +23,11 @@ import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toList;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -35,6 +37,7 @@ import java.util.function.Supplier;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
+import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -106,8 +109,8 @@ public class TabletRefresher {
// Ask tablet server to reload the metadata for these tablets. The
tablet server returns
// the list of extents it was hosting but was unable to refresh (the
tablets could be in
- // the process of loading). If it is not currently hosting the tablet
it treats that as
- // refreshed and does not return anything for it.
+ // the process of loading). If it is not currently hosting the tablet
it does not return
+ // anything for it.
Future<List<TKeyExtent>> future = threadPool
.submit(() -> sendSyncRefreshRequest(context, logId,
entry.getKey(), entry.getValue()));
@@ -141,6 +144,48 @@ public class TabletRefresher {
.removeIf(location ->
!liveTservers.contains(location.getServerInstance()));
}
+ if (!refreshesNeeded.isEmpty()) {
+ // look for any tablets where the location changed, these tablets will
no longer need a
+ // refresh because when the tablet loads at the new location it will
see the new tablet
+ // metadata
+ HashMap<KeyExtent,TabletMetadata.Location> prevLocations = new
HashMap<>();
+ refreshesNeeded.forEach((loc, extents) -> {
+ for (TKeyExtent te : extents) {
+ var extent = KeyExtent.fromThrift(te);
+ prevLocations.put(extent, loc);
+ }
+ });
+
+ // Build a map of tablets that exist and their current location. No
need to includes tablets
+ // that no longer exists or do not have a location as later logic is
ok w/ these being null.
+ HashMap<KeyExtent,TabletMetadata.Location> currLocations = new
HashMap<>();
+ try (var tablets =
+
context.getAmple().readTablets().forTablets(prevLocations.keySet(),
Optional.empty())
+ .fetch(ColumnType.LOCATION).build()) {
+ tablets.forEach(tablet -> {
+ if (tablet.getLocation() != null) {
+ currLocations.put(tablet.getExtent(), tablet.getLocation());
+ }
+ });
+ }
+
+ refreshesNeeded.clear();
+
+ var finalrefreshesNeeded = refreshesNeeded;
+ // rebuild refreshesNeeded only including those where the location is
still the same
+ prevLocations.forEach((extent, prevLoc) -> {
+ var currLoc = currLocations.get(extent);
+ // currLoc may be null and this is ok because it should not be equal
then
+ if (prevLoc.equals(currLoc)) {
+ finalrefreshesNeeded.computeIfAbsent(currLoc, k -> new
ArrayList<>())
+ .add(extent.toThrift());
+ } else {
+ log.trace("The location of {} changed from {} to {}, so refresh no
longer needed",
+ extent, prevLoc, currLoc);
+ }
+ });
+ }
+
if (!refreshesNeeded.isEmpty()) {
try {
retry.waitForNextAttempt(log, logId + " waiting for " +
refreshesNeeded.size()
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
index d1ace6c816..74fd66b49d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/compact/CompactionDriver.java
@@ -46,6 +46,7 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.logging.TabletLogger;
+import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.AbstractTabletFile;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -113,6 +114,11 @@ class CompactionDriver extends ManagerRepo {
TableOperationsImpl.TABLE_DELETED_MSG);
}
+ if (manager.getContext().getTableState(tableId) != TableState.ONLINE) {
+ throw new AcceptableThriftTableOperationException(tableId.canonical(),
null,
+ TableOperation.COMPACT, TableOperationExceptionType.OFFLINE, "The
table is not online.");
+ }
+
long t1 = System.currentTimeMillis();
int tabletsToWaitFor = updateAndCheckTablets(manager, fateId);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java
new file mode 100644
index 0000000000..0b090cbcd2
--- /dev/null
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/AllocateDirsAndEnsureOnline.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.manager.tableOps.split;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
+import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
+import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.Repo;
+import org.apache.accumulo.core.manager.state.tables.TableState;
+import org.apache.accumulo.core.metadata.schema.Ample;
+import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.core.metadata.schema.TabletOperationId;
+import org.apache.accumulo.core.metadata.schema.TabletOperationType;
+import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
+import org.apache.accumulo.server.tablets.TabletNameGenerator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Split FATE step that runs after {@link PreSplit} has set this operation's SPLITTING operation
+ * id on the original tablet.
+ *
+ * <p>
+ * If the table is no longer online, the operation id is conditionally removed from the tablet and
+ * the split fails with {@link TableOperationExceptionType#OFFLINE}. Otherwise a directory name is
+ * allocated for each new tablet and the operation continues with {@link UpdateTablets}.
+ */
+public class AllocateDirsAndEnsureOnline extends ManagerRepo {
+
+  private static final long serialVersionUID = 1L;
+  // Log under this class (not PreSplit) so messages are attributed to the step that emitted them.
+  private static final Logger log = LoggerFactory.getLogger(AllocateDirsAndEnsureOnline.class);
+
+  private final SplitInfo splitInfo;
+
+  public AllocateDirsAndEnsureOnline(SplitInfo splitInfo) {
+    this.splitInfo = splitInfo;
+  }
+
+  @Override
+  public Repo<Manager> call(FateId fateId, Manager manager) throws Exception {
+    var extent = splitInfo.getOriginal();
+
+    // This check of table state is done after setting the operation id to avoid a race condition
+    // with the client code that waits for a table to go offline. That client code sets the table
+    // state and then scans the metadata table looking for split operation ids. If split checked
+    // the table state before setting the opid, there would be a race condition with the client.
+    // Checking after ensures that, when the client does not see any split op id in the metadata
+    // table, any split starting after that point in time will not complete. This order is needed
+    // because the split fate operation does not acquire a table lock in zookeeper.
+    if (manager.getContext().getTableState(extent.tableId()) != TableState.ONLINE) {
+      var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId);
+
+      // attempt to delete the operation id
+      try (var tabletsMutator = manager.getContext().getAmple().conditionallyMutateTablets()) {
+
+        Ample.RejectionHandler rejectionHandler = new Ample.RejectionHandler() {
+
+          @Override
+          public boolean callWhenTabletDoesNotExists() {
+            return true;
+          }
+
+          @Override
+          public boolean test(TabletMetadata tabletMetadata) {
+            // if the tablet no longer exists or our operation id is not set then consider a success
+            return tabletMetadata == null || !opid.equals(tabletMetadata.getOperationId());
+          }
+        };
+
+        tabletsMutator.mutateTablet(extent).requireOperation(opid).requireAbsentLocation()
+            .requireAbsentLogs().deleteOperation().submit(rejectionHandler);
+
+        var result = tabletsMutator.process().get(extent);
+
+        if (result.getStatus() != Ample.ConditionalResult.Status.ACCEPTED) {
+          throw new IllegalStateException("Failed to delete operation id " + extent);
+        }
+      }
+
+      throw new AcceptableThriftTableOperationException(extent.tableId().canonical(), null,
+          TableOperation.SPLIT, TableOperationExceptionType.OFFLINE,
+          "Unable to split tablet because the table is offline");
+    }
+
+    // Create the dir names here for the next step. If the next step fails it will always have
+    // the same dir names each time it runs again, making it idempotent.
+    List<String> dirs = new ArrayList<>();
+    splitInfo.getSplits().forEach(split -> {
+      String dirName = TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split);
+      dirs.add(dirName);
+      log.trace("{} allocated dir name {}", fateId, dirName);
+    });
+    return new UpdateTablets(splitInfo, dirs);
+  }
+}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
index 906e953f45..6d89878f95 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/PreSplit.java
@@ -23,20 +23,14 @@ import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.OPID;
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.PREV_ROW;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.SortedSet;
-import
org.apache.accumulo.core.clientImpl.AcceptableThriftTableOperationException;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperation;
-import org.apache.accumulo.core.clientImpl.thrift.TableOperationExceptionType;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.Repo;
-import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
@@ -44,7 +38,6 @@ import
org.apache.accumulo.core.metadata.schema.TabletOperationId;
import org.apache.accumulo.core.metadata.schema.TabletOperationType;
import org.apache.accumulo.manager.Manager;
import org.apache.accumulo.manager.tableOps.ManagerRepo;
-import org.apache.accumulo.server.tablets.TabletNameGenerator;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,15 +82,6 @@ public class PreSplit extends ManagerRepo {
return 1000;
}
} else {
- // do not set the operation id on the tablet if the table is offline
- if (manager.getContext().getTableState(splitInfo.getOriginal().tableId())
- != TableState.ONLINE) {
- throw new AcceptableThriftTableOperationException(
- splitInfo.getOriginal().tableId().canonical(), null,
TableOperation.SPLIT,
- TableOperationExceptionType.OFFLINE,
- "Unable to split tablet because the table is offline");
- }
-
try (var tabletsMutator =
manager.getContext().getAmple().conditionallyMutateTablets()) {
tabletsMutator.mutateTablet(splitInfo.getOriginal()).requireAbsentOperation()
@@ -155,18 +139,7 @@ public class PreSplit extends ManagerRepo {
"Tablet unexpectedly had walogs %s %s %s", fateId,
tabletMetadata.getLogs(),
tabletMetadata.getExtent());
- // Create the dir name here for the next step. If the next step fails it
will always have the
- // same dir name each time it runs again making it idempotent.
-
- List<String> dirs = new ArrayList<>();
-
- splitInfo.getSplits().forEach(split -> {
- String dirName =
TabletNameGenerator.createTabletDirectoryName(manager.getContext(), split);
- dirs.add(dirName);
- log.trace("{} allocated dir name {}", fateId, dirName);
- });
-
- return new UpdateTablets(splitInfo, dirs);
+ return new AllocateDirsAndEnsureOnline(splitInfo);
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
index b8f8c7adff..7d97e6a34e 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/split/SplitInfo.java
@@ -37,7 +37,7 @@ public class SplitInfo implements Serializable {
private final byte[] endRow;
private final byte[][] splits;
- SplitInfo(KeyExtent extent, SortedSet<Text> splits) {
+ public SplitInfo(KeyExtent extent, SortedSet<Text> splits) {
this.tableId = extent.tableId();
this.prevEndRow = extent.prevEndRow() == null ? null :
TextUtil.getBytes(extent.prevEndRow());
this.endRow = extent.endRow() == null ? null :
TextUtil.getBytes(extent.endRow());
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
index 978fb3c491..e49de2fa9f 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/ManagerRepoIT.java
@@ -28,13 +28,16 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.nio.file.Path;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
+import java.util.TreeSet;
import java.util.UUID;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
@@ -55,10 +58,13 @@ import org.apache.accumulo.manager.tableOps.merge.MergeInfo;
import org.apache.accumulo.manager.tableOps.merge.MergeInfo.Operation;
import org.apache.accumulo.manager.tableOps.merge.MergeTablets;
import org.apache.accumulo.manager.tableOps.merge.ReserveTablets;
+import org.apache.accumulo.manager.tableOps.split.AllocateDirsAndEnsureOnline;
import org.apache.accumulo.manager.tableOps.split.FindSplits;
import org.apache.accumulo.manager.tableOps.split.PreSplit;
+import org.apache.accumulo.manager.tableOps.split.SplitInfo;
import org.apache.accumulo.test.ample.metadata.TestAmple;
import org.apache.accumulo.test.ample.metadata.TestAmple.TestServerAmpleImpl;
+import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
@@ -134,6 +140,52 @@ public class ManagerRepoIT extends SharedMiniClusterBase {
}
}
+  @Test
+  public void testSplitOffline() throws Exception {
+    String[] tableNames = getUniqueNames(2);
+    String metadataTable = tableNames[0];
+    String userTable = tableNames[1];
+
+    // This test ensures a repo involved in splitting a tablet handles an offline table correctly:
+    // AllocateDirsAndEnsureOnline must remove the SPLITTING operation id from the tablet and then
+    // fail by throwing a table operation exception.
+
+    try (ClientContext client =
+        (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+      TestAmple.createMetadataTable(client, metadataTable);
+
+      // create a new table that is initially offline
+      client.tableOperations().create(userTable, new NewTableConfiguration().createOffline());
+
+      TableId tableId = TableId.of(client.tableOperations().tableIdMap().get(userTable));
+
+      TestServerAmpleImpl testAmple = (TestServerAmpleImpl) TestAmple
+          .create(getCluster().getServerContext(), Map.of(DataLevel.USER, metadataTable));
+
+      testAmple.createMetadataFromExisting(client, tableId,
+          not(SplitColumnFamily.UNSPLITTABLE_COLUMN));
+
+      var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+      KeyExtent extent = new KeyExtent(tableId, null, null);
+
+      // manually set an operation id on the tablet; reuse the same object so the value written is
+      // exactly the value asserted below
+      var opid = TabletOperationId.from(TabletOperationType.SPLITTING, fateId);
+      testAmple.mutateTablet(extent).putOperation(opid).mutate();
+
+      Manager manager = mockWithAmple(getCluster().getServerContext(), testAmple);
+
+      assertEquals(opid, testAmple.readTablet(extent).getOperationId());
+
+      var eoRepo = new AllocateDirsAndEnsureOnline(
+          new SplitInfo(extent, new TreeSet<>(List.of(new Text("sp1")))));
+
+      // The repo should delete the opid and throw an exception
+      assertThrows(ThriftTableOperationException.class, () -> eoRepo.call(fateId, manager));
+
+      // the operation id should have been cleaned up before the exception was thrown
+      assertNull(testAmple.readTablet(extent).getOperationId());
+    }
+  }
+
@Test
public void testFindSplitsUnsplittable() throws Exception {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
index bd26f0a469..e110786f15 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/CompactionIT.java
@@ -34,6 +34,7 @@ import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
@@ -43,9 +44,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
+import java.util.SortedSet;
import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
@@ -96,6 +100,7 @@ import
org.apache.accumulo.core.util.compaction.ExternalCompactionUtil;
import org.apache.accumulo.harness.AccumuloClusterHarness;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.VerifyIngest;
import org.apache.accumulo.test.VerifyIngest.VerifyParams;
import org.apache.accumulo.test.compaction.CompactionExecutorIT.TestPlanner;
@@ -1178,7 +1183,94 @@ public class CompactionIT extends AccumuloClusterHarness
{
ExternalCompactionTestUtils.assertNoCompactionMetadata(getServerContext(),
table1);
}
- private void writeRows(ClientContext client, String tableName, int rows,
boolean wait)
+  @Test
+  public void testOfflineAndCompactions() throws Exception {
+    var uniqueNames = getUniqueNames(1);
+    String table = uniqueNames[0];
+
+    // This test exercises concurrent compactions and table offline. Once offline(table, true)
+    // returns, no tablet's files are expected to change, even if compactions were in flight.
+
+    try (final AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) {
+
+      SortedSet<Text> splits = new TreeSet<>();
+      for (int i = 1; i < 32; i++) {
+        splits.add(new Text(String.format("r:%04d", i)));
+      }
+
+      client.tableOperations().create(table, new NewTableConfiguration().withSplits(splits));
+      writeRows(client, table, 33, true);
+      // create two files per tablet
+      writeRows(client, table, 33, true);
+
+      var ctx = getCluster().getServerContext();
+      var tableId = ctx.getTableId(table);
+
+      // verify assumptions of test, expect all tablets to have files
+      var files0 = getFiles(ctx, tableId);
+      assertEquals(32, files0.size());
+      assertFalse(files0.values().stream().anyMatch(Set::isEmpty));
+
+      // lower the table's compaction ratio to cause system compactions
+      client.tableOperations().setProperty(table, Property.TABLE_MAJC_RATIO.getKey(), "1");
+
+      // start a bunch of compactions in the background
+      var executor = Executors.newCachedThreadPool();
+      try {
+        List<Future<?>> futures = new ArrayList<>();
+        // start user compactions on a subset of the table's tablets, system compactions should
+        // attempt to run on all tablets. With concurrency should get a mix.
+        for (int i = 1; i < 20; i++) {
+          var startRow = new Text(String.format("r:%04d", i - 1));
+          var endRow = new Text(String.format("r:%04d", i));
+          futures.add(executor.submit(() -> {
+            CompactionConfig config = new CompactionConfig();
+            config.setWait(true);
+            config.setStartRow(startRow);
+            config.setEndRow(endRow);
+            client.tableOperations().compact(table, config);
+            return null;
+          }));
+        }
+
+        log.debug("Waiting for offline");
+        // take the table offline while there are concurrent compactions
+        client.tableOperations().offline(table, true);
+
+        // grab a snapshot of all the tablets files after waiting for offline, do not expect any
+        // tablets files to change at this point
+        var files1 = getFiles(ctx, tableId);
+
+        // wait for the background compactions
+        log.debug("Waiting for futures");
+        for (var future : futures) {
+          try {
+            future.get();
+          } catch (ExecutionException ee) {
+            // it's ok if some of the compactions fail because the table was concurrently taken
+            // offline; any other failure is unexpected and is reported with its message
+            assertTrue(ee.getMessage().contains("is offline"),
+                () -> "Unexpected compaction failure: " + ee.getMessage());
+          }
+        }
+
+        // grab a second snapshot of the tablets files after all the background operations
+        // completed
+        var files2 = getFiles(ctx, tableId);
+
+        // do not expect the files to have changed after the offline operation returned.
+        assertEquals(files1, files2);
+      } finally {
+        // runs even when an assertion fails, so background compaction waits do not leak threads
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  /**
+   * Reads the tablet metadata of the given table and returns each tablet's current set of files,
+   * keyed by the tablet's extent.
+   */
+  private Map<KeyExtent,Set<StoredTabletFile>> getFiles(ServerContext ctx, TableId tableId) {
+    var filesByExtent = new HashMap<KeyExtent,Set<StoredTabletFile>>();
+    try (var tabletsMeta = ctx.getAmple().readTablets().forTable(tableId).build()) {
+      tabletsMeta.forEach(tm -> filesByExtent.put(tm.getExtent(), tm.getFiles()));
+    }
+    return filesByExtent;
+  }
+
+ private void writeRows(AccumuloClient client, String tableName, int rows,
boolean wait)
throws Exception {
try (BatchWriter bw = client.createBatchWriter(tableName)) {
for (int i = 0; i < rows; i++) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index 1985b96b32..f444eababe 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -25,6 +25,7 @@ import static
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSec
import static
org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType.FILES;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assumptions.assumeTrue;
@@ -42,6 +43,8 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -345,6 +348,15 @@ public class SplitIT extends AccumuloClusterHarness {
@Test
public void concurrentSplit() throws Exception {
+ // Concurrent addSplits calls against a table that stays online; every requested split is
+ // expected to exist afterwards.
+ concurrentSplit(false);
+ }
+
+ @Test
+ public void concurrentSplitAndOffline() throws Exception {
+ // Same as concurrentSplit, but the table is taken offline (waiting) while addSplits calls are
+ // running. Failures mentioning "is offline" are tolerated, and the splits listed right after
+ // offline returns must not change once all addSplits calls finish.
+ concurrentSplit(true);
+ }
+
+ public void concurrentSplit(boolean offlineTable) throws Exception {
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
final String tableName = getUniqueNames(1)[0];
@@ -364,15 +376,15 @@ public class SplitIT extends AccumuloClusterHarness {
ExecutorService es = Executors.newFixedThreadPool(10);
final int totalFutures = 100;
final int splitsPerFuture = 4;
- final Set<Text> totalSplits = new HashSet<>();
+ final Set<Text> totalSplits = new ConcurrentSkipListSet<>();
List<Callable<Void>> tasks = new ArrayList<>(totalFutures);
for (int i = 0; i < totalFutures; i++) {
final Pair<Integer,Integer> splitBounds =
getRandomSplitBounds(numRows);
final TreeSet<Text> splits =
TestIngest.getSplitPoints(splitBounds.getFirst().longValue(),
splitBounds.getSecond().longValue(), splitsPerFuture);
- totalSplits.addAll(splits);
tasks.add(() -> {
c.tableOperations().addSplits(tableName, splits);
+ totalSplits.addAll(splits);
return null;
});
}
@@ -381,19 +393,49 @@ public class SplitIT extends AccumuloClusterHarness {
List<Future<Void>> futures =
tasks.parallelStream().map(es::submit).collect(Collectors.toList());
+ Set<Text> splitsAfterOffline = null;
+ if (offlineTable) {
+ // run offline concurrently with split operation
+ c.tableOperations().offline(tableName, true);
+ splitsAfterOffline =
Set.copyOf(c.tableOperations().listSplits(tableName));
+ }
+
log.debug("Waiting for futures to complete");
for (Future<?> f : futures) {
- f.get();
+ try {
+ f.get();
+ } catch (ExecutionException ee) {
+ if (offlineTable && ee.getMessage().contains("is offline")) {
+ // Some exceptions are expected when concurrently taking the table
offline.
+ log.debug(ee.getMessage());
+ } else {
+ throw ee;
+ }
+ }
}
- es.shutdown();
- log.debug("Checking that {} splits were created ", totalSplits.size());
+ if (offlineTable) {
+ // The splits seen immediately after offline() call should not change
after all the futures
+ // complete. This ensures that nothing changes in the tablet after the
offline+wait call
+ // returns.
+ assertEquals(splitsAfterOffline, new
HashSet<>(c.tableOperations().listSplits(tableName)),
+ "Splits changed after offline");
+
+ // table will be scanned for verification, so bring it online
+ c.tableOperations().online(tableName);
+ } else {
+ assertFalse(totalSplits.isEmpty());
+ }
+ log.debug("Checking that {} splits were created ", totalSplits.size());
assertEquals(totalSplits, new
HashSet<>(c.tableOperations().listSplits(tableName)),
"Did not see expected splits");
log.debug("Verifying {} rows ingested into {}", numRows, tableName);
VerifyIngest.verifyIngest(c, params);
+
+ es.shutdown();
+
}
}