This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 6e73d0dc3e Update places where one fate store is used when both should
be (#4696)
6e73d0dc3e is described below
commit 6e73d0dc3e1fb7d80492d219546490c5afe08e29
Author: Kevin Rathbun <[email protected]>
AuthorDate: Tue Jun 25 18:27:02 2024 -0400
Update places where one fate store is used when both should be (#4696)
- UpgradeCoordinator.abortIfFateTransactions now uses both stores
- FunctionalTestUtils.getFateStatus() now uses both stores. This method is
currently only used in assertNoDanglingFateLocks, so technically only the
MetaFateStore is needed. However, given the method name, and in case it is
used for other purposes in the future, both stores should be used.
---
.../apache/accumulo/manager/upgrade/UpgradeCoordinator.java | 8 ++++++--
.../org/apache/accumulo/test/functional/FateConcurrencyIT.java | 8 ++++----
.../apache/accumulo/test/functional/FunctionalTestUtils.java | 10 ++++++++--
3 files changed, 18 insertions(+), 8 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index 22fe50c0c9..46d29516c2 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -31,6 +31,7 @@ import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
+import java.util.stream.Stream;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
@@ -40,6 +41,7 @@ import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.ConfigCheckUtil;
import org.apache.accumulo.core.fate.MetaFateStore;
import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.volume.Volume;
@@ -307,9 +309,11 @@ public class UpgradeCoordinator {
justification = "Want to immediately stop all manager threads on upgrade
error")
private void abortIfFateTransactions(ServerContext context) {
try {
- final ReadOnlyFateStore<UpgradeCoordinator> fate = new MetaFateStore<>(
+ final ReadOnlyFateStore<UpgradeCoordinator> mfs = new MetaFateStore<>(
context.getZooKeeperRoot() + Constants.ZFATE,
context.getZooReaderWriter());
- try (var idStream = fate.list()) {
+ final ReadOnlyFateStore<UpgradeCoordinator> ufs = new
UserFateStore<>(context);
+ try (var mfsList = mfs.list(); var ufsList = ufs.list();
+ var idStream = Stream.concat(mfsList, ufsList)) {
if (idStream.findFirst().isPresent()) {
throw new AccumuloException("Aborting upgrade because there are"
+ " outstanding FATE transactions from a previous Accumulo
version."
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
index 5f05b0b64e..f0c38fe374 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FateConcurrencyIT.java
@@ -262,13 +262,13 @@ public class FateConcurrencyIT extends
AccumuloClusterHarness {
InstanceId instanceId = context.getInstanceID();
ZooReaderWriter zk = context.getZooReader().asWriter(secret);
- MetaFateStore<String> zs =
+ MetaFateStore<String> mfs =
new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE,
zk);
var lockPath =
ServiceLock.path(ZooUtil.getRoot(instanceId) +
Constants.ZTABLE_LOCKS + "/" + tableId);
UserFateStore<String> ufs = new UserFateStore<>(context);
Map<FateInstanceType,ReadOnlyFateStore<String>> fateStores =
- Map.of(FateInstanceType.META, zs, FateInstanceType.USER, ufs);
+ Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
withLocks = admin.getStatus(fateStores, zk, lockPath, null, null,
null);
@@ -356,11 +356,11 @@ public class FateConcurrencyIT extends
AccumuloClusterHarness {
InstanceId instanceId = context.getInstanceID();
ZooReaderWriter zk = context.getZooReader().asWriter(secret);
- MetaFateStore<String> zs =
+ MetaFateStore<String> mfs =
new MetaFateStore<>(ZooUtil.getRoot(instanceId) + Constants.ZFATE,
zk);
var lockPath =
ServiceLock.path(ZooUtil.getRoot(instanceId) +
Constants.ZTABLE_LOCKS + "/" + tableId);
- AdminUtil.FateStatus fateStatus = admin.getStatus(zs, zk, lockPath,
null, null, null);
+ AdminUtil.FateStatus fateStatus = admin.getStatus(mfs, zk, lockPath,
null, null, null);
log.trace("current fates: {}", fateStatus.getTransactions().size());
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
index 6d80cf5be0..b81a23c808 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/FunctionalTestUtils.java
@@ -60,7 +60,10 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.AdminUtil;
import org.apache.accumulo.core.fate.AdminUtil.FateStatus;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.MetaFateStore;
+import org.apache.accumulo.core.fate.ReadOnlyFateStore;
+import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.AccumuloTable;
@@ -231,10 +234,13 @@ public class FunctionalTestUtils {
AdminUtil<String> admin = new AdminUtil<>(false);
ServerContext context = cluster.getServerContext();
ZooReaderWriter zk = context.getZooReaderWriter();
- MetaFateStore<String> zs =
+ MetaFateStore<String> mfs =
new MetaFateStore<>(context.getZooKeeperRoot() + Constants.ZFATE,
zk);
+ UserFateStore<String> ufs = new UserFateStore<>(context);
+ Map<FateInstanceType,ReadOnlyFateStore<String>> fateStores =
+ Map.of(FateInstanceType.META, mfs, FateInstanceType.USER, ufs);
var lockPath = ServiceLock.path(context.getZooKeeperRoot() +
Constants.ZTABLE_LOCKS);
- return admin.getStatus(zs, zk, lockPath, null, null, null);
+ return admin.getStatus(fateStores, zk, lockPath, null, null, null);
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException(e);
}