This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new bfaa80f0c1 Single conditional writer for UserFateStore (#5670)
bfaa80f0c1 is described below
commit bfaa80f0c1b19e14f9a942918c32caf2f42711d2
Author: Kevin Rathbun <[email protected]>
AuthorDate: Tue Jun 24 16:35:46 2025 -0400
Single conditional writer for UserFateStore (#5670)
* Single conditional writer for UserFateStore
Avoids creating multiple conditional writers for UserFateStore.
* FateStore now extends AutoCloseable
* MetaFateStore.close() is no-op
* UserFateStore.close() closes the conditional writer
* All tests using a FateStore now call close()
* The real fate stores are closed on the shutdown of fate
* UserFateStore now only creates a single conditional writer
* The number of threads for this conditional writer is set by a new property,
MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX
* The writer in UserFateStore is a memoized Supplier, so it is not
created when the store is only used as a ReadOnlyFateStore.
closes #5660
---
.../org/apache/accumulo/core/conf/Property.java | 4 +
.../java/org/apache/accumulo/core/fate/Fate.java | 3 +
.../org/apache/accumulo/core/fate/FateStore.java | 4 +-
.../accumulo/core/fate/user/FateMutatorImpl.java | 12 +-
.../accumulo/core/fate/user/UserFateStore.java | 33 +-
.../core/fate/zookeeper/MetaFateStore.java | 5 +
.../apache/accumulo/core/logging/FateLogger.java | 5 +
.../org/apache/accumulo/core/fate/TestStore.java | 5 +
.../org/apache/accumulo/server/util/Admin.java | 63 +++-
.../util/checkCommand/TableLocksCheckRunner.java | 77 ++--
.../test/compaction/ExternalCompaction_1_IT.java | 28 +-
.../accumulo/test/fate/MultipleStoresITBase.java | 405 +++++++++++----------
.../meta/MetaFateExecutionOrderIT_SimpleSuite.java | 13 +-
.../apache/accumulo/test/fate/meta/MetaFateIT.java | 7 +-
.../test/fate/meta/MetaFateOpsCommandsIT.java | 8 +-
.../test/fate/meta/MetaFatePoolsWatcherIT.java | 7 +-
.../fate/meta/MetaFateStatusEnforcementIT.java | 6 +
.../test/fate/meta/MetaFateStoreFateIT.java | 14 +-
.../fate/user/FateMutatorImplIT_SimpleSuite.java | 95 +++--
.../user/UserFateExecutionOrderIT_SimpleSuite.java | 9 +-
.../test/fate/user/UserFateIT_SimpleSuite.java | 9 +-
.../test/fate/user/UserFateOpsCommandsIT.java | 14 +-
.../user/UserFatePoolsWatcherIT_SimpleSuite.java | 9 +-
.../UserFateStatusEnforcementIT_SimpleSuite.java | 1 +
.../fate/user/UserFateStoreFateIT_SimpleSuite.java | 9 +-
.../test/functional/FateConcurrencyIT.java | 8 +-
.../test/functional/FunctionalTestUtils.java | 4 +-
27 files changed, 500 insertions(+), 357 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index f19798ccbb..988d61a059 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -447,6 +447,10 @@ public enum Property {
MANAGER_WAL_CLOSER_IMPLEMENTATION("manager.wal.closer.implementation",
"org.apache.accumulo.server.manager.recovery.HadoopLogCloser",
PropertyType.CLASSNAME,
"A class that implements a mechanism to steal write access to a
write-ahead log.", "2.1.0"),
+
MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX("manager.fate.conditional.writer.threads.max",
"3",
+ PropertyType.COUNT,
+ "Maximum number of threads to use for writing data to tablet servers of
the FATE system table.",
+ "4.0.0"),
MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL("manager.fate.metrics.min.update.interval",
"60s",
PropertyType.TIMEDURATION, "Limit calls from metric sinks to zookeeper
to update interval.",
"1.9.3"),
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 560c7da0b2..6a577333a1 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -532,6 +532,9 @@ public class Fate<T> {
if (deadResCleanerExecutor != null) {
deadResCleanerExecutor.shutdownNow();
}
+
+ // ensure store resources are cleaned up
+ store.close();
}
private boolean anyFateExecutorIsAlive() {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 3f5a8ec040..c73d5768bf 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -41,7 +41,7 @@ import org.apache.hadoop.io.DataInputBuffer;
* transaction's operation, possibly pushing more operations onto the
transaction as each step
* successfully completes. If a step fails, the stack can be unwound, undoing
each operation.
*/
-public interface FateStore<T> extends ReadOnlyFateStore<T> {
+public interface FateStore<T> extends ReadOnlyFateStore<T>, AutoCloseable {
/**
* Create a new fate transaction id
@@ -269,4 +269,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
*/
FateTxStore<T> reserve(FateId fateId);
+ @Override
+ void close();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index 264198cf93..4910e1e757 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.accumulo.core.fate.user.UserFateStore.getRowId;
import static org.apache.accumulo.core.fate.user.UserFateStore.invertRepo;
import java.util.Objects;
+import java.util.function.Supplier;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
@@ -58,14 +59,17 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
private final String tableName;
private final FateId fateId;
private final ConditionalMutation mutation;
+ private final Supplier<ConditionalWriter> writer;
private boolean requiredUnreserved = false;
public static final int INITIAL_ITERATOR_PRIO = 1000000;
- public FateMutatorImpl(ClientContext context, String tableName, FateId
fateId) {
+ public FateMutatorImpl(ClientContext context, String tableName, FateId
fateId,
+ Supplier<ConditionalWriter> writer) {
this.context = Objects.requireNonNull(context);
this.tableName = Objects.requireNonNull(tableName);
- this.fateId = fateId;
+ this.fateId = Objects.requireNonNull(fateId);
this.mutation = new ConditionalMutation(new Text(getRowId(fateId)));
+ this.writer = Objects.requireNonNull(writer);
}
@Override
@@ -237,8 +241,8 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
return Status.ACCEPTED;
} else {
- try (ConditionalWriter writer =
context.createConditionalWriter(tableName)) {
- ConditionalWriter.Result result = writer.write(mutation);
+ try {
+ ConditionalWriter.Result result = writer.get().write(mutation);
switch (result.getStatus()) {
case ACCEPTED:
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index d3d117c9fa..f38c50a2e9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -41,9 +41,11 @@ import java.util.stream.Stream;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.ConditionalWriterConfig;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
@@ -73,6 +75,7 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
public class UserFateStore<T> extends AbstractFateStore<T> {
@@ -80,6 +83,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
private final ClientContext context;
private final String tableName;
+ private final Supplier<ConditionalWriter> writer;
private static final FateInstanceType fateInstanceType =
FateInstanceType.USER;
private static final com.google.common.collect.Range<Integer> REPO_RANGE =
@@ -108,6 +112,14 @@ public class UserFateStore<T> extends AbstractFateStore<T>
{
super(lockID, isLockHeld, maxDeferred, fateIdGenerator);
this.context = Objects.requireNonNull(context);
this.tableName = Objects.requireNonNull(tableName);
+ this.writer = Suppliers.memoize(() -> {
+ try {
+ return createConditionalWriterForFateTable(this.tableName);
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException(
+ "Incorrect use of UserFateStore, table " + tableName + " does not
exist.");
+ }
+ });
}
@Override
@@ -383,7 +395,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
}
private FateMutatorImpl<T> newMutator(FateId fateId) {
- return new FateMutatorImpl<>(context, tableName, fateId);
+ return new FateMutatorImpl<>(context, tableName, fateId, writer);
}
private <R> R scanTx(Function<Scanner,R> func) {
@@ -491,15 +503,15 @@ public class UserFateStore<T> extends
AbstractFateStore<T> {
}
final Map<FateId,ConditionalWriter.Status> resultsMap = new HashMap<>();
- try (ConditionalWriter writer =
context.createConditionalWriter(tableName)) {
- Iterator<ConditionalWriter.Result> results = writer
+ try {
+ Iterator<ConditionalWriter.Result> results = writer.get()
.write(pending.values().stream().map(pair ->
pair.getFirst().getMutation()).iterator());
while (results.hasNext()) {
var result = results.next();
var row = new Text(result.getMutation().getRow());
resultsMap.put(FateId.from(FateInstanceType.USER, row.toString()),
result.getStatus());
}
- } catch (AccumuloException | AccumuloSecurityException |
TableNotFoundException e) {
+ } catch (AccumuloException | AccumuloSecurityException e) {
throw new IllegalStateException(e);
}
return resultsMap;
@@ -689,4 +701,17 @@ public class UserFateStore<T> extends AbstractFateStore<T>
{
"Position %s is not in the valid range of [0,%s]", position,
MAX_REPOS);
return position;
}
+
+ private ConditionalWriter createConditionalWriterForFateTable(String
tableName)
+ throws TableNotFoundException {
+ int maxThreads =
+
context.getConfiguration().getCount(Property.MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX);
+ ConditionalWriterConfig cwConfig = new
ConditionalWriterConfig().setMaxWriteThreads(maxThreads);
+ return context.createConditionalWriter(tableName, cwConfig);
+ }
+
+ @Override
+ public void close() {
+ writer.get().close();
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index 2d769bce6a..72ce79e1cc 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -653,6 +653,11 @@ public class MetaFateStore<T> extends AbstractFateStore<T>
{
}
}
+ @Override
+ public void close() {
+ // no-op
+ }
+
protected static class FateData<T> {
final TStatus status;
final Optional<FateKey> fateKey;
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 29ba43fa0a..331401fc6b 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -193,6 +193,11 @@ public class FateLogger {
public void deleteDeadReservations() {
store.deleteDeadReservations();
}
+
+ @Override
+ public void close() {
+ store.close();
+ }
};
}
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 8e881fddad..e4d057fc10 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -291,4 +291,9 @@ public class TestStore implements FateStore<String> {
public boolean isDeferredOverflow() {
return false;
}
+
+ @Override
+ public void close() {
+ // no-op
+ }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
index d5bb3f4261..0d7d30d350 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/Admin.java
@@ -959,7 +959,6 @@ public class Admin implements KeywordExecutable {
var zTableLocksPath = context.getServerPaths().createTableLocksPath();
var zk = context.getZooSession();
ServiceLock adminLock = null;
- Map<FateInstanceType,FateStore<Admin>> fateStores;
Map<FateInstanceType,ReadOnlyFateStore<Admin>> readOnlyFateStores = null;
try {
@@ -967,20 +966,22 @@ public class Admin implements KeywordExecutable {
cancelSubmittedFateTxs(context, fateOpsCommand.fateIdList);
} else if (fateOpsCommand.fail) {
adminLock = createAdminLock(context);
- fateStores = createFateStores(context, zk, adminLock);
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepFail(fateStores, fateIdStr)) {
- throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ try (var fateStores = createFateStores(context, zk, adminLock)) {
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepFail(fateStores.getStoresMap(), fateIdStr)) {
+ throw new AccumuloException("Could not fail transaction: " +
fateIdStr);
+ }
}
}
} else if (fateOpsCommand.delete) {
adminLock = createAdminLock(context);
- fateStores = createFateStores(context, zk, adminLock);
- for (String fateIdStr : fateOpsCommand.fateIdList) {
- if (!admin.prepDelete(fateStores, fateIdStr)) {
- throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+ try (var fateStores = createFateStores(context, zk, adminLock)) {
+ for (String fateIdStr : fateOpsCommand.fateIdList) {
+ if (!admin.prepDelete(fateStores.getStoresMap(), fateIdStr)) {
+ throw new AccumuloException("Could not delete transaction: " +
fateIdStr);
+ }
+ admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
}
- admin.deleteLocks(zk, zTableLocksPath, fateIdStr);
}
}
@@ -991,7 +992,7 @@ public class Admin implements KeywordExecutable {
getCmdLineStatusFilters(fateOpsCommand.states);
EnumSet<FateInstanceType> typesFilter =
getCmdLineInstanceTypeFilters(fateOpsCommand.instanceTypes);
- readOnlyFateStores = createReadOnlyFateStores(context, zk,
Constants.ZFATE);
+ readOnlyFateStores = createReadOnlyFateStores(context, zk);
admin.print(readOnlyFateStores, zk, zTableLocksPath, new
Formatter(System.out),
fateIdFilter, statusFilter, typesFilter);
// print line break at the end
@@ -1000,7 +1001,7 @@ public class Admin implements KeywordExecutable {
if (fateOpsCommand.summarize) {
if (readOnlyFateStores == null) {
- readOnlyFateStores = createReadOnlyFateStores(context, zk,
Constants.ZFATE);
+ readOnlyFateStores = createReadOnlyFateStores(context, zk);
}
summarizeFateTx(context, fateOpsCommand, admin, readOnlyFateStores,
zTableLocksPath);
}
@@ -1011,20 +1012,19 @@ public class Admin implements KeywordExecutable {
}
}
- private Map<FateInstanceType,FateStore<Admin>>
createFateStores(ServerContext context,
- ZooSession zk, ServiceLock adminLock) throws InterruptedException,
KeeperException {
+ private FateStores createFateStores(ServerContext context, ZooSession zk,
ServiceLock adminLock)
+ throws InterruptedException, KeeperException {
var lockId = adminLock.getLockID();
MetaFateStore<Admin> mfs = new MetaFateStore<>(zk, lockId, null);
UserFateStore<Admin> ufs =
new UserFateStore<>(context, SystemTables.FATE.tableName(), lockId,
null);
- return Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
+ return new FateStores(FateInstanceType.META, mfs, FateInstanceType.USER,
ufs);
}
- private Map<FateInstanceType,ReadOnlyFateStore<Admin>>
- createReadOnlyFateStores(ServerContext context, ZooSession zk, String
fateZkPath)
- throws InterruptedException, KeeperException {
- MetaFateStore<Admin> readOnlyMFS = new MetaFateStore<>(zk, null, null);
- UserFateStore<Admin> readOnlyUFS =
+ private Map<FateInstanceType,ReadOnlyFateStore<Admin>>
createReadOnlyFateStores(
+ ServerContext context, ZooSession zk) throws InterruptedException,
KeeperException {
+ ReadOnlyFateStore<Admin> readOnlyMFS = new MetaFateStore<>(zk, null, null);
+ ReadOnlyFateStore<Admin> readOnlyUFS =
new UserFateStore<>(context, SystemTables.FATE.tableName(), null,
null);
return Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER,
readOnlyUFS);
}
@@ -1343,4 +1343,27 @@ public class Admin implements KeywordExecutable {
System.out.println("-".repeat(50));
System.out.println();
}
+
+ /**
+ * Wrapper around the fate stores
+ */
+ private static class FateStores implements AutoCloseable {
+ private final Map<FateInstanceType,FateStore<Admin>> storesMap;
+
+ private FateStores(FateInstanceType type1, FateStore<Admin> store1,
FateInstanceType type2,
+ FateStore<Admin> store2) {
+ storesMap = Map.of(type1, store1, type2, store2);
+ }
+
+ private Map<FateInstanceType,FateStore<Admin>> getStoresMap() {
+ return storesMap;
+ }
+
+ @Override
+ public void close() {
+ for (var fs : storesMap.values()) {
+ fs.close();
+ }
+ }
+ }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java
b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java
index 47f9e90e4b..a771f595a9 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/checkCommand/TableLocksCheckRunner.java
@@ -61,54 +61,55 @@ public class TableLocksCheckRunner implements CheckRunner {
final AdminUtil<Admin> admin = new AdminUtil<>();
final var zTableLocksPath =
context.getServerPaths().createTableLocksPath();
final var zk = context.getZooSession();
- final MetaFateStore<Admin> mfs = new MetaFateStore<>(zk, null, null);
- final UserFateStore<Admin> ufs =
- new UserFateStore<>(context, SystemTables.FATE.tableName(), null,
null);
+ try (final MetaFateStore<Admin> mfs = new MetaFateStore<>(zk, null, null);
final UserFateStore<
+ Admin> ufs = new UserFateStore<>(context,
SystemTables.FATE.tableName(), null, null)) {
- log.trace("Ensuring table and namespace locks are valid...");
+ log.trace("Ensuring table and namespace locks are valid...");
- var tableIds = context.tableOperations().tableIdMap().values();
- var namespaceIds = context.namespaceOperations().namespaceIdMap().values();
- List<String> lockedIds =
-
context.getZooSession().asReader().getChildren(zTableLocksPath.toString());
- boolean locksExist = !lockedIds.isEmpty();
+ var tableIds = context.tableOperations().tableIdMap().values();
+ var namespaceIds =
context.namespaceOperations().namespaceIdMap().values();
+ List<String> lockedIds =
+
context.getZooSession().asReader().getChildren(zTableLocksPath.toString());
+ boolean locksExist = !lockedIds.isEmpty();
- if (locksExist) {
- lockedIds.removeAll(tableIds);
- lockedIds.removeAll(namespaceIds);
- if (!lockedIds.isEmpty()) {
- status = Admin.CheckCommand.CheckStatus.FAILED;
- log.warn("...Some table and namespace locks are INVALID (the
table/namespace DNE): "
- + lockedIds);
+ if (locksExist) {
+ lockedIds.removeAll(tableIds);
+ lockedIds.removeAll(namespaceIds);
+ if (!lockedIds.isEmpty()) {
+ status = Admin.CheckCommand.CheckStatus.FAILED;
+ log.warn("...Some table and namespace locks are INVALID (the
table/namespace DNE): "
+ + lockedIds);
+ } else {
+ log.trace("...locks are valid");
+ }
} else {
- log.trace("...locks are valid");
+ log.trace("...no locks present");
}
- } else {
- log.trace("...no locks present");
- }
- log.trace("Ensuring table and namespace locks are associated with a FATE
op...");
+ log.trace("Ensuring table and namespace locks are associated with a FATE
op...");
- if (locksExist) {
- final var fateStatus =
- admin.getStatus(Map.of(FateInstanceType.META, mfs,
FateInstanceType.USER, ufs), zk,
- zTableLocksPath, null, null, null);
- if (!fateStatus.getDanglingHeldLocks().isEmpty()
- || !fateStatus.getDanglingWaitingLocks().isEmpty()) {
- status = Admin.CheckCommand.CheckStatus.FAILED;
- log.warn("The following locks did not have an associated FATE
operation\n");
- for (Map.Entry<FateId,List<String>> entry :
fateStatus.getDanglingHeldLocks().entrySet()) {
- log.warn("fateId: " + entry.getKey() + " locked: " +
entry.getValue());
- }
- for (Map.Entry<FateId,List<String>> entry :
fateStatus.getDanglingWaitingLocks()
- .entrySet()) {
- log.warn("fateId: " + entry.getKey() + " locking: " +
entry.getValue());
+ if (locksExist) {
+ final var fateStatus =
+ admin.getStatus(Map.of(FateInstanceType.META, mfs,
FateInstanceType.USER, ufs), zk,
+ zTableLocksPath, null, null, null);
+ if (!fateStatus.getDanglingHeldLocks().isEmpty()
+ || !fateStatus.getDanglingWaitingLocks().isEmpty()) {
+ status = Admin.CheckCommand.CheckStatus.FAILED;
+ log.warn("The following locks did not have an associated FATE
operation\n");
+ for (Map.Entry<FateId,List<String>> entry :
fateStatus.getDanglingHeldLocks()
+ .entrySet()) {
+ log.warn("fateId: " + entry.getKey() + " locked: " +
entry.getValue());
+ }
+ for (Map.Entry<FateId,List<String>> entry :
fateStatus.getDanglingWaitingLocks()
+ .entrySet()) {
+ log.warn("fateId: " + entry.getKey() + " locking: " +
entry.getValue());
+ }
+ } else {
+ log.trace("...locks are valid");
}
} else {
- log.trace("...locks are valid");
+ log.trace("...no locks present");
}
- } else {
- log.trace("...no locks present");
}
return status;
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index 6365df4a80..7c9a4c5d56 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -252,10 +252,10 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
@Test
public void testCompactionCommitAndDeadDetectionRoot() throws Exception {
var ctx = getCluster().getServerContext();
- FateStore<Manager> metaFateStore =
- new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null);
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<Manager> metaFateStore =
+ new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(),
null)) {
var tableId = ctx.getTableId(SystemTables.ROOT.tableName());
var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
var fateId = createCompactionCommitAndDeadMetadata(c, metaFateStore,
@@ -271,10 +271,10 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
@Test
public void testCompactionCommitAndDeadDetectionMeta() throws Exception {
var ctx = getCluster().getServerContext();
- FateStore<Manager> metaFateStore =
- new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null);
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<Manager> metaFateStore =
+ new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(),
null)) {
// Metadata table by default already has 2 tablets
var tableId = ctx.getTableId(SystemTables.METADATA.tableName());
var allCids = new HashMap<TableId,List<ExternalCompactionId>>();
@@ -293,9 +293,9 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
var ctx = getCluster().getServerContext();
final String tableName = getUniqueNames(1)[0];
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
- UserFateStore<Manager> userFateStore =
- new UserFateStore<>(ctx, SystemTables.FATE.tableName(),
testLock.getLockID(), null);
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build();
+ UserFateStore<Manager> userFateStore =
+ new UserFateStore<>(ctx, SystemTables.FATE.tableName(),
testLock.getLockID(), null)) {
SortedSet<Text> splits = new TreeSet<>();
splits.add(new Text(row(MAX_DATA / 2)));
c.tableOperations().create(tableName, new
NewTableConfiguration().withSplits(splits));
@@ -317,11 +317,11 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
var ctx = getCluster().getServerContext();
final String userTable = getUniqueNames(1)[0];
- try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
- UserFateStore<Manager> userFateStore =
- new UserFateStore<>(ctx, SystemTables.FATE.tableName(),
testLock.getLockID(), null);
- FateStore<Manager> metaFateStore =
- new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(), null);
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<Manager> userFateStore =
+ new UserFateStore<>(ctx, SystemTables.FATE.tableName(),
testLock.getLockID(), null);
+ FateStore<Manager> metaFateStore =
+ new MetaFateStore<>(ctx.getZooSession(), testLock.getLockID(),
null)) {
SortedSet<Text> splits = new TreeSet<>();
splits.add(new Text(row(MAX_DATA / 2)));
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
index 70fa454965..ae6d485d05 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresITBase.java
@@ -73,53 +73,55 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
final ZooUtil.LockID lock1 = new ZooUtil.LockID("/locks", "L1", 50);
final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
Map<FateId,FateStore.FateReservation> activeReservations;
- final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1,
null);
- final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2,
null);
- final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID());
+ try (final FateStore<SleepingTestEnv> store1 =
testStoreFactory.create(lock1, null);
+ final FateStore<SleepingTestEnv> store2 =
testStoreFactory.create(lock2, null)) {
+ final FateId fakeFateId = FateId.from(store1.type(), UUID.randomUUID());
- // Create the fate ids using store1
- for (int i = 0; i < numFateIds; i++) {
- assertTrue(allIds.add(store1.create()));
- }
- assertEquals(numFateIds, allIds.size());
-
- // Reserve half the fate ids using store1 and rest using store2, after
reserving a fate id in
- // one, should not be able to reserve the same in the other. Should also
not matter that all the
- // ids were created using store1
- int count = 0;
- for (FateId fateId : allIds) {
- if (count % 2 == 0) {
- reservations.add(store1.reserve(fateId));
- assertTrue(store2.tryReserve(fateId).isEmpty());
- } else {
- reservations.add(store2.reserve(fateId));
- assertTrue(store1.tryReserve(fateId).isEmpty());
+ // Create the fate ids using store1
+ for (int i = 0; i < numFateIds; i++) {
+ assertTrue(allIds.add(store1.create()));
}
- count++;
- }
- // Try to reserve a non-existent fate id
- assertTrue(store1.tryReserve(fakeFateId).isEmpty());
- assertTrue(store2.tryReserve(fakeFateId).isEmpty());
- // Both stores should return the same reserved transactions
- activeReservations = store1.getActiveReservations();
- assertEquals(allIds, activeReservations.keySet());
- activeReservations = store2.getActiveReservations();
- assertEquals(allIds, activeReservations.keySet());
-
- // Test setting/getting the TStatus and unreserving the transactions
- for (int i = 0; i < allIds.size(); i++) {
- var reservation = reservations.get(i);
- assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
- reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
- assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED,
reservation.getStatus());
- reservation.delete();
- reservation.unreserve(Duration.ofMillis(0));
- // Attempt to set a status on a tx that has been unreserved (should
throw exception)
- assertThrows(IllegalStateException.class,
- () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
+ assertEquals(numFateIds, allIds.size());
+
+ // Reserve half the fate ids using store1 and rest using store2, after
reserving a fate id in
+ // one, should not be able to reserve the same in the other. Should also
not matter that all
+ // the
+ // ids were created using store1
+ int count = 0;
+ for (FateId fateId : allIds) {
+ if (count % 2 == 0) {
+ reservations.add(store1.reserve(fateId));
+ assertTrue(store2.tryReserve(fateId).isEmpty());
+ } else {
+ reservations.add(store2.reserve(fateId));
+ assertTrue(store1.tryReserve(fateId).isEmpty());
+ }
+ count++;
+ }
+ // Try to reserve a non-existent fate id
+ assertTrue(store1.tryReserve(fakeFateId).isEmpty());
+ assertTrue(store2.tryReserve(fakeFateId).isEmpty());
+ // Both stores should return the same reserved transactions
+ activeReservations = store1.getActiveReservations();
+ assertEquals(allIds, activeReservations.keySet());
+ activeReservations = store2.getActiveReservations();
+ assertEquals(allIds, activeReservations.keySet());
+
+ // Test setting/getting the TStatus and unreserving the transactions
+ for (int i = 0; i < allIds.size(); i++) {
+ var reservation = reservations.get(i);
+ assertEquals(ReadOnlyFateStore.TStatus.NEW, reservation.getStatus());
+ reservation.setStatus(ReadOnlyFateStore.TStatus.SUBMITTED);
+ assertEquals(ReadOnlyFateStore.TStatus.SUBMITTED,
reservation.getStatus());
+ reservation.delete();
+ reservation.unreserve(Duration.ofMillis(0));
+ // Attempt to set a status on a tx that has been unreserved (should
throw exception)
+ assertThrows(IllegalStateException.class,
+ () -> reservation.setStatus(ReadOnlyFateStore.TStatus.NEW));
+ }
+ assertTrue(store1.getActiveReservations().isEmpty());
+ assertTrue(store2.getActiveReservations().isEmpty());
}
- assertTrue(store1.getActiveReservations().isEmpty());
- assertTrue(store2.getActiveReservations().isEmpty());
}
@Test
@@ -132,11 +134,12 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
// Tests that reserve() doesn't hang indefinitely and instead throws an
error
// on reserve() a non-existent transaction.
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
- final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock,
null);
- final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
+ try (final FateStore<SleepingTestEnv> store =
testStoreFactory.create(lock, null)) {
+ final FateId fakeFateId = FateId.from(store.type(), UUID.randomUUID());
- var err = assertThrows(IllegalStateException.class, () ->
store.reserve(fakeFateId));
- assertTrue(err.getMessage().contains(fakeFateId.canonical()));
+ var err = assertThrows(IllegalStateException.class, () ->
store.reserve(fakeFateId));
+ assertTrue(err.getMessage().contains(fakeFateId.canonical()));
+ }
}
@Test
@@ -150,30 +153,32 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
final Set<FateId> allIds = new HashSet<>();
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new
ArrayList<>();
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
- final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock,
null);
-
- // Create some FateIds and ensure that they can be reserved
- for (int i = 0; i < numFateIds; i++) {
- FateId fateId = store.create();
- assertTrue(allIds.add(fateId));
- var reservation = store.tryReserve(fateId);
- assertFalse(reservation.isEmpty());
- reservations.add(reservation.orElseThrow());
- }
- assertEquals(numFateIds, allIds.size());
+ try (final FateStore<SleepingTestEnv> store =
testStoreFactory.create(lock, null)) {
- // Try to reserve again, should not reserve
- for (FateId fateId : allIds) {
- assertTrue(store.tryReserve(fateId).isEmpty());
- }
+ // Create some FateIds and ensure that they can be reserved
+ for (int i = 0; i < numFateIds; i++) {
+ FateId fateId = store.create();
+ assertTrue(allIds.add(fateId));
+ var reservation = store.tryReserve(fateId);
+ assertFalse(reservation.isEmpty());
+ reservations.add(reservation.orElseThrow());
+ }
+ assertEquals(numFateIds, allIds.size());
- // Unreserve all the FateIds
- for (var reservation : reservations) {
- reservation.unreserve(Duration.ofMillis(0));
- }
- // Try to unreserve again (should throw exception)
- for (var reservation : reservations) {
- assertThrows(IllegalStateException.class, () ->
reservation.unreserve(Duration.ofMillis(0)));
+ // Try to reserve again, should not reserve
+ for (FateId fateId : allIds) {
+ assertTrue(store.tryReserve(fateId).isEmpty());
+ }
+
+ // Unreserve all the FateIds
+ for (var reservation : reservations) {
+ reservation.unreserve(Duration.ofMillis(0));
+ }
+ // Try to unreserve again (should throw exception)
+ for (var reservation : reservations) {
+ assertThrows(IllegalStateException.class,
+ () -> reservation.unreserve(Duration.ofMillis(0)));
+ }
}
}
@@ -188,39 +193,40 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
final Set<FateId> allIds = new HashSet<>();
final List<FateStore.FateTxStore<SleepingTestEnv>> reservations = new
ArrayList<>();
final ZooUtil.LockID lock = new ZooUtil.LockID("/locks", "L1", 50);
- final FateStore<SleepingTestEnv> store = testStoreFactory.create(lock,
null);
-
- // Create some FateIds and ensure that they can be reserved
- for (int i = 0; i < numFateIds; i++) {
- FateId fateId = store.create();
- assertTrue(allIds.add(fateId));
- var reservation = store.tryReserve(fateId);
- assertFalse(reservation.isEmpty());
- reservations.add(reservation.orElseThrow());
- }
- assertEquals(numFateIds, allIds.size());
+ try (final FateStore<SleepingTestEnv> store =
testStoreFactory.create(lock, null)) {
- // Unreserve all
- for (var reservation : reservations) {
- reservation.unreserve(Duration.ofMillis(0));
- }
+ // Create some FateIds and ensure that they can be reserved
+ for (int i = 0; i < numFateIds; i++) {
+ FateId fateId = store.create();
+ assertTrue(allIds.add(fateId));
+ var reservation = store.tryReserve(fateId);
+ assertFalse(reservation.isEmpty());
+ reservations.add(reservation.orElseThrow());
+ }
+ assertEquals(numFateIds, allIds.size());
- // Ensure they can be reserved again, and delete and unreserve this time
- for (FateId fateId : allIds) {
- // Verify that the tx status is still NEW after unreserving since it hasn't been deleted
- assertEquals(ReadOnlyFateStore.TStatus.NEW,
store.read(fateId).getStatus());
- var reservation = store.tryReserve(fateId);
- assertFalse(reservation.isEmpty());
- reservation.orElseThrow().delete();
- reservation.orElseThrow().unreserve(Duration.ofMillis(0));
- }
+ // Unreserve all
+ for (var reservation : reservations) {
+ reservation.unreserve(Duration.ofMillis(0));
+ }
+
+ // Ensure they can be reserved again, and delete and unreserve this time
+ for (FateId fateId : allIds) {
+ // Verify that the tx status is still NEW after unreserving since it hasn't been deleted
+ assertEquals(ReadOnlyFateStore.TStatus.NEW,
store.read(fateId).getStatus());
+ var reservation = store.tryReserve(fateId);
+ assertFalse(reservation.isEmpty());
+ reservation.orElseThrow().delete();
+ reservation.orElseThrow().unreserve(Duration.ofMillis(0));
+ }
- for (FateId fateId : allIds) {
- // Verify that the tx is now unknown since it has been deleted
- assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN,
store.read(fateId).getStatus());
- // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely
- var err = assertThrows(IllegalStateException.class, () ->
store.reserve(fateId));
- assertTrue(err.getMessage().contains(fateId.canonical()));
+ for (FateId fateId : allIds) {
+ // Verify that the tx is now unknown since it has been deleted
+ assertEquals(ReadOnlyFateStore.TStatus.UNKNOWN,
store.read(fateId).getStatus());
+ // Attempt to reserve a deleted txn, should throw an exception and not wait indefinitely
+ var err = assertThrows(IllegalStateException.class, () ->
store.reserve(fateId));
+ assertTrue(err.getMessage().contains(fateId.canonical()));
+ }
}
}
@@ -239,42 +245,43 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
final ZooUtil.LockID lock2 = new ZooUtil.LockID("/locks", "L2", 52);
final Set<ZooUtil.LockID> liveLocks = new HashSet<>();
final Predicate<ZooUtil.LockID> isLockHeld = liveLocks::contains;
- final FateStore<SleepingTestEnv> store1 = testStoreFactory.create(lock1,
isLockHeld);
- final FateStore<SleepingTestEnv> store2 = testStoreFactory.create(lock2,
isLockHeld);
+ try (final FateStore<SleepingTestEnv> store1 =
testStoreFactory.create(lock1, isLockHeld);
+ final FateStore<SleepingTestEnv> store2 =
testStoreFactory.create(lock2, isLockHeld)) {
- liveLocks.add(lock1);
- liveLocks.add(lock2);
-
- Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true,
Object::toString,
- DefaultConfiguration.getInstance(), new
ScheduledThreadPoolExecutor(2));
- Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false,
Object::toString,
- DefaultConfiguration.getInstance(), new
ScheduledThreadPoolExecutor(2));
+ liveLocks.add(lock1);
+ liveLocks.add(lock2);
- try {
- for (int i = 0; i < numFateIds; i++) {
- FateId fateId;
- // Start half the txns using fate1, and the other half using fate2
- if (i % 2 == 0) {
- fateId = fate1.startTransaction();
- fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(),
true, "test");
- } else {
- fateId = fate2.startTransaction();
- fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(),
true, "test");
+ Fate<SleepingTestEnv> fate1 = new Fate<>(testEnv1, store1, true,
Object::toString,
+ DefaultConfiguration.getInstance(), new
ScheduledThreadPoolExecutor(2));
+ Fate<SleepingTestEnv> fate2 = new Fate<>(testEnv2, store2, false,
Object::toString,
+ DefaultConfiguration.getInstance(), new
ScheduledThreadPoolExecutor(2));
+
+ try {
+ for (int i = 0; i < numFateIds; i++) {
+ FateId fateId;
+ // Start half the txns using fate1, and the other half using fate2
+ if (i % 2 == 0) {
+ fateId = fate1.startTransaction();
+ fate1.seedTransaction(TEST_FATE_OP, fateId, new
SleepingTestRepo(), true, "test");
+ } else {
+ fateId = fate2.startTransaction();
+ fate2.seedTransaction(TEST_FATE_OP, fateId, new
SleepingTestRepo(), true, "test");
+ }
+ allIds.add(fateId);
}
- allIds.add(fateId);
- }
- assertEquals(numFateIds, allIds.size());
+ assertEquals(numFateIds, allIds.size());
- // Should be able to wait for completion on any fate instance
- for (FateId fateId : allIds) {
- fate2.waitForCompletion(fateId);
+ // Should be able to wait for completion on any fate instance
+ for (FateId fateId : allIds) {
+ fate2.waitForCompletion(fateId);
+ }
+ // Ensure that all txns have been executed and have only been executed once
+ assertTrue(Collections.disjoint(testEnv1.executedOps,
testEnv2.executedOps));
+ assertEquals(allIds, Sets.union(testEnv1.executedOps,
testEnv2.executedOps));
+ } finally {
+ fate1.shutdown(1, TimeUnit.MINUTES);
+ fate2.shutdown(1, TimeUnit.MINUTES);
}
- // Ensure that all txns have been executed and have only been executed once
- assertTrue(Collections.disjoint(testEnv1.executedOps,
testEnv2.executedOps));
- assertEquals(allIds, Sets.union(testEnv1.executedOps,
testEnv2.executedOps));
- } finally {
- fate1.shutdown(1, TimeUnit.MINUTES);
- fate2.shutdown(1, TimeUnit.MINUTES);
}
}
@@ -304,73 +311,79 @@ public abstract class MultipleStoresITBase extends
SharedMiniClusterBase {
final AccumuloConfiguration config =
FateTestUtil.createTestFateConfig(numThreads);
Map<FateId,FateStore.FateReservation> reservations;
- final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1,
isLockHeld);
- liveLocks.add(lock1);
- Fate<LatchTestEnv> fate1 = null;
- Fate<LatchTestEnv> fate2 = null;
-
- try {
- fate1 = new FastFate<>(testEnv1, store1, true, Object::toString, config);
- // Ensure nothing is reserved yet
- assertTrue(store1.getActiveReservations().isEmpty());
-
- // Create transactions
- for (int i = 0; i < numFateIds; i++) {
- FateId fateId;
- fateId = fate1.startTransaction();
- fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), true,
"test");
- allIds.add(fateId);
- }
- assertEquals(numFateIds, allIds.size());
-
- // Wait for all the fate worker threads to start working on the transactions
- Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds);
- // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction
+ try (final FateStore<LatchTestEnv> store1 = testStoreFactory.create(lock1,
isLockHeld)) {
+ liveLocks.add(lock1);
+ Fate<LatchTestEnv> fate1 = null;
+ Fate<LatchTestEnv> fate2 = null;
- // Verify store1 has the transactions reserved and that they were reserved with lock1
- reservations = store1.getActiveReservations();
- assertEquals(allIds, reservations.keySet());
- reservations.values().forEach(res -> assertEquals(lock1,
res.getLockID()));
-
- final FateStore<LatchTestEnv> store2 = testStoreFactory.create(lock2,
isLockHeld);
-
- // Verify store2 can see the reserved transactions even though they were reserved using
- // store1
- reservations = store2.getActiveReservations();
- assertEquals(allIds, reservations.keySet());
- reservations.values().forEach(res -> assertEquals(lock1,
res.getLockID()));
-
- // Simulate what would happen if the Manager using the Fate object (fate1) died.
- // isLockHeld would return false for the LockId of the Manager that died (in this case, lock1)
- // and true for the new Manager's lock (lock2)
- liveLocks.remove(lock1);
- liveLocks.add(lock2);
+ try {
+ fate1 = new FastFate<>(testEnv1, store1, true, Object::toString,
config);
+ // Ensure nothing is reserved yet
+ assertTrue(store1.getActiveReservations().isEmpty());
- // Create the new Fate/start the Fate threads (the work finder and the workers).
- // Don't run another dead reservation cleaner since we already have one running from fate1.
- fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config,
- new ScheduledThreadPoolExecutor(2));
-
- // Wait for the "dead" reservations to be deleted and picked up again (reserved using
- // fate2/store2/lock2 now).
- // They are considered "dead" if they are held by lock1 in this test. We don't have to worry
- // about fate1/store1/lock1 being used to reserve the transactions again since all
- // the workers for fate1 are hung up
- Wait.waitFor(() -> {
- Map<FateId,FateStore.FateReservation> store2Reservations =
store2.getActiveReservations();
- boolean allReservedWithLock2 =
- store2Reservations.values().stream().allMatch(entry ->
entry.getLockID().equals(lock2));
- return store2Reservations.keySet().equals(allIds) &&
allReservedWithLock2;
- }, fate1.getDeadResCleanupDelay().toMillis() * 2);
- } finally {
- // Finish work and shutdown
- testEnv1.workersLatch.countDown();
- testEnv2.workersLatch.countDown();
- if (fate1 != null) {
- fate1.shutdown(1, TimeUnit.MINUTES);
- }
- if (fate2 != null) {
- fate2.shutdown(1, TimeUnit.MINUTES);
+ // Create transactions
+ for (int i = 0; i < numFateIds; i++) {
+ FateId fateId;
+ fateId = fate1.startTransaction();
+ fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(),
true, "test");
+ allIds.add(fateId);
+ }
+ assertEquals(numFateIds, allIds.size());
+
+ // Wait for all the fate worker threads to start working on the transactions
+ Wait.waitFor(() -> testEnv1.numWorkers.get() == numFateIds);
+ // Each fate worker will be hung up working (IN_PROGRESS) on a single transaction
+
+ // Verify store1 has the transactions reserved and that they were reserved with lock1
+ reservations = store1.getActiveReservations();
+ assertEquals(allIds, reservations.keySet());
+ reservations.values().forEach(res -> assertEquals(lock1,
res.getLockID()));
+
+ try (final FateStore<LatchTestEnv> store2 =
testStoreFactory.create(lock2, isLockHeld)) {
+
+ // Verify store2 can see the reserved transactions even though they were reserved using
+ // store1
+ reservations = store2.getActiveReservations();
+ assertEquals(allIds, reservations.keySet());
+ reservations.values().forEach(res -> assertEquals(lock1,
res.getLockID()));
+
+ // Simulate what would happen if the Manager using the Fate object (fate1) died.
+ // isLockHeld would return false for the LockId of the Manager that died (in this case,
+ // lock1)
+ // and true for the new Manager's lock (lock2)
+ liveLocks.remove(lock1);
+ liveLocks.add(lock2);
+
+ // Create the new Fate/start the Fate threads (the work finder and the workers).
+ // Don't run another dead reservation cleaner since we already have one running from
+ // fate1.
+ fate2 = new Fate<>(testEnv2, store2, false, Object::toString, config,
+ new ScheduledThreadPoolExecutor(2));
+
+ // Wait for the "dead" reservations to be deleted and picked up again (reserved using
+ // fate2/store2/lock2 now).
+ // They are considered "dead" if they are held by lock1 in this test. We don't have to
+ // worry
+ // about fate1/store1/lock1 being used to reserve the transactions again since all
+ // the workers for fate1 are hung up
+ Wait.waitFor(() -> {
+ Map<FateId,FateStore.FateReservation> store2Reservations =
+ store2.getActiveReservations();
+ boolean allReservedWithLock2 = store2Reservations.values().stream()
+ .allMatch(entry -> entry.getLockID().equals(lock2));
+ return store2Reservations.keySet().equals(allIds) &&
allReservedWithLock2;
+ }, fate1.getDeadResCleanupDelay().toMillis() * 2);
+ }
+ } finally {
+ // Finish work and shutdown
+ testEnv1.workersLatch.countDown();
+ testEnv2.workersLatch.countDown();
+ if (fate1 != null) {
+ fate1.shutdown(1, TimeUnit.MINUTES);
+ }
+ if (fate2 != null) {
+ fate2.shutdown(1, TimeUnit.MINUTES);
+ }
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java
index bddab38cc6..b890f09f1a 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateExecutionOrderIT_SimpleSuite.java
@@ -25,6 +25,7 @@ import java.util.UUID;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.zookeeper.ZooSession;
@@ -44,11 +45,13 @@ public class MetaFateExecutionOrderIT_SimpleSuite extends
FateExecutionOrderITBa
try (var zk = new ZooSession(getClass().getSimpleName() + ".mkdirs",
conf)) {
zk.asReaderWriter().mkdirs(ZK_ROOT);
}
- try (var zk = new ZooSession(getClass().getSimpleName() + ".fakeroot",
- conf.get(Property.INSTANCE_ZK_HOST) + ZK_ROOT,
- (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
- conf.get(Property.INSTANCE_SECRET))) {
- testMethod.execute(new MetaFateStore<>(zk, createDummyLockID(), null),
sctx);
+ try (
+ var zk = new ZooSession(getClass().getSimpleName() + ".fakeroot",
+ conf.get(Property.INSTANCE_ZK_HOST) + ZK_ROOT,
+ (int) conf.getTimeInMillis(Property.INSTANCE_ZK_TIMEOUT),
+ conf.get(Property.INSTANCE_SECRET));
+ FateStore<FeoTestEnv> fs = new MetaFateStore<>(zk,
createDummyLockID(), null)) {
+ testMethod.execute(fs, sctx);
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
index dd7082709c..cfd28ecda1 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateIT.java
@@ -30,6 +30,7 @@ import java.io.UncheckedIOException;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.zookeeper.ZooSession;
@@ -64,8 +65,10 @@ public class MetaFateIT extends FateITBase {
expect(sctx.getZooSession()).andReturn(zk).anyTimes();
replay(sctx);
- testMethod.execute(
- new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred,
fateIdGenerator), sctx);
+ try (FateStore<TestEnv> fs =
+ new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred,
fateIdGenerator)) {
+ testMethod.execute(fs, sctx);
+ }
}
@Override
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
index 2f53ee7697..4f81d2857e 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateOpsCommandsIT.java
@@ -42,7 +42,9 @@ public class MetaFateOpsCommandsIT extends
FateOpsCommandsITBase {
ServerContext context = getCluster().getServerContext();
var zk = context.getZooSession();
// test should not be reserving txns or checking reservations, so null lockID and isLockHeld
- testMethod.execute(new MetaFateStore<>(zk, null, null), context);
+ try (FateStore<LatchTestEnv> fs = new MetaFateStore<>(zk, null, null)) {
+ testMethod.execute(fs, context);
+ }
}
/**
@@ -62,7 +64,9 @@ public class MetaFateOpsCommandsIT extends
FateOpsCommandsITBase {
ZooUtil.LockID lockID = testLock.getLockID();
Predicate<ZooUtil.LockID> isLockHeld =
lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
- testMethod.execute(new MetaFateStore<>(zk, lockID, isLockHeld), context);
+ try (FateStore<LatchTestEnv> fs = new MetaFateStore<>(zk, lockID,
isLockHeld)) {
+ testMethod.execute(fs, context);
+ }
} finally {
if (testLock != null) {
testLock.unlock();
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java
index 1fc61c1c00..5c5b26d2e0 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFatePoolsWatcherIT.java
@@ -26,6 +26,7 @@ import static org.easymock.EasyMock.replay;
import java.io.File;
import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.test.fate.FatePoolsWatcherITBase;
@@ -56,7 +57,9 @@ public class MetaFatePoolsWatcherIT extends
FatePoolsWatcherITBase {
expect(sctx.getZooSession()).andReturn(zk).anyTimes();
replay(sctx);
- testMethod.execute(
- new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred,
fateIdGenerator), sctx);
+ try (FateStore<PoolResizeTestEnv> fs =
+ new MetaFateStore<>(zk, createDummyLockID(), null, maxDeferred,
fateIdGenerator)) {
+ testMethod.execute(fs, sctx);
+ }
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
index 9f7ad526ae..4735e9ee6e 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStatusEnforcementIT.java
@@ -26,6 +26,7 @@ import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.test.fate.FateStatusEnforcementITBase;
import org.apache.accumulo.test.fate.FateTestUtil;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
@@ -50,4 +51,9 @@ public class MetaFateStatusEnforcementIT extends
FateStatusEnforcementITBase {
fateId = store.create();
txStore = store.reserve(fateId);
}
+
+ @AfterEach
+ public void afterEachTeardown() {
+ store.close();
+ }
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
index 755ba42d7d..f41f993d99 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/meta/MetaFateStoreFateIT.java
@@ -73,13 +73,13 @@ public class MetaFateStoreFateIT extends FateStoreITBase {
ServerContext sctx = createMock(ServerContext.class);
expect(sctx.getZooSession()).andReturn(FateTestUtil.MetaFateZKSetup.getZk()).anyTimes();
replay(sctx);
- MetaFateStore<TestEnv> store = new
MetaFateStore<>(FateTestUtil.MetaFateZKSetup.getZk(),
- createDummyLockID(), null, maxDeferred, fateIdGenerator);
-
- // Check that the store has no transactions before and after each test
- assertEquals(0, store.list().count());
- testMethod.execute(store, sctx);
- assertEquals(0, store.list().count());
+ try (FateStore<TestEnv> store = new
MetaFateStore<>(FateTestUtil.MetaFateZKSetup.getZk(),
+ createDummyLockID(), null, maxDeferred, fateIdGenerator)) {
+ // Check that the store has no transactions before and after each test
+ assertEquals(0, store.list().count());
+ testMethod.execute(store, sctx);
+ assertEquals(0, store.list().count());
+ }
}
@Override
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java
index 0260827dbf..acfe46589d 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT_SimpleSuite.java
@@ -25,9 +25,12 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
import java.time.Duration;
import java.util.UUID;
+import java.util.function.Supplier;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.client.AccumuloClient;
+import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.clientImpl.ClientContext;
@@ -46,6 +49,8 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Suppliers;
+
public class FateMutatorImplIT_SimpleSuite extends SharedMiniClusterBase {
Logger log = LoggerFactory.getLogger(FateMutatorImplIT_SimpleSuite.class);
@@ -67,6 +72,16 @@ public class FateMutatorImplIT_SimpleSuite extends
SharedMiniClusterBase {
return Duration.ofMinutes(5);
}
+ private Supplier<ConditionalWriter> createWriterSupplier(AccumuloClient
client, String table) {
+ return Suppliers.memoize(() -> {
+ try {
+ return client.createConditionalWriter(table);
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException();
+ }
+ });
+ }
+
@Test
public void putRepo() throws Exception {
final String table = getUniqueNames(1)[0];
@@ -77,20 +92,22 @@ public class FateMutatorImplIT_SimpleSuite extends
SharedMiniClusterBase {
var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
+ Supplier<ConditionalWriter> writer = createWriterSupplier(client, table);
+
// add some repos in order
- var fateMutator = new FateMutatorImpl<TestEnv>(context, table, fateId);
+ var fateMutator = new FateMutatorImpl<TestEnv>(context, table, fateId,
writer);
fateMutator.putRepo(100, new TestRepo("test")).mutate();
- var fateMutator1 = new FateMutatorImpl<TestEnv>(context, table, fateId);
+ var fateMutator1 = new FateMutatorImpl<TestEnv>(context, table, fateId,
writer);
fateMutator1.putRepo(99, new TestRepo("test")).mutate();
- var fateMutator2 = new FateMutatorImpl<TestEnv>(context, table, fateId);
+ var fateMutator2 = new FateMutatorImpl<TestEnv>(context, table, fateId,
writer);
fateMutator2.putRepo(98, new TestRepo("test")).mutate();
// make sure we cant add a repo that has already been added
- var fateMutator3 = new FateMutatorImpl<TestEnv>(context, table, fateId);
+ var fateMutator3 = new FateMutatorImpl<TestEnv>(context, table, fateId,
writer);
assertThrows(IllegalStateException.class,
() -> fateMutator3.putRepo(98, new TestRepo("test")).mutate(),
"Repo in position 98 already exists. Expected to not be able to add
it again.");
- var fateMutator4 = new FateMutatorImpl<TestEnv>(context, table, fateId);
+ var fateMutator4 = new FateMutatorImpl<TestEnv>(context, table, fateId,
writer);
assertThrows(IllegalStateException.class,
() -> fateMutator4.putRepo(99, new TestRepo("test")).mutate(),
"Repo in position 99 already exists. Expected to not be able to add
it again.");
@@ -107,67 +124,71 @@ public class FateMutatorImplIT_SimpleSuite extends
SharedMiniClusterBase {
var fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
- // use require status passing all statuses. without the status column present this should fail
+ Supplier<ConditionalWriter> writer = createWriterSupplier(client, table);
+
+ // use require status passing all statuses. without the status column present this should
+ // fail
assertThrows(IllegalStateException.class,
- () -> new FateMutatorImpl<>(context, table, fateId)
+ () -> new FateMutatorImpl<>(context, table, fateId, writer)
.requireStatus(ReadOnlyFateStore.TStatus.values())
.putStatus(ReadOnlyFateStore.TStatus.NEW).mutate());
assertEquals(0, client.createScanner(table).stream().count());
- var status = new FateMutatorImpl<>(context, table, fateId)
+ var status = new FateMutatorImpl<>(context, table, fateId, writer)
.requireStatus(ReadOnlyFateStore.TStatus.values())
.putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
assertEquals(REJECTED, status);
assertEquals(0, client.createScanner(table).stream().count());
- // use require status without passing any statuses to require that the status column is absent
- status = new FateMutatorImpl<>(context, table, fateId).requireStatus()
+ // use require status without passing any statuses to require that the status column is
+ // absent
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).requireStatus()
.putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
assertEquals(ACCEPTED, status);
- // try again with requiring an absent status column. this time it should fail because we just
+ // try again with requiring an absent status column. this time it should fail because we
+ // just
// put status NEW
assertThrows(IllegalStateException.class,
- () -> new FateMutatorImpl<>(context, table, fateId).requireStatus()
+ () -> new FateMutatorImpl<>(context, table, fateId,
writer).requireStatus()
.putStatus(ReadOnlyFateStore.TStatus.NEW).mutate(),
"Expected to not be able to use requireStatus() without passing any
statuses");
- status = new FateMutatorImpl<>(context, table, fateId).requireStatus()
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).requireStatus()
.putStatus(ReadOnlyFateStore.TStatus.NEW).tryMutate();
assertEquals(REJECTED, status,
"Expected to not be able to use requireStatus() without passing any
statuses");
// now use require same with the current status, NEW passed in
- status =
- new FateMutatorImpl<>(context, table,
fateId).requireStatus(ReadOnlyFateStore.TStatus.NEW)
- .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId, writer)
+ .requireStatus(ReadOnlyFateStore.TStatus.NEW)
+ .putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
assertEquals(ACCEPTED, status);
// use require same with an array of statuses, none of which are the current status
// (SUBMITTED)
assertThrows(IllegalStateException.class,
- () -> new FateMutatorImpl<>(context, table, fateId)
+ () -> new FateMutatorImpl<>(context, table, fateId, writer)
.requireStatus(ReadOnlyFateStore.TStatus.NEW,
ReadOnlyFateStore.TStatus.UNKNOWN)
.putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).mutate(),
"Expected to not be able to use requireStatus() with statuses that
do not match the current status");
- status = new FateMutatorImpl<>(context, table, fateId)
+ status = new FateMutatorImpl<>(context, table, fateId, writer)
.requireStatus(ReadOnlyFateStore.TStatus.NEW,
ReadOnlyFateStore.TStatus.UNKNOWN)
.putStatus(ReadOnlyFateStore.TStatus.SUBMITTED).tryMutate();
assertEquals(REJECTED, status,
"Expected to not be able to use requireStatus() with statuses that
do not match the current status");
- // use require same with an array of statuses, one of which is the current status (SUBMITTED)
- status = new FateMutatorImpl<>(context, table, fateId)
+ // use require same with an array of statuses, one of which is the current status
+ // (SUBMITTED)
+ status = new FateMutatorImpl<>(context, table, fateId, writer)
.requireStatus(ReadOnlyFateStore.TStatus.UNKNOWN,
ReadOnlyFateStore.TStatus.SUBMITTED)
.putStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS).tryMutate();
assertEquals(ACCEPTED, status);
// one more time check that we can use require same with the current status (IN_PROGRESS)
- status = new FateMutatorImpl<>(context, table, fateId)
+ status = new FateMutatorImpl<>(context, table, fateId, writer)
.requireStatus(ReadOnlyFateStore.TStatus.IN_PROGRESS)
.putStatus(ReadOnlyFateStore.TStatus.FAILED_IN_PROGRESS).tryMutate();
assertEquals(ACCEPTED, status);
-
}
-
}
@Test
@@ -183,29 +204,33 @@ public class FateMutatorImplIT_SimpleSuite extends
SharedMiniClusterBase {
var reservation = FateReservation.from(lockID, UUID.randomUUID());
var wrongReservation = FateReservation.from(lockID, UUID.randomUUID());
+ Supplier<ConditionalWriter> writer = createWriterSupplier(client, table);
+
// Ensure that reserving is the only thing we can do
- var status =
- new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(reservation).tryMutate();
+ var status = new FateMutatorImpl<>(context, table, fateId,
writer).putUnreserveTx(reservation)
+ .tryMutate();
assertEquals(REJECTED, status);
- status = new FateMutatorImpl<>(context, table,
fateId).putReservedTx(reservation).tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).putReservedTx(reservation)
+ .tryMutate();
assertEquals(ACCEPTED, status);
// Should not be able to reserve when it is already reserved
- status =
- new FateMutatorImpl<>(context, table,
fateId).putReservedTx(wrongReservation).tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).putReservedTx(wrongReservation)
+ .tryMutate();
assertEquals(REJECTED, status);
- status = new FateMutatorImpl<>(context, table,
fateId).putReservedTx(reservation).tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).putReservedTx(reservation)
+ .tryMutate();
assertEquals(REJECTED, status);
// Should be able to unreserve
- status = new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(wrongReservation)
- .tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId, writer)
+ .putUnreserveTx(wrongReservation).tryMutate();
assertEquals(REJECTED, status);
- status =
- new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(reservation).tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).putUnreserveTx(reservation)
+ .tryMutate();
assertEquals(ACCEPTED, status);
- status =
- new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(reservation).tryMutate();
+ status = new FateMutatorImpl<>(context, table, fateId,
writer).putUnreserveTx(reservation)
+ .tryMutate();
assertEquals(REJECTED, status);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java
index 9e9387d6a9..264096e06d 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateExecutionOrderIT_SimpleSuite.java
@@ -24,6 +24,7 @@ import static
org.apache.accumulo.test.fate.TestLock.createDummyLockID;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.test.fate.FateExecutionOrderITBase;
@@ -32,11 +33,11 @@ public class UserFateExecutionOrderIT_SimpleSuite extends
FateExecutionOrderITBa
public void executeTest(FateTestExecutor<FeoTestEnv> testMethod, int
maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
var table = getUniqueNames(1)[0];
- try (ClientContext client =
- (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+ try (ClientContext client = (ClientContext)
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<FeoTestEnv> fs = new UserFateStore<>(client, table,
createDummyLockID(), null,
+ maxDeferred, fateIdGenerator)) {
createFateTable(client, table);
- testMethod.execute(new UserFateStore<>(client, table,
createDummyLockID(), null, maxDeferred,
- fateIdGenerator), getCluster().getServerContext());
+ testMethod.execute(fs, getCluster().getServerContext());
client.tableOperations().delete(table);
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java
index cfed04bea9..0719768683 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateIT_SimpleSuite.java
@@ -34,6 +34,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.user.schema.FateSchema;
@@ -65,11 +66,11 @@ public class UserFateIT_SimpleSuite extends FateITBase {
public void executeTest(FateTestExecutor<TestEnv> testMethod, int
maxDeferred,
FateIdGenerator fateIdGenerator) throws Exception {
table = getUniqueNames(1)[0];
- try (ClientContext client =
- (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+ try (ClientContext client = (ClientContext)
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<TestEnv> fs = new UserFateStore<>(client, table,
createDummyLockID(), null,
+ maxDeferred, fateIdGenerator)) {
createFateTable(client, table);
- testMethod.execute(new UserFateStore<>(client, table,
createDummyLockID(), null, maxDeferred,
- fateIdGenerator), getCluster().getServerContext());
+ testMethod.execute(fs, getCluster().getServerContext());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
index fca55dc04c..7d84a72f8a 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateOpsCommandsIT.java
@@ -40,9 +40,11 @@ public class UserFateOpsCommandsIT extends
FateOpsCommandsITBase {
public void executeTest(FateTestExecutor<LatchTestEnv> testMethod, int
maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
var context = getCluster().getServerContext();
- // the test should not be reserving or checking reservations, so null
lockID and isLockHeld
- testMethod.execute(new UserFateStore<>(context,
SystemTables.FATE.tableName(), null, null),
- context);
+ try (FateStore<LatchTestEnv> fs =
+ new UserFateStore<>(context, SystemTables.FATE.tableName(), null,
null)) {
+ // the test should not be reserving or checking reservations, so null
lockID and isLockHeld
+ testMethod.execute(fs, context);
+ }
}
/**
@@ -61,8 +63,10 @@ public class UserFateOpsCommandsIT extends
FateOpsCommandsITBase {
ZooUtil.LockID lockID = testLock.getLockID();
Predicate<ZooUtil.LockID> isLockHeld =
lock -> ServiceLock.isLockHeld(context.getZooCache(), lock);
- testMethod.execute(
- new UserFateStore<>(context, SystemTables.FATE.tableName(), lockID,
isLockHeld), context);
+ try (FateStore<LatchTestEnv> fs =
+ new UserFateStore<>(context, SystemTables.FATE.tableName(), lockID,
isLockHeld)) {
+ testMethod.execute(fs, context);
+ }
} finally {
if (testLock != null) {
testLock.unlock();
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java
index 5c3030a6df..5c4d05d516 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFatePoolsWatcherIT_SimpleSuite.java
@@ -24,6 +24,7 @@ import static
org.apache.accumulo.test.fate.TestLock.createDummyLockID;
import org.apache.accumulo.core.client.Accumulo;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.test.fate.FatePoolsWatcherITBase;
@@ -48,11 +49,11 @@ public class UserFatePoolsWatcherIT_SimpleSuite extends
FatePoolsWatcherITBase {
public void executeTest(FateTestExecutor<PoolResizeTestEnv> testMethod, int
maxDeferred,
AbstractFateStore.FateIdGenerator fateIdGenerator) throws Exception {
table = getUniqueNames(1)[0];
- try (ClientContext client =
- (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+ try (ClientContext client = (ClientContext)
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<PoolResizeTestEnv> fs = new UserFateStore<>(client, table,
createDummyLockID(),
+ null, maxDeferred, fateIdGenerator)) {
createFateTable(client, table);
- testMethod.execute(new UserFateStore<>(client, table,
createDummyLockID(), null, maxDeferred,
- fateIdGenerator), getCluster().getServerContext());
+ testMethod.execute(fs, getCluster().getServerContext());
}
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java
index 48dc90ffe1..971d336cfe 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStatusEnforcementIT_SimpleSuite.java
@@ -57,6 +57,7 @@ public class UserFateStatusEnforcementIT_SimpleSuite extends
FateStatusEnforceme
@AfterEach
public void afterEachTeardown() {
+ store.close();
client.close();
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java
index c1f902a5a7..5072f0176b 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/UserFateStoreFateIT_SimpleSuite.java
@@ -30,6 +30,7 @@ import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.fate.AbstractFateStore.FateIdGenerator;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.user.schema.FateSchema.TxColumnFamily;
import org.apache.accumulo.harness.SharedMiniClusterBase;
@@ -54,11 +55,11 @@ public class UserFateStoreFateIT_SimpleSuite extends
FateStoreITBase {
public void executeTest(FateTestExecutor<TestEnv> testMethod, int
maxDeferred,
FateIdGenerator fateIdGenerator) throws Exception {
String table = getUniqueNames(1)[0] + "fatestore";
- try (ClientContext client =
- (ClientContext) Accumulo.newClient().from(getClientProps()).build()) {
+ try (ClientContext client = (ClientContext)
Accumulo.newClient().from(getClientProps()).build();
+ FateStore<TestEnv> fs = new UserFateStore<>(client, table,
createDummyLockID(), null,
+ maxDeferred, fateIdGenerator)) {
createFateTable(client, table);
- testMethod.execute(new UserFateStore<>(client, table,
createDummyLockID(), null, maxDeferred,
- fateIdGenerator), getCluster().getServerContext());
+ testMethod.execute(fs, getCluster().getServerContext());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 66355ccd72..5b7993403b 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -256,8 +256,8 @@ public class FateConcurrencyIT extends
AccumuloClusterHarness {
try {
var zk = context.getZooSession();
- MetaFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null,
null);
- UserFateStore<String> readOnlyUFS =
+ ReadOnlyFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null,
null);
+ ReadOnlyFateStore<String> readOnlyUFS =
new UserFateStore<>(context, SystemTables.FATE.tableName(), null,
null);
var lockPath = context.getServerPaths().createTableLocksPath(tableId);
Map<FateInstanceType,ReadOnlyFateStore<String>> readOnlyFateStores =
@@ -348,7 +348,7 @@ public class FateConcurrencyIT extends
AccumuloClusterHarness {
log.trace("tid: {}", tableId);
var zk = context.getZooSession();
- MetaFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null);
+ ReadOnlyFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null,
null);
var lockPath = context.getServerPaths().createTableLocksPath(tableId);
AdminUtil.FateStatus fateStatus =
admin.getStatus(readOnlyMFS, zk, lockPath, null, null, null);
@@ -378,7 +378,7 @@ public class FateConcurrencyIT extends
AccumuloClusterHarness {
log.trace("tid: {}", tableId);
- UserFateStore<String> readOnlyUFS =
+ ReadOnlyFateStore<String> readOnlyUFS =
new UserFateStore<>(context, SystemTables.FATE.tableName(), null,
null);
AdminUtil.FateStatus fateStatus = admin.getStatus(readOnlyUFS, null,
null, null);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 2618b6c2f8..9f7e2df223 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -231,9 +231,9 @@ public class FunctionalTestUtils {
AdminUtil<String> admin = new AdminUtil<>();
ServerContext context = cluster.getServerContext();
var zk = context.getZooSession();
- MetaFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null, null);
- UserFateStore<String> readOnlyUFS =
+ ReadOnlyFateStore<String> readOnlyUFS =
new UserFateStore<>(context, SystemTables.FATE.tableName(), null,
null);
+ ReadOnlyFateStore<String> readOnlyMFS = new MetaFateStore<>(zk, null,
null);
Map<FateInstanceType,ReadOnlyFateStore<String>> readOnlyFateStores =
Map.of(FateInstanceType.META, readOnlyMFS, FateInstanceType.USER,
readOnlyUFS);
var lockPath = context.getServerPaths().createTableLocksPath();