This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 0ff1b21f2e Adds FateOperation type (#5218)
0ff1b21f2e is described below
commit 0ff1b21f2e79f9a82347485739258b93a9bdc259
Author: Kevin Rathbun <[email protected]>
AuthorDate: Tue Jan 7 10:10:12 2025 -0500
Adds FateOperation type (#5218)
Adds `FateOperation` enum. This consolidates all fate operations under one
type: both those used in thrift and those passed directly to a Fate object
outside of thrift. This avoids passing the operation name around as a String.
- Adds a `FateOperation` enum which includes all current fate operations
- Renamed existing thrift type `FateOperation` to `TFateOperation`
- `FateOperation` includes all `TFateOperation`s and fate operations
performed outside of thrift (one example is `COMMIT_COMPACTION`)
- `FateOperation` is now the type passed around instead of a String
- Removed OBSOLETE_TABLE_BULK_IMPORT from `TFateOperation` since it is no
longer used (it was bulk import v1). The integers of the remaining enum values
are kept stable to avoid confusion across different versions of Accumulo.
- New `FateOperationTest` to test `TFateOperation` and `FateOperation`
---
.../core/clientImpl/NamespaceOperationsImpl.java | 10 ++--
.../core/clientImpl/TableOperationsImpl.java | 44 ++++++++--------
.../org/apache/accumulo/core/fate/AdminUtil.java | 11 ++--
.../java/org/apache/accumulo/core/fate/Fate.java | 55 ++++++++++++++++++--
.../org/apache/accumulo/core/fate/FateStore.java | 9 ++--
.../accumulo/core/fate/user/UserFateStore.java | 8 +--
.../core/fate/zookeeper/MetaFateStore.java | 7 +--
.../apache/accumulo/core/logging/FateLogger.java | 6 +--
.../accumulo/core/manager/thrift/FateService.java | 38 +++++++-------
.../{FateOperation.java => TFateOperation.java} | 9 ++--
core/src/main/thrift/manager.thrift | 42 ++++++++--------
.../accumulo/core/fate/FateOperationTest.java | 48 ++++++++++++++++++
.../org/apache/accumulo/core/fate/TestStore.java | 6 +--
.../server/security/AuditedSecurityOperation.java | 8 +--
.../server/security/SecurityOperation.java | 4 +-
.../server/util/fateCommand/FateSummaryReport.java | 6 ++-
.../server/util/fateCommand/FateTxnDetails.java | 2 +-
.../server/util/fateCommand/TxnDetailsTest.java | 7 +--
.../accumulo/manager/FateServiceHandler.java | 58 ++++++++++++----------
.../manager/ManagerClientServiceHandler.java | 2 +-
.../coordinator/CompactionCoordinator.java | 4 +-
.../manager/metrics/fate/FateMetricValues.java | 9 ++--
.../accumulo/manager/split/SeedSplitTask.java | 5 +-
.../test/compaction/ExternalCompaction_1_IT.java | 3 +-
.../java/org/apache/accumulo/test/fate/FateIT.java | 21 ++++----
.../org/apache/accumulo/test/fate/FateStoreIT.java | 32 ++++++------
.../apache/accumulo/test/fate/FateStoreUtil.java | 8 ++-
.../accumulo/test/fate/MultipleStoresIT.java | 7 +--
.../test/functional/FateConcurrencyIT.java | 5 +-
29 files changed, 297 insertions(+), 177 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
index 378147bea6..2e80915fb8 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/NamespaceOperationsImpl.java
@@ -56,7 +56,7 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.constraints.Constraint;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
-import org.apache.accumulo.core.manager.thrift.FateOperation;
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.LocalityGroupUtil;
@@ -126,7 +126,7 @@ public class NamespaceOperationsImpl extends
NamespaceOperationsHelper {
NEW_NAMESPACE_NAME.validate(namespace);
try {
- doNamespaceFateOperation(FateOperation.NAMESPACE_CREATE,
+ doNamespaceFateOperation(TFateOperation.NAMESPACE_CREATE,
Arrays.asList(ByteBuffer.wrap(namespace.getBytes(UTF_8))),
Collections.emptyMap(),
namespace);
} catch (NamespaceNotFoundException e) {
@@ -156,7 +156,7 @@ public class NamespaceOperationsImpl extends
NamespaceOperationsHelper {
Map<String,String> opts = new HashMap<>();
try {
- doNamespaceFateOperation(FateOperation.NAMESPACE_DELETE, args, opts,
namespace);
+ doNamespaceFateOperation(TFateOperation.NAMESPACE_DELETE, args, opts,
namespace);
} catch (NamespaceExistsException e) {
// should not happen
throw new AssertionError(e);
@@ -174,7 +174,7 @@ public class NamespaceOperationsImpl extends
NamespaceOperationsHelper {
List<ByteBuffer> args =
Arrays.asList(ByteBuffer.wrap(oldNamespaceName.getBytes(UTF_8)),
ByteBuffer.wrap(newNamespaceName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
- doNamespaceFateOperation(FateOperation.NAMESPACE_RENAME, args, opts,
oldNamespaceName);
+ doNamespaceFateOperation(TFateOperation.NAMESPACE_RENAME, args, opts,
oldNamespaceName);
}
@Override
@@ -385,7 +385,7 @@ public class NamespaceOperationsImpl extends
NamespaceOperationsHelper {
return super.addConstraint(namespace, constraintClassName);
}
- private String doNamespaceFateOperation(FateOperation op, List<ByteBuffer>
args,
+ private String doNamespaceFateOperation(TFateOperation op, List<ByteBuffer>
args,
Map<String,String> opts, String namespace) throws
AccumuloSecurityException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
// caller should validate the namespace name
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 36de59f9e8..0b1ff70851 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -140,11 +140,11 @@ import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.iterators.IteratorUtil.IteratorScope;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.manager.state.tables.TableState;
-import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.ManagerClientService;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.TabletState;
@@ -274,7 +274,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
Map<String,String> opts = ntc.getProperties();
try {
- doTableFateOperation(tableName, AccumuloException.class,
FateOperation.TABLE_CREATE, args,
+ doTableFateOperation(tableName, AccumuloException.class,
TFateOperation.TABLE_CREATE, args,
opts);
} catch (TableNotFoundException e) {
// should not happen
@@ -304,7 +304,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
// This method is for retrying in the case of network failures;
// anything else it passes to the caller to deal with
- private void executeFateOperation(TFateId opid, FateOperation op,
List<ByteBuffer> args,
+ private void executeFateOperation(TFateId opid, TFateOperation op,
List<ByteBuffer> args,
Map<String,String> opts, boolean autoCleanUp)
throws ThriftSecurityException, TException,
ThriftTableOperationException {
while (true) {
@@ -372,7 +372,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
EXISTING_TABLE_NAME.validate(tableName);
try {
- return doFateOperation(FateOperation.TABLE_BULK_IMPORT2, args,
Collections.emptyMap(),
+ return doFateOperation(TFateOperation.TABLE_BULK_IMPORT2, args,
Collections.emptyMap(),
tableName);
} catch (TableExistsException | NamespaceExistsException e) {
// should not happen
@@ -427,14 +427,14 @@ public class TableOperationsImpl extends
TableOperationsHelper {
}
}
- String doFateOperation(FateOperation op, List<ByteBuffer> args,
Map<String,String> opts,
+ String doFateOperation(TFateOperation op, List<ByteBuffer> args,
Map<String,String> opts,
String tableOrNamespaceName)
throws AccumuloSecurityException, TableExistsException,
TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
return doFateOperation(op, args, opts, tableOrNamespaceName, true);
}
- String doFateOperation(FateOperation op, List<ByteBuffer> args,
Map<String,String> opts,
+ String doFateOperation(TFateOperation op, List<ByteBuffer> args,
Map<String,String> opts,
String tableOrNamespaceName, boolean wait)
throws AccumuloSecurityException, TableExistsException,
TableNotFoundException,
AccumuloException, NamespaceExistsException, NamespaceNotFoundException {
@@ -521,7 +521,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
return handleFateOperation(() -> {
TFateInstanceType t =
FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
TFateId opid = beginFateOperation(t);
- executeFateOperation(opid, FateOperation.TABLE_SPLIT, args,
Map.of(), false);
+ executeFateOperation(opid, TFateOperation.TABLE_SPLIT, args,
Map.of(), false);
return new Pair<>(opid, splitsForTablet.getValue());
}, tableName);
} catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException
@@ -645,8 +645,8 @@ public class TableOperationsImpl extends
TableOperationsHelper {
end == null ? EMPTY : TextUtil.getByteBuffer(end));
Map<String,String> opts = new HashMap<>();
try {
- doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_MERGE, args,
- opts);
+ doTableFateOperation(tableName, TableNotFoundException.class,
TFateOperation.TABLE_MERGE,
+ args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
@@ -665,7 +665,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
Map<String,String> opts = new HashMap<>();
try {
doTableFateOperation(tableName, TableNotFoundException.class,
- FateOperation.TABLE_DELETE_RANGE, args, opts);
+ TFateOperation.TABLE_DELETE_RANGE, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
@@ -760,7 +760,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
List<ByteBuffer> args =
List.of(ByteBuffer.wrap(tableName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
try {
- doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_DELETE,
+ doTableFateOperation(tableName, TableNotFoundException.class,
TFateOperation.TABLE_DELETE,
args, opts);
} catch (TableExistsException e) {
// should not happen
@@ -800,7 +800,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
prependPropertiesToExclude(opts, config.getPropertiesToExclude());
- doTableFateOperation(newTableName, AccumuloException.class,
FateOperation.TABLE_CLONE, args,
+ doTableFateOperation(newTableName, AccumuloException.class,
TFateOperation.TABLE_CLONE, args,
opts);
}
@@ -813,7 +813,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
List<ByteBuffer> args =
Arrays.asList(ByteBuffer.wrap(oldTableName.getBytes(UTF_8)),
ByteBuffer.wrap(newTableName.getBytes(UTF_8)));
Map<String,String> opts = new HashMap<>();
- doTableFateOperation(oldTableName, TableNotFoundException.class,
FateOperation.TABLE_RENAME,
+ doTableFateOperation(oldTableName, TableNotFoundException.class,
TFateOperation.TABLE_RENAME,
args, opts);
}
@@ -892,7 +892,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
Map<String,String> opts = new HashMap<>();
try {
- doFateOperation(FateOperation.TABLE_COMPACT, args, opts, tableName,
config.getWait());
+ doFateOperation(TFateOperation.TABLE_COMPACT, args, opts, tableName,
config.getWait());
} catch (TableExistsException | NamespaceExistsException e) {
// should not happen
throw new AssertionError(e);
@@ -912,7 +912,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
try {
doTableFateOperation(tableName, TableNotFoundException.class,
- FateOperation.TABLE_CANCEL_COMPACT, args, opts);
+ TFateOperation.TABLE_CANCEL_COMPACT, args, opts);
} catch (TableExistsException e) {
// should not happen
throw new AssertionError(e);
@@ -1455,17 +1455,17 @@ public class TableOperationsImpl extends
TableOperationsHelper {
TableId tableId = context.getTableId(tableName);
- FateOperation op = null;
+ TFateOperation op = null;
switch (newState) {
case OFFLINE:
- op = FateOperation.TABLE_OFFLINE;
+ op = TFateOperation.TABLE_OFFLINE;
if (tableName.equals(AccumuloTable.METADATA.tableName())
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
throw new AccumuloException("Cannot set table to offline state");
}
break;
case ONLINE:
- op = FateOperation.TABLE_ONLINE;
+ op = TFateOperation.TABLE_ONLINE;
if (tableName.equals(AccumuloTable.METADATA.tableName())
|| tableName.equals(AccumuloTable.ROOT.tableName())) {
// Don't submit a Fate operation for this, these tables can only be
online.
@@ -1694,7 +1694,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
checkedImportDirs.stream().map(s ->
s.getBytes(UTF_8)).map(ByteBuffer::wrap).forEach(args::add);
try {
- doTableFateOperation(tableName, AccumuloException.class,
FateOperation.TABLE_IMPORT, args,
+ doTableFateOperation(tableName, AccumuloException.class,
TFateOperation.TABLE_IMPORT, args,
Collections.emptyMap());
} catch (TableNotFoundException e) {
// should not happen
@@ -1727,7 +1727,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
Map<String,String> opts = Collections.emptyMap();
try {
- doTableFateOperation(tableName, TableNotFoundException.class,
FateOperation.TABLE_EXPORT,
+ doTableFateOperation(tableName, TableNotFoundException.class,
TFateOperation.TABLE_EXPORT,
args, opts);
} catch (TableExistsException e) {
// should not happen
@@ -1782,7 +1782,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
}
private void doTableFateOperation(String tableOrNamespaceName,
- Class<? extends Exception> namespaceNotFoundExceptionClass,
FateOperation op,
+ Class<? extends Exception> namespaceNotFoundExceptionClass,
TFateOperation op,
List<ByteBuffer> args, Map<String,String> opts) throws
AccumuloSecurityException,
AccumuloException, TableExistsException, TableNotFoundException {
try {
@@ -2212,7 +2212,7 @@ public class TableOperationsImpl extends
TableOperationsHelper {
try {
doTableFateOperation(tableName, AccumuloException.class,
- FateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
+ TFateOperation.TABLE_TABLET_AVAILABILITY, args, opts);
} catch (TableNotFoundException | TableExistsException e) {
// should not happen
throw new AssertionError(e);
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 0fe448d7d5..384099a832 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@ -76,14 +76,15 @@ public class AdminUtil<T> {
private final FateId fateId;
private final FateInstanceType instanceType;
private final TStatus status;
- private final String txName;
+ private final Fate.FateOperation txName;
private final List<String> hlocks;
private final List<String> wlocks;
private final String top;
private final long timeCreated;
private TransactionStatus(FateId fateId, FateInstanceType instanceType,
TStatus status,
- String txName, List<String> hlocks, List<String> wlocks, String top,
Long timeCreated) {
+ Fate.FateOperation txName, List<String> hlocks, List<String> wlocks,
String top,
+ Long timeCreated) {
this.fateId = fateId;
this.instanceType = instanceType;
@@ -115,7 +116,7 @@ public class AdminUtil<T> {
/**
* @return The name of the transaction running.
*/
- public String getTxName() {
+ public Fate.FateOperation getTxName() {
return txName;
}
@@ -361,7 +362,9 @@ public class AdminUtil<T> {
fateIds.forEach(fateId -> {
ReadOnlyFateTxStore<T> txStore = store.read(fateId);
- String txName = (String)
txStore.getTransactionInfo(Fate.TxInfo.TX_NAME);
+ // tx name will not be set if the tx is not seeded with work (it is
NEW)
+ Fate.FateOperation txName =
txStore.getTransactionInfo(Fate.TxInfo.TX_NAME) == null ? null
+ : ((Fate.FateOperation)
txStore.getTransactionInfo(Fate.TxInfo.TX_NAME));
List<String> hlocks = heldLocks.remove(fateId);
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index a45cb3a405..de6e7073ec 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -54,6 +54,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.logging.FateLogger;
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.util.ShutdownUtil;
import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.UtilWaitThread;
@@ -88,6 +89,53 @@ public class Fate<T> {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
}
+ public enum FateOperation {
+ COMMIT_COMPACTION(null),
+ NAMESPACE_CREATE(TFateOperation.NAMESPACE_CREATE),
+ NAMESPACE_DELETE(TFateOperation.NAMESPACE_DELETE),
+ NAMESPACE_RENAME(TFateOperation.NAMESPACE_RENAME),
+ SHUTDOWN_TSERVER(null),
+ SYSTEM_SPLIT(null),
+ TABLE_BULK_IMPORT2(TFateOperation.TABLE_BULK_IMPORT2),
+ TABLE_CANCEL_COMPACT(TFateOperation.TABLE_CANCEL_COMPACT),
+ TABLE_CLONE(TFateOperation.TABLE_CLONE),
+ TABLE_COMPACT(TFateOperation.TABLE_COMPACT),
+ TABLE_CREATE(TFateOperation.TABLE_CREATE),
+ TABLE_DELETE(TFateOperation.TABLE_DELETE),
+ TABLE_DELETE_RANGE(TFateOperation.TABLE_DELETE_RANGE),
+ TABLE_EXPORT(TFateOperation.TABLE_EXPORT),
+ TABLE_IMPORT(TFateOperation.TABLE_IMPORT),
+ TABLE_MERGE(TFateOperation.TABLE_MERGE),
+ TABLE_OFFLINE(TFateOperation.TABLE_OFFLINE),
+ TABLE_ONLINE(TFateOperation.TABLE_ONLINE),
+ TABLE_RENAME(TFateOperation.TABLE_RENAME),
+ TABLE_SPLIT(TFateOperation.TABLE_SPLIT),
+ TABLE_TABLET_AVAILABILITY(TFateOperation.TABLE_TABLET_AVAILABILITY);
+
+ private final TFateOperation top;
+ private static final EnumSet<FateOperation> nonThriftOps =
+ EnumSet.of(COMMIT_COMPACTION, SHUTDOWN_TSERVER, SYSTEM_SPLIT);
+
+ FateOperation(TFateOperation top) {
+ this.top = top;
+ }
+
+ public static FateOperation fromThrift(TFateOperation top) {
+ return FateOperation.valueOf(top.name());
+ }
+
+ public static EnumSet<FateOperation> getNonThriftOps() {
+ return nonThriftOps;
+ }
+
+ public TFateOperation toThrift() {
+ if (top == null) {
+ throw new IllegalStateException(this + " does not have an equivalent
thrift form");
+ }
+ return top;
+ }
+ }
+
/**
* A single thread that finds transactions to work on and queues them up. Do
not want each worker
* thread going to the store and looking for work as it would place more
load on the store.
@@ -437,14 +485,15 @@ public class Fate<T> {
return store.create();
}
- public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
+ public void seedTransaction(FateOperation txName, FateKey fateKey, Repo<T>
repo,
+ boolean autoCleanUp) {
store.seedTransaction(txName, fateKey, repo, autoCleanUp);
}
// start work in the transaction.. it is safe to call this
// multiple times for a transaction... but it will only seed once
- public void seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp,
- String goalMessage) {
+ public void seedTransaction(FateOperation txName, FateId fateId, Repo<T>
repo,
+ boolean autoCleanUp, String goalMessage) {
log.info("Seeding {} {}", fateId, goalMessage);
store.seedTransaction(txName, fateId, repo, autoCleanUp);
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index d434770461..160d872712 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -50,8 +50,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
FateId create();
/**
- * Seeds a transaction with the given repo if it does not exists. A fateId
will be derived from
- * the fateKey. If seeded, sets the following data for the fateId in the
store.
+ * Seeds a transaction with the given repo if it does not exist. A fateId
will be derived from the
+ * fateKey. If seeded, sets the following data for the fateId in the store.
*
* <ul>
* <li>Set the tx name</li>
@@ -66,7 +66,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
* empty optional otherwise. If there was a failure this could
return an empty optional
* when it actually succeeded.
*/
- Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T>
repo,
+ Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey,
Repo<T> repo,
boolean autoCleanUp);
/**
@@ -84,7 +84,8 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
* failures. When there are no failures returns true if seeded and
false otherwise. If
* there was a failure this could return false when it actually
succeeded.
*/
- boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean
autoCleanUp);
+ boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T>
repo,
+ boolean autoCleanUp);
/**
* An interface that allows read/write access to the data related to a
single fate operation.
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 195848e276..3d7c039e0f 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -40,6 +40,7 @@ import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.fate.AbstractFateStore;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
@@ -126,7 +127,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
}
@Override
- public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
+ public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey
fateKey, Repo<T> repo,
boolean autoCleanUp) {
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
Supplier<FateMutator<T>> mutatorFactory = () ->
newMutator(fateId).requireAbsent()
@@ -139,14 +140,15 @@ public class UserFateStore<T> extends
AbstractFateStore<T> {
}
@Override
- public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
+ public boolean seedTransaction(Fate.FateOperation txName, FateId fateId,
Repo<T> repo,
+ boolean autoCleanUp) {
Supplier<FateMutator<T>> mutatorFactory =
() ->
newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey();
return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo,
autoCleanUp);
}
private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory,
String logId,
- String txName, Repo<T> repo, boolean autoCleanUp) {
+ Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp) {
int maxAttempts = 5;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
var mutator = mutatorFactory.get();
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index 4a691417c6..e639ac5712 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -172,7 +172,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
}
@Override
- public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
+ public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey
fateKey, Repo<T> repo,
boolean autoCleanUp) {
return createAndReserve(fateKey).map(txStore -> {
try {
@@ -185,7 +185,8 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
}
@Override
- public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
+ public boolean seedTransaction(Fate.FateOperation txName, FateId fateId,
Repo<T> repo,
+ boolean autoCleanUp) {
return tryReserve(fateId).map(txStore -> {
try {
if (txStore.getStatus() == NEW) {
@@ -199,7 +200,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
}).orElse(false);
}
- private void seedTransaction(String txName, Repo<T> repo, boolean
autoCleanUp,
+ private void seedTransaction(Fate.FateOperation txName, Repo<T> repo,
boolean autoCleanUp,
FateTxStore<T> txStore) {
if (txStore.top() == null) {
try {
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 9a5984f4ed..d25e87f15e 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -150,8 +150,8 @@ public class FateLogger {
}
@Override
- public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
- boolean autoCleanUp) {
+ public Optional<FateId> seedTransaction(Fate.FateOperation txName,
FateKey fateKey,
+ Repo<T> repo, boolean autoCleanUp) {
var optional = store.seedTransaction(txName, fateKey, repo,
autoCleanUp);
if (storeLog.isTraceEnabled()) {
optional.ifPresentOrElse(fateId -> {
@@ -166,7 +166,7 @@ public class FateLogger {
}
@Override
- public boolean seedTransaction(String txName, FateId fateId, Repo<T>
repo,
+ public boolean seedTransaction(Fate.FateOperation txName, FateId fateId,
Repo<T> repo,
boolean autoCleanUp) {
boolean seeded = store.seedTransaction(txName, fateId, repo,
autoCleanUp);
if (storeLog.isTraceEnabled()) {
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java
index b38fb423bc..9fd6e7808f 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateService.java
@@ -31,7 +31,7 @@ public class FateService {
public TFateId
beginFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateInstanceType type) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException,
org.apache.thrift.TException;
- public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean)
throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException,
org.apache.accumulo.cor [...]
+ public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean)
throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException,
org.apache.accumulo.co [...]
public java.lang.String
waitForFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid) throws
org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException,
org.apache.accumulo.core.clientImpl.thrift.ThriftNotActiveServiceException,
org.apache.thrift.TException;
@@ -45,7 +45,7 @@ public class FateService {
public void
beginFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateInstanceType type, org.apache.thrift.async.AsyncMethodCallback<TFateId>
resultHandler) throws org.apache.thrift.TException;
- public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException;
+ public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException;
public void
waitForFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, org.apache.thrift.async.AsyncMethodCallback<java.lang.String>
resultHandler) throws org.apache.thrift.TException;
@@ -110,13 +110,13 @@ public class FateService {
}
@Override
- public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean)
throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException,
org.apache.accumulo.cor [...]
+ public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean)
throws org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException,
org.apache.accumulo.core.clientImpl.thrift.ThriftTableOperationException,
org.apache.accumulo.co [...]
{
send_executeFateOperation(tinfo, credentials, opid, op, arguments,
options, autoClean);
recv_executeFateOperation();
}
- public void
send_executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean)
throws org.apache.thrift.TException
+ public void
send_executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean)
throws org.apache.thrift.TException
{
executeFateOperation_args args = new executeFateOperation_args();
args.setTinfo(tinfo);
@@ -302,7 +302,7 @@ public class FateService {
}
@Override
- public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException {
+ public void
executeFateOperation(org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials, TFateId
opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler) throws
org.apache.thrift.TException {
checkReady();
executeFateOperation_call method_call = new
executeFateOperation_call(tinfo, credentials, opid, op, arguments, options,
autoClean, resultHandler, this, ___protocolFactory, ___transport);
this.___currentMethod = method_call;
@@ -313,11 +313,11 @@ public class FateService {
private org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo;
private org.apache.accumulo.core.securityImpl.thrift.TCredentials
credentials;
private TFateId opid;
- private FateOperation op;
+ private TFateOperation op;
private java.util.List<java.nio.ByteBuffer> arguments;
private java.util.Map<java.lang.String,java.lang.String> options;
private boolean autoClean;
- public
executeFateOperation_call(org.apache.accumulo.core.clientImpl.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateId opid, FateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler,
org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory, [...]
+ public
executeFateOperation_call(org.apache.accumulo.core.clientImpl.thrift.TInfo
tinfo, org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateId opid, TFateOperation op, java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options, boolean autoClean,
org.apache.thrift.async.AsyncMethodCallback<Void> resultHandler,
org.apache.thrift.async.TAsyncClient client,
org.apache.thrift.protocol.TProtocolFactory protocolFactory [...]
super(client, protocolFactory, transport, resultHandler, false);
this.tinfo = tinfo;
this.credentials = credentials;
@@ -2312,9 +2312,9 @@ public class FateService {
public @org.apache.thrift.annotation.Nullable TFateId opid; // required
/**
*
- * @see FateOperation
+ * @see TFateOperation
*/
- public @org.apache.thrift.annotation.Nullable FateOperation op; // required
+ public @org.apache.thrift.annotation.Nullable TFateOperation op; //
required
public @org.apache.thrift.annotation.Nullable
java.util.List<java.nio.ByteBuffer> arguments; // required
public @org.apache.thrift.annotation.Nullable
java.util.Map<java.lang.String,java.lang.String> options; // required
public boolean autoClean; // required
@@ -2326,7 +2326,7 @@ public class FateService {
OPID((short)3, "opid"),
/**
*
- * @see FateOperation
+ * @see TFateOperation
*/
OP((short)4, "op"),
ARGUMENTS((short)5, "arguments"),
@@ -2416,7 +2416,7 @@ public class FateService {
tmpMap.put(_Fields.OPID, new
org.apache.thrift.meta_data.FieldMetaData("opid",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT,
TFateId.class)));
tmpMap.put(_Fields.OP, new
org.apache.thrift.meta_data.FieldMetaData("op",
org.apache.thrift.TFieldRequirementType.DEFAULT,
- new
org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM,
FateOperation.class)));
+ new
org.apache.thrift.meta_data.EnumMetaData(org.apache.thrift.protocol.TType.ENUM,
TFateOperation.class)));
tmpMap.put(_Fields.ARGUMENTS, new
org.apache.thrift.meta_data.FieldMetaData("arguments",
org.apache.thrift.TFieldRequirementType.DEFAULT,
new
org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new
org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING
, true))));
@@ -2437,7 +2437,7 @@ public class FateService {
org.apache.accumulo.core.clientImpl.thrift.TInfo tinfo,
org.apache.accumulo.core.securityImpl.thrift.TCredentials credentials,
TFateId opid,
- FateOperation op,
+ TFateOperation op,
java.util.List<java.nio.ByteBuffer> arguments,
java.util.Map<java.lang.String,java.lang.String> options,
boolean autoClean)
@@ -2575,18 +2575,18 @@ public class FateService {
/**
*
- * @see FateOperation
+ * @see TFateOperation
*/
@org.apache.thrift.annotation.Nullable
- public FateOperation getOp() {
+ public TFateOperation getOp() {
return this.op;
}
/**
*
- * @see FateOperation
+ * @see TFateOperation
*/
- public executeFateOperation_args
setOp(@org.apache.thrift.annotation.Nullable FateOperation op) {
+ public executeFateOperation_args
setOp(@org.apache.thrift.annotation.Nullable TFateOperation op) {
this.op = op;
return this;
}
@@ -2737,7 +2737,7 @@ public class FateService {
if (value == null) {
unsetOp();
} else {
- setOp((FateOperation)value);
+ setOp((TFateOperation)value);
}
break;
@@ -3173,7 +3173,7 @@ public class FateService {
break;
case 4: // OP
if (schemeField.type == org.apache.thrift.protocol.TType.I32) {
- struct.op =
org.apache.accumulo.core.manager.thrift.FateOperation.findByValue(iprot.readI32());
+ struct.op =
org.apache.accumulo.core.manager.thrift.TFateOperation.findByValue(iprot.readI32());
struct.setOpIsSet(true);
} else {
org.apache.thrift.protocol.TProtocolUtil.skip(iprot,
schemeField.type);
@@ -3386,7 +3386,7 @@ public class FateService {
struct.setOpidIsSet(true);
}
if (incoming.get(3)) {
- struct.op =
org.apache.accumulo.core.manager.thrift.FateOperation.findByValue(iprot.readI32());
+ struct.op =
org.apache.accumulo.core.manager.thrift.TFateOperation.findByValue(iprot.readI32());
struct.setOpIsSet(true);
}
if (incoming.get(4)) {
diff --git
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/TFateOperation.java
similarity index 91%
rename from
core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java
rename to
core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/TFateOperation.java
index d03349ede0..b9cd11ca85 100644
---
a/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/FateOperation.java
+++
b/core/src/main/thrift-gen-java/org/apache/accumulo/core/manager/thrift/TFateOperation.java
@@ -25,7 +25,7 @@
package org.apache.accumulo.core.manager.thrift;
-public enum FateOperation implements org.apache.thrift.TEnum {
+public enum TFateOperation implements org.apache.thrift.TEnum {
TABLE_CREATE(0),
TABLE_CLONE(1),
TABLE_DELETE(2),
@@ -34,7 +34,6 @@ public enum FateOperation implements org.apache.thrift.TEnum {
TABLE_OFFLINE(5),
TABLE_MERGE(6),
TABLE_DELETE_RANGE(7),
- OBSOLETE_TABLE_BULK_IMPORT(8),
TABLE_COMPACT(9),
TABLE_IMPORT(10),
TABLE_EXPORT(11),
@@ -48,7 +47,7 @@ public enum FateOperation implements org.apache.thrift.TEnum {
private final int value;
- private FateOperation(int value) {
+ private TFateOperation(int value) {
this.value = value;
}
@@ -65,7 +64,7 @@ public enum FateOperation implements org.apache.thrift.TEnum {
* @return null if the value is not found.
*/
@org.apache.thrift.annotation.Nullable
- public static FateOperation findByValue(int value) {
+ public static TFateOperation findByValue(int value) {
switch (value) {
case 0:
return TABLE_CREATE;
@@ -83,8 +82,6 @@ public enum FateOperation implements org.apache.thrift.TEnum {
return TABLE_MERGE;
case 7:
return TABLE_DELETE_RANGE;
- case 8:
- return OBSOLETE_TABLE_BULK_IMPORT;
case 9:
return TABLE_COMPACT;
case 10:
diff --git a/core/src/main/thrift/manager.thrift
b/core/src/main/thrift/manager.thrift
index 11e2f78353..08d93dc185 100644
--- a/core/src/main/thrift/manager.thrift
+++ b/core/src/main/thrift/manager.thrift
@@ -49,26 +49,26 @@ enum TabletLoadState {
UNLOAD_ERROR
}
-enum FateOperation {
- TABLE_CREATE
- TABLE_CLONE
- TABLE_DELETE
- TABLE_RENAME
- TABLE_ONLINE
- TABLE_OFFLINE
- TABLE_MERGE
- TABLE_DELETE_RANGE
- OBSOLETE_TABLE_BULK_IMPORT
- TABLE_COMPACT
- TABLE_IMPORT
- TABLE_EXPORT
- TABLE_CANCEL_COMPACT
- NAMESPACE_CREATE
- NAMESPACE_DELETE
- NAMESPACE_RENAME
- TABLE_BULK_IMPORT2
- TABLE_TABLET_AVAILABILITY
- TABLE_SPLIT
+enum TFateOperation {
+ TABLE_CREATE = 0
+ TABLE_CLONE = 1
+ TABLE_DELETE = 2
+ TABLE_RENAME = 3
+ TABLE_ONLINE = 4
+ TABLE_OFFLINE = 5
+ TABLE_MERGE = 6
+ TABLE_DELETE_RANGE = 7
+ // 8 was bulk import v1 (OBSOLETE_TABLE_BULK_IMPORT), which was removed; left unassigned to keep the other values stable
+ TABLE_COMPACT = 9
+ TABLE_IMPORT = 10
+ TABLE_EXPORT = 11
+ TABLE_CANCEL_COMPACT = 12
+ NAMESPACE_CREATE = 13
+ NAMESPACE_DELETE = 14
+ NAMESPACE_RENAME = 15
+ TABLE_BULK_IMPORT2 = 16
+ TABLE_TABLET_AVAILABILITY = 17
+ TABLE_SPLIT = 18
}
enum ManagerState {
@@ -192,7 +192,7 @@ service FateService {
1:client.TInfo tinfo
2:security.TCredentials credentials
3:TFateId opid
- 4:FateOperation op
+ 4:TFateOperation op
5:list<binary> arguments
6:map<string, string> options
7:bool autoClean
diff --git
a/core/src/test/java/org/apache/accumulo/core/fate/FateOperationTest.java
b/core/src/test/java/org/apache/accumulo/core/fate/FateOperationTest.java
new file mode 100644
index 0000000000..fd34447703
--- /dev/null
+++ b/core/src/test/java/org/apache/accumulo/core/fate/FateOperationTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
+import org.junit.jupiter.api.Test;
+
+public class FateOperationTest {
+
+ @Test
+ public void testFateOperation() {
+ // ensures that all TFateOperation have an equivalent FateOperation
+ assertTrue(TFateOperation.values().length > 0);
+ for (var top : TFateOperation.values()) {
+ assertEquals(top, Fate.FateOperation.fromThrift(top).toThrift());
+ }
+ // ensures that every FateOperation is valid: it is either declared to have
no equivalent thrift
+ // form, or it converts to its thrift form and back to the same operation
+ assertTrue(Fate.FateOperation.values().length > 0);
+ for (var op : Fate.FateOperation.values()) {
+ if (Fate.FateOperation.getNonThriftOps().contains(op)) {
+ assertThrows(IllegalStateException.class, op::toThrift);
+ } else {
+ assertEquals(op, Fate.FateOperation.fromThrift(op.toThrift()));
+ }
+ }
+ }
+}
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 40d0d755b1..1d2389d6fb 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -53,13 +53,13 @@ public class TestStore implements FateStore<String> {
}
@Override
- public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<String> repo,
- boolean autoCleanUp) {
+ public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey
fateKey,
+ Repo<String> repo, boolean autoCleanUp) {
return Optional.empty();
}
@Override
- public boolean seedTransaction(String txName, FateId fateId, Repo<String>
repo,
+ public boolean seedTransaction(Fate.FateOperation txName, FateId fateId,
Repo<String> repo,
boolean autoCleanUp) {
return false;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
index a89fb39689..0d10b2025f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/security/AuditedSecurityOperation.java
@@ -38,7 +38,7 @@ import org.apache.accumulo.core.dataImpl.thrift.IterInfo;
import org.apache.accumulo.core.dataImpl.thrift.TColumn;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
-import org.apache.accumulo.core.manager.thrift.FateOperation;
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.SystemPermission;
@@ -685,14 +685,14 @@ public class AuditedSecurityOperation extends
SecurityOperation {
"action: %s; targetTable: %s:%s";
@Override
- public boolean canChangeTableState(TCredentials credentials, TableId
tableId, FateOperation op,
+ public boolean canChangeTableState(TCredentials credentials, TableId
tableId, TFateOperation op,
NamespaceId namespaceId) throws ThriftSecurityException {
String tableName = getTableName(tableId);
String operation = null;
- if (op == FateOperation.TABLE_ONLINE) {
+ if (op == TFateOperation.TABLE_ONLINE) {
operation = "onlineTable";
}
- if (op == FateOperation.TABLE_OFFLINE) {
+ if (op == TFateOperation.TABLE_OFFLINE) {
operation = "offlineTable";
}
try {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
index e1fd176bae..493d4570a1 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/security/SecurityOperation.java
@@ -42,7 +42,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn;
import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
import org.apache.accumulo.core.fate.zookeeper.ZooCache;
-import org.apache.accumulo.core.manager.thrift.FateOperation;
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.NamespacePermission;
@@ -524,7 +524,7 @@ public class SecurityOperation {
|| hasTablePermission(c, tableId, namespaceId,
TablePermission.DROP_TABLE, false);
}
- public boolean canChangeTableState(TCredentials c, TableId tableId,
FateOperation op,
+ public boolean canChangeTableState(TCredentials c, TableId tableId,
TFateOperation op,
NamespaceId namespaceId) throws ThriftSecurityException {
authenticate(c);
return hasSystemPermissionWithNamespaceId(c, SystemPermission.SYSTEM,
namespaceId, false)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
index f17695265a..13b252e693 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateSummaryReport.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.accumulo.core.fate.AdminUtil;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
@@ -87,8 +88,9 @@ public class FateSummaryReport {
}
String top = txnStatus.getTop();
stepCounts.merge(Objects.requireNonNullElse(top, "?"), 1, Integer::sum);
- String runningRepo = txnStatus.getTxName();
- cmdCounts.merge(Objects.requireNonNullElse(runningRepo, "?"), 1,
Integer::sum);
+ Fate.FateOperation runningRepo = txnStatus.getTxName();
+
+ cmdCounts.merge(runningRepo == null ? "?" : runningRepo.name(), 1,
Integer::sum);
// filter transactions if provided
if (!fateIdFilter.isEmpty() &&
!fateIdFilter.contains(txnStatus.getFateId().canonical())) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
index ddf5beac8f..8d1218e618 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/fateCommand/FateTxnDetails.java
@@ -74,7 +74,7 @@ public class FateTxnDetails implements
Comparable<FateTxnDetails> {
step = txnStatus.getTop();
}
if (txnStatus.getTxName() != null) {
- txName = txnStatus.getTxName();
+ txName = txnStatus.getTxName().name();
}
if (txnStatus.getFateId() != null) {
fateId = txnStatus.getFateId().canonical();
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
index fb3d77a706..c8f2573bab 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/util/fateCommand/TxnDetailsTest.java
@@ -34,6 +34,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.fate.AdminUtil;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.junit.jupiter.api.Test;
@@ -57,7 +58,7 @@ class TxnDetailsTest {
expect(status1.getTimeCreated()).andReturn(now -
TimeUnit.DAYS.toMillis(1)).anyTimes();
expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status1.getTop()).andReturn("step1").anyTimes();
- expect(status1.getTxName()).andReturn("runningTx1").anyTimes();
+
expect(status1.getTxName()).andReturn(Fate.FateOperation.TABLE_CREATE).anyTimes();
expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" +
uuid1)).anyTimes();
expect(status1.getHeldLocks()).andReturn(List.of()).anyTimes();
expect(status1.getWaitingLocks()).andReturn(List.of()).anyTimes();
@@ -66,7 +67,7 @@ class TxnDetailsTest {
expect(status2.getTimeCreated()).andReturn(now -
TimeUnit.DAYS.toMillis(7)).anyTimes();
expect(status2.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status2.getTop()).andReturn("step2").anyTimes();
- expect(status2.getTxName()).andReturn("runningTx2").anyTimes();
+
expect(status2.getTxName()).andReturn(Fate.FateOperation.TABLE_DELETE).anyTimes();
expect(status2.getFateId()).andReturn(FateId.from("FATE:USER:" +
uuid2)).anyTimes();
expect(status2.getHeldLocks()).andReturn(List.of()).anyTimes();
expect(status2.getWaitingLocks()).andReturn(List.of()).anyTimes();
@@ -100,7 +101,7 @@ class TxnDetailsTest {
expect(status1.getTimeCreated()).andReturn(now -
TimeUnit.DAYS.toMillis(1)).anyTimes();
expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status1.getTop()).andReturn("step1").anyTimes();
- expect(status1.getTxName()).andReturn("runningTx").anyTimes();
+
expect(status1.getTxName()).andReturn(Fate.FateOperation.TABLE_COMPACT).anyTimes();
expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" +
UUID.randomUUID())).anyTimes();
// incomplete lock info (W unknown ns id, no table))
expect(status1.getHeldLocks()).andReturn(List.of("R:1", "R:2",
"W:a")).anyTimes();
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
index 32303ade9d..de1bae81a8 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/FateServiceHandler.java
@@ -69,15 +69,16 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.TRange;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.manager.thrift.BulkImportState;
-import org.apache.accumulo.core.manager.thrift.FateOperation;
import org.apache.accumulo.core.manager.thrift.FateService;
import org.apache.accumulo.core.manager.thrift.TFateId;
import org.apache.accumulo.core.manager.thrift.TFateInstanceType;
+import org.apache.accumulo.core.manager.thrift.TFateOperation;
import org.apache.accumulo.core.manager.thrift.ThriftPropertyException;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.util.ByteBufferUtil;
@@ -131,10 +132,11 @@ class FateServiceHandler implements FateService.Iface {
}
@Override
- public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid,
FateOperation op,
+ public void executeFateOperation(TInfo tinfo, TCredentials c, TFateId opid,
TFateOperation top,
List<ByteBuffer> arguments, Map<String,String> options, boolean
autoCleanup)
throws ThriftSecurityException, ThriftTableOperationException,
ThriftPropertyException {
authenticate(c);
+ Fate.FateOperation op = Fate.FateOperation.fromThrift(top);
String goalMessage = op.toString() + " ";
String txUUIDStr = opid.getTxUUIDStr();
FateInstanceType type = FateInstanceType.fromThrift(opid.getType());
@@ -151,7 +153,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Create " + namespace + " namespace.";
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new CreateNamespace(c.getPrincipal(), namespace,
options)), autoCleanup,
goalMessage);
break;
@@ -170,7 +172,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Rename " + oldName + " namespace to " + newName;
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new RenameNamespace(namespaceId, oldName,
newName)), autoCleanup,
goalMessage);
break;
@@ -188,7 +190,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Delete namespace Id: " + namespaceId;
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new DeleteNamespace(namespaceId)), autoCleanup,
goalMessage);
break;
}
@@ -251,7 +253,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Create table " + tableName + " " + initialTableState +
" with " + splitCount
+ " splits and initial tabletAvailability of " +
initialTabletAvailability;
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new CreateTable(c.getPrincipal(), tableName,
timeType, options,
splitsPath, splitCount, splitsDirsPath, initialTableState,
initialTabletAvailability, namespaceId)),
@@ -287,7 +289,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Rename table " + oldTableName + "(" + tableId + ") to
" + oldTableName;
try {
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new RenameTable(namespaceId, tableId,
oldTableName, newTableName)),
autoCleanup, goalMessage);
} catch (NamespaceNotFoundException e) {
@@ -359,8 +361,8 @@ class FateServiceHandler implements FateService.Iface {
}
manager.fate(type).seedTransaction(
- op.toString(), fateId, new TraceRepo<>(new
CloneTable(c.getPrincipal(), namespaceId,
- srcTableId, tableName, propertiesToSet, propertiesToExclude,
keepOffline)),
+ op, fateId, new TraceRepo<>(new CloneTable(c.getPrincipal(),
namespaceId, srcTableId,
+ tableName, propertiesToSet, propertiesToExclude, keepOffline)),
autoCleanup, goalMessage);
break;
@@ -388,7 +390,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Delete table " + tableName + "(" + tableId + ")";
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new PreDeleteTable(namespaceId, tableId)),
autoCleanup, goalMessage);
break;
}
@@ -400,7 +402,8 @@ class FateServiceHandler implements FateService.Iface {
final boolean canOnlineOfflineTable;
try {
- canOnlineOfflineTable = manager.security.canChangeTableState(c,
tableId, op, namespaceId);
+ canOnlineOfflineTable =
+ manager.security.canChangeTableState(c, tableId, top,
namespaceId);
} catch (ThriftSecurityException e) {
throwIfTableMissingSecurityException(e, tableId, null,
TableOperation.ONLINE);
throw e;
@@ -413,7 +416,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Online table " + tableId;
final EnumSet<TableState> expectedCurrStates =
EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(
new ChangeTableState(namespaceId, tableId, tableOp,
expectedCurrStates)),
autoCleanup, goalMessage);
@@ -428,7 +431,8 @@ class FateServiceHandler implements FateService.Iface {
final boolean canOnlineOfflineTable;
try {
- canOnlineOfflineTable = manager.security.canChangeTableState(c,
tableId, op, namespaceId);
+ canOnlineOfflineTable =
+ manager.security.canChangeTableState(c, tableId, top,
namespaceId);
} catch (ThriftSecurityException e) {
throwIfTableMissingSecurityException(e, tableId, null,
TableOperation.OFFLINE);
throw e;
@@ -441,7 +445,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Offline table " + tableId;
final EnumSet<TableState> expectedCurrStates =
EnumSet.of(TableState.ONLINE, TableState.OFFLINE);
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(
new ChangeTableState(namespaceId, tableId, tableOp,
expectedCurrStates)),
autoCleanup, goalMessage);
@@ -477,7 +481,7 @@ class FateServiceHandler implements FateService.Iface {
startRowStr, endRowStr);
goalMessage += "Merge table " + tableName + "(" + tableId + ") splits
from " + startRowStr
+ " to " + endRowStr;
- manager.fate(type).seedTransaction(op.toString(), fateId, new
TraceRepo<>(
+ manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(
new TableRangeOp(MergeInfo.Operation.MERGE, namespaceId, tableId,
startRow, endRow)),
autoCleanup, goalMessage);
break;
@@ -509,7 +513,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage +=
"Delete table " + tableName + "(" + tableId + ") range " +
startRow + " to " + endRow;
- manager.fate(type).seedTransaction(op.toString(), fateId, new
TraceRepo<>(
+ manager.fate(type).seedTransaction(op, fateId, new TraceRepo<>(
new TableRangeOp(MergeInfo.Operation.DELETE, namespaceId, tableId,
startRow, endRow)),
autoCleanup, goalMessage);
break;
@@ -535,7 +539,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Compact table (" + tableId + ") with config " +
compactionConfig;
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new CompactRange(namespaceId, tableId,
compactionConfig)), autoCleanup,
goalMessage);
break;
@@ -559,7 +563,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Cancel compaction of table (" + tableId + ")";
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new CancelCompactions(namespaceId, tableId)),
autoCleanup, goalMessage);
break;
}
@@ -600,10 +604,10 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Import table with new name: " + tableName + " from " +
exportDirs;
- manager.fate(type).seedTransaction(op.toString(), fateId,
- new TraceRepo<>(new ImportTable(c.getPrincipal(), tableName,
exportDirs, namespaceId,
- keepMappings, keepOffline)),
- autoCleanup, goalMessage);
+ manager.fate(type)
+ .seedTransaction(op, fateId, new TraceRepo<>(new
ImportTable(c.getPrincipal(),
+ tableName, exportDirs, namespaceId, keepMappings,
keepOffline)), autoCleanup,
+ goalMessage);
break;
}
case TABLE_EXPORT: {
@@ -630,7 +634,7 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage += "Export table " + tableName + "(" + tableId + ") to " +
exportDir;
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new ExportTable(namespaceId, tableName, tableId,
exportDir)),
autoCleanup, goalMessage);
break;
@@ -667,7 +671,7 @@ class FateServiceHandler implements FateService.Iface {
manager.updateBulkImportStatus(dir, BulkImportState.INITIAL);
goalMessage += "Bulk import (v2) " + dir + " to " + tableName + "(" +
tableId + ")";
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new PrepBulkImport(tableId, dir, setTime)),
autoCleanup, goalMessage);
break;
}
@@ -710,7 +714,7 @@ class FateServiceHandler implements FateService.Iface {
goalMessage += "Set availability for table: " + tableName + "(" +
tableId + ") range: "
+ tRange + " to: " + tabletAvailability.name();
- manager.fate(type).seedTransaction(op.toString(), fateId,
+ manager.fate(type).seedTransaction(op, fateId,
new TraceRepo<>(new LockTable(tableId, namespaceId, tRange,
tabletAvailability)),
autoCleanup, goalMessage);
break;
@@ -781,8 +785,8 @@ class FateServiceHandler implements FateService.Iface {
}
goalMessage = "Splitting " + extent + " for user into " +
(splits.size() + 1) + " tablets";
- manager.fate(type).seedTransaction(op.toString(), fateId, new
PreSplit(extent, splits),
- autoCleanup, goalMessage);
+ manager.fate(type).seedTransaction(op, fateId, new PreSplit(extent,
splits), autoCleanup,
+ goalMessage);
break;
}
default:
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
index a669798e8b..63137097a2 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/ManagerClientServiceHandler.java
@@ -321,7 +321,7 @@ public class ManagerClientServiceHandler implements
ManagerClientService.Iface {
String msg = "Shutdown tserver " + tabletServer;
- fate.seedTransaction("ShutdownTServer", fateId,
+ fate.seedTransaction(Fate.FateOperation.SHUTDOWN_TSERVER, fateId,
new TraceRepo<>(
new ShutdownTServer(doomed,
manager.tserverSet.getResourceGroup(doomed), force)),
false, msg);
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index fce920e4cb..e3bec68c82 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -839,8 +839,8 @@ public class CompactionCoordinator
// Start a fate transaction to commit the compaction.
CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid,
extent, ecm, stats));
- localFate.seedTransaction("COMMIT_COMPACTION",
FateKey.forCompactionCommit(ecid), renameOp,
- true);
+ localFate.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION,
+ FateKey.forCompactionCommit(ecid), renameOp, true);
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
index 19ba624306..473f1284a5 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/metrics/fate/FateMetricValues.java
@@ -25,6 +25,7 @@ import java.util.Map;
import java.util.TreeMap;
import org.apache.accumulo.core.fate.AdminUtil;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus;
@@ -101,11 +102,9 @@ public abstract class FateMetricValues {
// incr count for op type for for in_progress transactions.
if (ReadOnlyFateStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) {
- String opType = tx.getTxName();
- if (opType == null || opType.isEmpty()) {
- opType = "UNKNOWN";
- }
- opTypeCounters.merge(opType, 1L, Long::sum);
+ Fate.FateOperation opType = tx.getTxName();
+ String opTypeStr = opType == null ? "UNKNOWN" : opType.name();
+ opTypeCounters.merge(opTypeStr, 1L, Long::sum);
}
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
index 7b56ea4388..8270bc423f 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.manager.split;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.manager.Manager;
@@ -41,8 +42,8 @@ public class SeedSplitTask implements Runnable {
public void run() {
try {
var fateInstanceType = FateInstanceType.fromTableId((extent.tableId()));
- manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT",
FateKey.forSplit(extent),
- new FindSplits(extent), true);
+
manager.fate(fateInstanceType).seedTransaction(Fate.FateOperation.SYSTEM_SPLIT,
+ FateKey.forSplit(extent), new FindSplits(extent), true);
} catch (Exception e) {
log.error("Failed to split {}", extent, e);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index c8905cd850..3d6a0e3dba 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -74,6 +74,7 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
@@ -363,7 +364,7 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
// should never run. Its purpose is to prevent the dead compaction detector
// from deleting the id.
Repo<Manager> repo = new FakeRepo();
- var fateId = fateStore.seedTransaction("COMPACTION_COMMIT",
+ var fateId =
fateStore.seedTransaction(Fate.FateOperation.COMMIT_COMPACTION,
FateKey.forCompactionCommit(allCids.get(tableId).get(0)), repo,
true).orElseThrow();
// Read the tablet metadata
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index f493292368..e7b3e073c9 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -25,6 +25,7 @@ import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.IN_PROGRES
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.UNKNOWN;
+import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -244,7 +245,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
FateId fateId = fate.startTransaction();
assertEquals(TStatus.NEW, getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperation", fateId, new
TestRepo("testTransactionStatus"), true,
+ fate.seedTransaction(TEST_FATE_OP, fateId, new
TestRepo("testTransactionStatus"), true,
"Test Op");
assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId));
// wait for call() to be called
@@ -306,7 +307,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
assertTrue(fate.cancel(fateId));
assertTrue(
FAILED_IN_PROGRESS == getTxStatus(sctx, fateId) || FAILED ==
getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperation", fateId, new
TestRepo("testCancelWhileNew"), true,
+ fate.seedTransaction(TEST_FATE_OP, fateId, new
TestRepo("testCancelWhileNew"), true,
"Test Op");
Wait.waitFor(() -> FAILED == getTxStatus(sctx, fateId));
// nothing should have run
@@ -337,8 +338,8 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
FateId fateId = fate.startTransaction();
LOG.debug("Starting test testCancelWhileSubmitted with {}", fateId);
assertEquals(NEW, getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperation", fateId,
- new TestRepo("testCancelWhileSubmittedAndRunning"), false, "Test
Op");
+ fate.seedTransaction(TEST_FATE_OP, fateId, new
TestRepo("testCancelWhileSubmittedAndRunning"),
+ false, "Test Op");
Wait.waitFor(() -> IN_PROGRESS == getTxStatus(sctx, fateId));
// This is false because the transaction runner has reserved the FaTe
// transaction.
@@ -372,7 +373,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
FateId fateId = fate.startTransaction();
LOG.debug("Starting test testCancelWhileInCall with {}", fateId);
assertEquals(NEW, getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperation", fateId, new
TestRepo("testCancelWhileInCall"), true,
+ fate.seedTransaction(TEST_FATE_OP, fateId, new
TestRepo("testCancelWhileInCall"), true,
"Test Op");
assertEquals(SUBMITTED, getTxStatus(sctx, fateId));
// wait for call() to be called
@@ -488,8 +489,8 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
FateId fateId = fate.startTransaction();
assertEquals(NEW, getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperationFails", fateId,
- new TestOperationFails(1, ExceptionLocation.CALL), false, "Test Op
Fails");
+ fate.seedTransaction(TEST_FATE_OP, fateId, new TestOperationFails(1,
ExceptionLocation.CALL),
+ false, "Test Op Fails");
// Wait for all the undo() calls to complete
undoLatch.await();
assertEquals(expectedUndoOrder, TestOperationFails.undoOrder);
@@ -502,7 +503,7 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
undoLatch = new CountDownLatch(TestOperationFails.TOTAL_NUM_OPS);
fateId = fate.startTransaction();
assertEquals(NEW, getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperationFails", fateId,
+ fate.seedTransaction(TEST_FATE_OP, fateId,
new TestOperationFails(1, ExceptionLocation.IS_READY), false, "Test
Op Fails");
// Wait for all the undo() calls to complete
undoLatch.await();
@@ -546,8 +547,8 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
FateId fateId = fate.startTransaction();
transactions.add(fateId);
assertEquals(TStatus.NEW, getTxStatus(sctx, fateId));
- fate.seedTransaction("TestOperation", fateId, new
DeferredTestRepo("testDeferredOverflow"),
- true, "Test Op");
+ fate.seedTransaction(TEST_FATE_OP, fateId, new
DeferredTestRepo("testDeferredOverflow"), true,
+ "Test Op");
assertEquals(TStatus.SUBMITTED, getTxStatus(sctx, fateId));
}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
index 1980dcf4ee..d029ebb489 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
@@ -19,6 +19,7 @@
package org.apache.accumulo.test.fate;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -298,8 +299,8 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
FateKey fateKey2 =
FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID()));
- var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(),
true).orElseThrow();
- var fateId2 = store.seedTransaction("TEST", fateKey2, new TestRepo(),
true).orElseThrow();
+ var fateId1 = store.seedTransaction(TEST_FATE_OP, fateKey1, new
TestRepo(), true).orElseThrow();
+ var fateId2 = store.seedTransaction(TEST_FATE_OP, fateKey2, new
TestRepo(), true).orElseThrow();
assertNotEquals(fateId1, fateId2);
@@ -333,11 +334,11 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
FateKey fateKey = FateKey.forSplit(ke);
- var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).orElseThrow();
// second call is empty
- assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(),
true).isEmpty());
- assertFalse(store.seedTransaction("TEST", fateId, new TestRepo(), true));
+ assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).isEmpty());
+ assertFalse(store.seedTransaction(TEST_FATE_OP, fateId, new TestRepo(),
true));
var txStore = store.reserve(fateId);
try {
@@ -362,7 +363,7 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
FateKey fateKey = FateKey.forSplit(ke);
- var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).orElseThrow();
var txStore = store.reserve(fateId);
try {
@@ -371,7 +372,7 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
// We have an existing transaction with the same key in progress
// so should return an empty Optional
- assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(),
true).isEmpty());
+ assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).isEmpty());
assertEquals(TStatus.IN_PROGRESS, txStore.getStatus());
} finally {
txStore.setStatus(TStatus.SUCCESSFUL);
@@ -383,7 +384,8 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
try {
// After deletion, make sure we can create again with the same key
- var fateId2 = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ var fateId2 =
+ store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).orElseThrow();
txStore = store.reserve(fateId);
assertEquals(fateId, fateId2);
assertTrue(txStore.timeCreated() > 0);
@@ -424,10 +426,10 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
FateKey fateKey1 = FateKey.forSplit(ke1);
FateKey fateKey2 = FateKey.forSplit(ke2);
- var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(),
true).orElseThrow();
+ var fateId1 = store.seedTransaction(TEST_FATE_OP, fateKey1, new
TestRepo(), true).orElseThrow();
var txStore = store.reserve(fateId1);
try {
- assertTrue(store.seedTransaction("TEST", fateKey2, new TestRepo(),
true).isEmpty());
+ assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey2, new TestRepo(),
true).isEmpty());
assertEquals(fateKey1, txStore.getKey().orElseThrow());
} finally {
txStore.delete();
@@ -447,14 +449,14 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
FateKey fateKey = FateKey.forSplit(ke);
- var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).orElseThrow();
// After seeding a fate transaction using a key we can simulate a
collision with
// a random FateId by deleting the key out of Fate and calling seed again
to
// verify it detects the key is missing. Then we can continue and see if
we can still use
// the existing transaction.
deleteKey(fateId, sctx);
- assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(),
true).isEmpty());
+ assertTrue(store.seedTransaction(TEST_FATE_OP, fateKey, new TestRepo(),
true).isEmpty());
var txStore = store.reserve(fateId);
// We should still be able to use the existing transaction
@@ -600,8 +602,8 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
// have 10 threads all try to seed the same fate key, only one should
succeed.
List<Future<Optional<FateId>>> futures = new ArrayList<>(10);
for (int i = 0; i < 10; i++) {
- futures.add(
- executor.submit(() -> store.seedTransaction("TEST", fateKey, new
TestRepo(), true)));
+ futures.add(executor
+ .submit(() -> store.seedTransaction(TEST_FATE_OP, fateKey, new
TestRepo(), true)));
}
int idsSeen = 0;
@@ -683,7 +685,7 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
Map<FateKey,FateId> fateKeyIds = new HashMap<>();
for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) {
- var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ var fateId = store.seedTransaction(TEST_FATE_OP, fateKey, new
TestRepo(), true).orElseThrow();
fateKeyIds.put(fateKey, fateId);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
index 1f96bd9b94..83bb8a3e5b 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreUtil.java
@@ -30,6 +30,7 @@ import
org.apache.accumulo.core.client.admin.TabletAvailability;
import org.apache.accumulo.core.client.admin.TabletInformation;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.test.zookeeper.ZooKeeperTestingServer;
@@ -39,9 +40,14 @@ import org.junit.jupiter.api.io.TempDir;
import com.google.common.collect.MoreCollectors;
/**
- * A class with utility methods for testing UserFateStore and MetaFateStore
+ * A class with utilities for testing {@link
org.apache.accumulo.core.fate.user.UserFateStore} and
+ * {@link org.apache.accumulo.core.fate.zookeeper.MetaFateStore}
*/
public class FateStoreUtil {
+ // A FateOperation for testing purposes when a FateOperation is needed but
whose value doesn't
+ // matter
+ public static final Fate.FateOperation TEST_FATE_OP =
Fate.FateOperation.TABLE_CREATE;
+
/**
* Create the fate table with the exact configuration as the real Fate user
instance table
* including table properties and TabletAvailability. For use in testing
UserFateStore
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index 292f1fdfb4..9dd6577018 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.fate;
+import static org.apache.accumulo.test.fate.FateStoreUtil.TEST_FATE_OP;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -253,10 +254,10 @@ public abstract class MultipleStoresIT extends
SharedMiniClusterBase {
// Start half the txns using fate1, and the other half using fate2
if (i % 2 == 0) {
fateId = fate1.startTransaction();
- fate1.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true,
"test");
+ fate1.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(),
true, "test");
} else {
fateId = fate2.startTransaction();
- fate2.seedTransaction("op" + i, fateId, new SleepingTestRepo(), true,
"test");
+ fate2.seedTransaction(TEST_FATE_OP, fateId, new SleepingTestRepo(),
true, "test");
}
allIds.add(fateId);
}
@@ -311,7 +312,7 @@ public abstract class MultipleStoresIT extends
SharedMiniClusterBase {
for (int i = 0; i < numFateIds; i++) {
FateId fateId;
fateId = fate1.startTransaction();
- fate1.seedTransaction("op" + i, fateId, new LatchTestRepo(), true,
"test");
+ fate1.seedTransaction(TEST_FATE_OP, fateId, new LatchTestRepo(), true,
"test");
allIds.add(fateId);
}
assertEquals(numFateIds, allIds.size());
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 5e5775110f..f74944692e 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -50,6 +50,7 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.AdminUtil;
+import org.apache.accumulo.core.fate.Fate;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
import org.apache.accumulo.core.fate.user.UserFateStore;
@@ -421,10 +422,10 @@ public class FateConcurrencyIT extends
AccumuloClusterHarness {
log.trace("Fate id: {}, status: {}", tx.getFateId(), tx.getStatus());
String top = tx.getTop();
- String txName = tx.getTxName();
+ Fate.FateOperation txName = tx.getTxName();
return top != null && txName != null && top.contains("CompactionDriver")
- && tx.getTxName().equals("TABLE_COMPACT");
+ && txName == Fate.FateOperation.TABLE_COMPACT;
}
/**