This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 7da7ac89cc Fixes discrepancy between `admin fate delete` and
`UserFateStore.delete` (#5016)
7da7ac89cc is described below
commit 7da7ac89cc54a4c8e660eb713b0afd5c525c0ec9
Author: Kevin Rathbun <[email protected]>
AuthorDate: Tue Nov 5 08:37:31 2024 -0500
Fixes discrepancy between `admin fate delete` and `UserFateStore.delete`
(#5016)
* Fixes discrepancy between `admin fate delete` and `UserFateStore.delete`
`admin fate delete` allows users to delete a transaction. If this
transaction is a USER transaction, `UserFateStore.delete()` is how this is
achieved. The admin code allows transactions with a status of SUBMITTED,
IN_PROGRESS, NEW, FAILED, FAILED_IN_PROGRESS, SUCCESSFUL to call `delete()`,
but the `delete()` code only allows NEW, SUBMITTED, SUCCESSFUL, FAILED. Admin
now calls a force delete to delete the transaction regardless of state.
forceDelete() is a new method only to be used by Admin.
---
.../main/java/org/apache/accumulo/core/fate/AdminUtil.java | 2 +-
core/src/main/java/org/apache/accumulo/core/fate/Fate.java | 2 +-
.../main/java/org/apache/accumulo/core/fate/FateStore.java | 6 ++++++
.../org/apache/accumulo/core/fate/WrappedFateTxStore.java | 12 +++++++++++-
.../org/apache/accumulo/core/fate/user/UserFateStore.java | 12 ++++++++++++
.../apache/accumulo/core/fate/zookeeper/MetaFateStore.java | 5 +++++
.../java/org/apache/accumulo/core/logging/FateLogger.java | 13 ++++++++-----
.../test/java/org/apache/accumulo/core/fate/TestStore.java | 6 ++++++
8 files changed, 50 insertions(+), 8 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
index 97c47b39ca..059044b754 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
@@ -476,7 +476,7 @@ public class AdminUtil<T> {
case FAILED_IN_PROGRESS:
case SUCCESSFUL:
System.out.printf("Deleting transaction: %s (%s)%n", fateIdStr, ts);
- txStore.delete();
+ txStore.forceDelete();
state = true;
break;
}
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index 1df4c36eec..1350cce652 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -344,7 +344,7 @@ public class Fate<T> {
*/
public Fate(T environment, FateStore<T> store, boolean runDeadResCleaner,
Function<Repo<T>,String> toLogStrFunc, AccumuloConfiguration conf) {
- this.store = FateLogger.wrap(store, toLogStrFunc);
+ this.store = FateLogger.wrap(store, toLogStrFunc, false);
this.environment = environment;
final ThreadPoolExecutor pool =
ThreadPools.getServerThreadPools().createExecutorService(conf,
Property.MANAGER_FATE_THREADPOOL_SIZE, true);
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index ae193d8df8..09ee12dd94 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -104,6 +104,12 @@ public interface FateStore<T> extends ReadOnlyFateStore<T>
{
*/
void delete();
+ /**
+ * Force remove the transaction from the store regardless of the status.
Only to be used by
+ * {@link AdminUtil}
+ */
+ void forceDelete();
+
/**
* Return the given transaction to the store.
*
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
index f121a902e0..d282e304f1 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/WrappedFateTxStore.java
@@ -24,11 +24,15 @@ import java.util.EnumSet;
import java.util.List;
import java.util.Optional;
+import com.google.common.base.Preconditions;
+
public class WrappedFateTxStore<T> implements FateStore.FateTxStore<T> {
protected final FateStore.FateTxStore<T> wrapped;
+ private final boolean allowForceDel;
- public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped) {
+ public WrappedFateTxStore(FateStore.FateTxStore<T> wrapped, boolean
allowForceDel) {
this.wrapped = wrapped;
+ this.allowForceDel = allowForceDel;
}
@Override
@@ -86,6 +90,12 @@ public class WrappedFateTxStore<T> implements
FateStore.FateTxStore<T> {
wrapped.delete();
}
+ @Override
+ public void forceDelete() {
+ Preconditions.checkState(allowForceDel, "Force delete is not allowed");
+ wrapped.forceDelete();
+ }
+
@Override
public long timeCreated() {
return wrapped.timeCreated();
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index 7446d1fafe..efd0cbc62f 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -544,6 +544,18 @@ public class UserFateStore<T> extends AbstractFateStore<T>
{
this.deleted = true;
}
+ @Override
+ public void forceDelete() {
+ verifyReservedAndNotDeleted(true);
+
+ var mutator = newMutator(fateId);
+ // allow deletion of all txns other than UNKNOWN
+ mutator.requireStatus(TStatus.NEW, TStatus.SUBMITTED,
TStatus.SUCCESSFUL, TStatus.FAILED,
+ TStatus.FAILED_IN_PROGRESS, TStatus.IN_PROGRESS);
+ mutator.delete().mutate();
+ this.deleted = true;
+ }
+
private Optional<Integer> findTop() {
return scanTx(scanner -> {
scanner.setRange(getRow(fateId));
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index d19db17004..d6da05e844 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -369,6 +369,11 @@ public class MetaFateStore<T> extends AbstractFateStore<T>
{
}
}
+ @Override
+ public void forceDelete() {
+ delete();
+ }
+
@Override
public void setTransactionInfo(Fate.TxInfo txInfo, Serializable so) {
verifyReservedAndNotDeleted(true);
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 17a64541c9..4a9f2517c0 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -51,8 +51,9 @@ public class FateLogger {
private final Function<Repo<T>,String> toLogString;
- private LoggingFateTxStore(FateTxStore<T> wrapped,
Function<Repo<T>,String> toLogString) {
- super(wrapped);
+ private LoggingFateTxStore(FateTxStore<T> wrapped,
Function<Repo<T>,String> toLogString,
+ boolean allowForceDel) {
+ super(wrapped, allowForceDel);
this.toLogString = toLogString;
}
@@ -97,19 +98,21 @@ public class FateLogger {
}
}
- public static <T> FateStore<T> wrap(FateStore<T> store,
Function<Repo<T>,String> toLogString) {
+ public static <T> FateStore<T> wrap(FateStore<T> store,
Function<Repo<T>,String> toLogString,
+ boolean allowForceDel) {
// only logging operations that change the persisted data, not operations
that only read data
return new FateStore<>() {
@Override
public FateTxStore<T> reserve(FateId fateId) {
- return new LoggingFateTxStore<>(store.reserve(fateId), toLogString);
+ return new LoggingFateTxStore<>(store.reserve(fateId), toLogString,
allowForceDel);
}
@Override
public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
- return store.tryReserve(fateId).map(ftxs -> new
LoggingFateTxStore<>(ftxs, toLogString));
+ return store.tryReserve(fateId)
+ .map(ftxs -> new LoggingFateTxStore<>(ftxs, toLogString,
allowForceDel));
}
@Override
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 859fe5040a..2c54464663 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -199,6 +199,12 @@ public class TestStore implements FateStore<String> {
statuses.remove(fateId);
}
+ @Override
+ public void forceDelete() {
+ throw new UnsupportedOperationException(
+ this.getClass().getSimpleName() + " should not be calling
forceDelete()");
+ }
+
@Override
public void unreserve(Duration deferTime) {
if (!reserved.remove(fateId)) {