This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 76fffdedf3 Modified Tablet Mutator implementations to always update
srv:lock (#4421)
76fffdedf3 is described below
commit 76fffdedf32d4df03412ade09ec367d909fe79f3
Author: Dave Marion <[email protected]>
AuthorDate: Mon Apr 29 15:29:28 2024 -0400
Modified Tablet Mutator implementations to always update srv:lock (#4421)
Modified the tablet mutating implementations (TabletMutator,
ConditionalTabletMutator, etc.) to always update the srv:lock
column so that the MetadataConstraints filter on the server side
validates that the update comes from a "valid" server. I had to
modify MiniAccumuloClusterImpl to acquire a ServiceLock for the ITs to
use where the test code is updating the tablet metadata. The changes
in the other classes are mainly just making sure that the tablet
mutator classes are getting the ServiceLock.
Closes #4420
---
.../org/apache/accumulo/core/lock/ServiceLock.java | 12 +
.../accumulo/core/metadata/schema/Ample.java | 3 -
.../metadata/schema/TabletMetadataBuilder.java | 6 -
.../core/metadata/schema/TabletMutatorBase.java | 3 +-
.../miniclusterImpl/MiniAccumuloClusterImpl.java | 279 ++++++++++++---------
.../org/apache/accumulo/server/ServerContext.java | 18 ++
.../manager/state/AbstractTabletStateStore.java | 4 +-
.../server/manager/state/MetaDataStateStore.java | 5 +-
.../server/manager/state/RootTabletStateStore.java | 4 +-
.../metadata/ConditionalTabletMutatorImpl.java | 7 +
.../server/metadata/RootTabletMutatorImpl.java | 7 +
.../server/metadata/TabletMutatorImpl.java | 11 +-
.../accumulo/server/util/MetadataTableUtil.java | 1 -
.../manager/state/ZooTabletStateStoreTest.java | 9 +-
.../ConditionalTabletsMutatorImplTest.java | 24 +-
.../org/apache/accumulo/compactor/Compactor.java | 1 +
.../apache/accumulo/gc/SimpleGarbageCollector.java | 7 +-
.../java/org/apache/accumulo/manager/Manager.java | 1 +
.../accumulo/manager/TabletGroupWatcher.java | 2 -
.../manager/tableOps/create/PopulateMetadata.java | 1 -
.../manager/tableOps/merge/MergeTabletsTest.java | 8 +-
.../manager/tableOps/split/UpdateTabletsTest.java | 4 +
.../java/org/apache/accumulo/monitor/Monitor.java | 1 +
.../org/apache/accumulo/tserver/ScanServer.java | 1 +
.../org/apache/accumulo/tserver/TabletServer.java | 1 +
.../accumulo/tserver/tablet/ScanfileManager.java | 10 +-
.../org/apache/accumulo/tserver/tablet/Tablet.java | 8 +-
.../accumulo/test/functional/AccumuloClientIT.java | 10 +-
.../apache/accumulo/test/functional/SplitIT.java | 9 +-
.../accumulo/test/functional/SplitRecoveryIT.java | 57 +----
.../accumulo/test/performance/NullTserver.java | 101 ++++++--
31 files changed, 380 insertions(+), 235 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java
b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java
index 43388052a3..ef80d4e52d 100644
--- a/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java
+++ b/core/src/main/java/org/apache/accumulo/core/lock/ServiceLock.java
@@ -32,6 +32,7 @@ import
org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.core.util.time.NanoTime;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
@@ -558,6 +559,17 @@ public class ServiceLock implements Watcher {
LOG.debug("[{}] Deleting all at path {} due to unlock", vmLockPrefix,
pathToDelete);
ZooUtil.recursiveDelete(zooKeeper, pathToDelete, NodeMissingPolicy.SKIP);
+ // Wait for the delete to happen on the server before exiting method
+ NanoTime start = NanoTime.now();
+ while (zooKeeper.exists(pathToDelete, null) != null) {
+ Thread.onSpinWait();
+ if (NanoTime.now().subtract(start).toSeconds() > 10) {
+ start = NanoTime.now();
+ LOG.debug("[{}] Still waiting for zookeeper to delete all at {}",
vmLockPrefix,
+ pathToDelete);
+ }
+ }
+
localLw.lostLock(LockLossReason.LOCK_DELETED);
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index d85553f75e..27856c899b 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -36,7 +36,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.gc.GcCandidate;
import org.apache.accumulo.core.gc.ReferenceFile;
-import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
@@ -364,8 +363,6 @@ public interface Ample {
T deleteLocation(Location location);
- T putZooLock(String zookeeperRoot, ServiceLock zooLock);
-
T putDirName(String dirName);
T putWal(LogEntry logEntry);
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
index fb79fc8066..8b7e96e59e 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMetadataBuilder.java
@@ -51,7 +51,6 @@ import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
-import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -144,11 +143,6 @@ public class TabletMetadataBuilder implements
Ample.TabletUpdates<TabletMetadata
throw new UnsupportedOperationException();
}
- @Override
- public TabletMetadataBuilder putZooLock(String zookeeperRoot, ServiceLock
zooLock) {
- throw new UnsupportedOperationException();
- }
-
@Override
public TabletMetadataBuilder putDirName(String dirName) {
fetched.add(DIR);
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
index 9cc8143dc3..4fc978f150 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/TabletMutatorBase.java
@@ -177,8 +177,7 @@ public abstract class TabletMutatorBase<T extends
Ample.TabletUpdates<T>>
return getThis();
}
- @Override
- public T putZooLock(String zookeeperRoot, ServiceLock zooLock) {
+ protected T putZooLock(String zookeeperRoot, ServiceLock zooLock) {
Preconditions.checkState(updatesEnabled, "Cannot make updates after
calling mutate.");
ServerColumnFamily.LOCK_COLUMN.put(mutation,
new Value(zooLock.getLockID().serialize(zookeeperRoot + "/")));
diff --git
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
index 68c8cbd798..ba0cfe926f 100644
---
a/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
+++
b/minicluster/src/main/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImpl.java
@@ -46,6 +46,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -53,7 +54,7 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -79,7 +80,14 @@ import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.InstanceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.fate.zookeeper.ZooSession;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
+import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
+import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
+import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.manager.thrift.ManagerGoalState;
import org.apache.accumulo.core.manager.thrift.ManagerMonitorInfo;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
@@ -110,11 +118,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.Code;
-import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
-import org.apache.zookeeper.ZooKeeper.States;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -148,6 +154,9 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
private boolean initialized = false;
private ExecutorService executor;
+ private ServiceLock miniLock;
+ private ZooKeeper zk;
+ private AccumuloClient client;
/**
*
@@ -255,7 +264,17 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
File siteFile = new File(config.getConfDir(), "accumulo.properties");
writeConfigProperties(siteFile, config.getSiteConfig());
this.siteConfig = SiteConfiguration.fromFile(siteFile).build();
- this.context = Suppliers.memoize(() -> new ServerContext(siteConfig));
+ this.context = Suppliers.memoize(() -> new ServerContext(siteConfig) {
+
+ @Override
+ public ServiceLock getServiceLock() {
+ // Override getServiceLock because any call to setServiceLock
+ // will set the SingletonManager.MODE to SERVER and we may not
+ // want that side-effect.
+ return miniLock;
+ }
+
+ });
if (!config.useExistingInstance() && !config.useExistingZooKeepers()) {
zooCfgFile = new File(config.getConfDir(), "zoo.cfg");
@@ -646,7 +665,85 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
}
control.start(ServerType.COMPACTOR);
- verifyUp();
+ final AtomicBoolean lockAcquired = new AtomicBoolean(false);
+ final CountDownLatch lockWatcherInvoked = new CountDownLatch(1);
+ AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() {
+
+ @Override
+ public void lostLock(LockLossReason reason) {
+ log.warn("Lost lock: " + reason.toString());
+ miniLock = null;
+ }
+
+ @Override
+ public void unableToMonitorLockNode(Exception e) {
+ log.warn("Unable to monitor lock: " + e.getMessage());
+ miniLock = null;
+ }
+
+ @Override
+ public void acquiredLock() {
+ log.warn("Acquired ZK lock for MiniAccumuloClusterImpl");
+ lockAcquired.set(true);
+ lockWatcherInvoked.countDown();
+ }
+
+ @Override
+ public void failedToAcquireLock(Exception e) {
+ log.warn("Failed to acquire ZK lock for MiniAccumuloClusterImpl, msg:
" + e.getMessage());
+ lockWatcherInvoked.countDown();
+ miniLock = null;
+ }
+ };
+
+ InstanceId iid = null;
+ // It's possible start was called twice...
+ if (client == null) {
+ client = Accumulo.newClient().from(getClientProperties()).build();
+ }
+ iid = client.instanceOperations().getInstanceId();
+ // The code below does not use `getServerContext()` as that will
+ // set the SingletonManager.mode to SERVER which will cause some
+ // tests to fail
+ final Map<String,String> properties = config.getSiteConfig();
+ final int timeout = (int)
ConfigurationTypeHelper.getTimeInMillis(properties.getOrDefault(
+ Property.INSTANCE_ZK_TIMEOUT.getKey(),
Property.INSTANCE_ZK_TIMEOUT.getDefaultValue()));
+ final String secret = properties.get(Property.INSTANCE_SECRET.getKey());
+ final byte[] auth = ("accumulo:" + secret).getBytes(UTF_8);
+ zk = ZooSession.getAuthenticatedSession(config.getZooKeepers(), timeout,
"digest", auth);
+
+ // It's possible start was called twice...
+ if (miniLock == null) {
+ String zooKeeperRoot = ZooUtil.getRoot(iid);
+ UUID miniUUID = UUID.randomUUID();
+ String miniZDirPath = zooKeeperRoot + "/mini";
+ String miniZInstancePath = miniZDirPath + "/" + miniUUID.toString();
+ try {
+ if (zk.exists(miniZDirPath, null) == null) {
+ zk.create(miniZDirPath, new byte[0], ZooUtil.PUBLIC,
CreateMode.PERSISTENT);
+ log.info("Created: {}", miniZDirPath);
+ }
+ if (zk.exists(miniZInstancePath, null) == null) {
+ zk.create(miniZInstancePath, new byte[0], ZooUtil.PUBLIC,
CreateMode.PERSISTENT);
+ log.info("Created: {}", miniZInstancePath);
+ }
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Error creating path in ZooKeeper", e);
+ }
+ ServiceLockPath path = ServiceLock.path(miniZInstancePath);
+ ServiceLockData sld = new ServiceLockData(miniUUID, "localhost",
ThriftService.NONE,
+ Constants.DEFAULT_RESOURCE_GROUP_NAME);
+ miniLock = new ServiceLock(zk, path, miniUUID);
+ miniLock.lock(miniLockWatcher, sld);
+
+ lockWatcherInvoked.await();
+
+ if (!lockAcquired.get()) {
+ throw new IllegalStateException("Error creating MAC entry in
ZooKeeper");
+ }
+ }
+
+ verifyUp(iid);
printProcessSummary();
@@ -725,9 +822,7 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
}
}
- private void verifyUp() throws InterruptedException, IOException {
-
- int numTries = 10;
+ private void verifyUp(InstanceId instanceId) throws InterruptedException,
IOException {
requireNonNull(getClusterControl().managerProcess, "Error starting Manager
- no process");
waitForProcessStart(getClusterControl().managerProcess, "Manager");
@@ -762,123 +857,60 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
}
}
- try (ZooKeeper zk = new ZooKeeper(getZooKeepers(), 60000, event ->
log.warn("{}", event))) {
-
- String secret = getSiteConfiguration().get(Property.INSTANCE_SECRET);
-
- while (!(zk.getState() == States.CONNECTED)) {
- log.info("Waiting for ZK client to connect, state: {} - will retry",
zk.getState());
- Thread.sleep(1000);
- }
-
- String instanceId = null;
- for (int i = 0; i < numTries; i++) {
- if (zk.getState() == States.CONNECTED) {
- ZooUtil.digestAuth(zk, secret);
- try {
- final AtomicInteger rc = new AtomicInteger();
- final CountDownLatch waiter = new CountDownLatch(1);
- zk.sync("/", (code, arg1, arg2) -> {
- rc.set(code);
- waiter.countDown();
- }, null);
- waiter.await();
- Code code = Code.get(rc.get());
- if (code != Code.OK) {
- throw KeeperException.create(code);
- }
- String instanceNamePath =
- Constants.ZROOT + Constants.ZINSTANCES + "/" +
config.getInstanceName();
- byte[] bytes = zk.getData(instanceNamePath, null, null);
- instanceId = new String(bytes, UTF_8);
- break;
- } catch (KeeperException e) {
- log.warn("Error trying to read instance id from zookeeper: " +
e.getMessage());
- log.debug("Unable to read instance id from zookeeper.", e);
- }
- } else {
- log.warn("ZK client not connected, state: {}", zk.getState());
- }
- Thread.sleep(1000);
- }
-
- if (instanceId == null) {
- for (int i = 0; i < numTries; i++) {
- if (zk.getState() == States.CONNECTED) {
- ZooUtil.digestAuth(zk, secret);
- try {
- log.warn("******* COULD NOT FIND INSTANCE ID - DUMPING ZK
************");
- log.warn("Connected to ZooKeeper: {}", getZooKeepers());
- log.warn("Looking for instanceId at {}",
- Constants.ZROOT + Constants.ZINSTANCES + "/" +
config.getInstanceName());
- ZKUtil.visitSubTreeDFS(zk, Constants.ZROOT, false,
- (rc, path, ctx, name) -> log.warn("{}", path));
- log.warn("******* END ZK DUMP ************");
- } catch (KeeperException | InterruptedException e) {
- log.error("Error dumping zk", e);
- }
- }
- Thread.sleep(1000);
- }
- throw new IllegalStateException("Unable to find instance id from
zookeeper.");
- }
-
- String rootPath = Constants.ZROOT + "/" + instanceId;
- int tsActualCount = 0;
- try {
- while (tsActualCount < ssExpectedCount) {
- tsActualCount = 0;
- for (String child : zk.getChildren(rootPath + Constants.ZTSERVERS,
null)) {
- if (zk.getChildren(rootPath + Constants.ZTSERVERS + "/" + child,
null).isEmpty()) {
- log.info("TServer " + tsActualCount + " not yet present in
ZooKeeper");
- } else {
- tsActualCount++;
- log.info("TServer " + tsActualCount + " present in ZooKeeper");
- }
+ final String rootPath = ZooUtil.getRoot(instanceId);
+ int tsActualCount = 0;
+ try {
+ while (tsActualCount < ssExpectedCount) {
+ tsActualCount = 0;
+ for (String child : zk.getChildren(rootPath + Constants.ZTSERVERS,
null)) {
+ if (zk.getChildren(rootPath + Constants.ZTSERVERS + "/" + child,
null).isEmpty()) {
+ log.info("TServer " + tsActualCount + " not yet present in
ZooKeeper");
+ } else {
+ tsActualCount++;
+ log.info("TServer " + tsActualCount + " present in ZooKeeper");
}
- Thread.sleep(500);
}
- } catch (KeeperException e) {
- throw new IllegalStateException("Unable to read TServer information
from zookeeper.", e);
+ Thread.sleep(500);
}
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Unable to read TServer information from
zookeeper.", e);
+ }
- int ecActualCount = 0;
- try {
- while (ecActualCount < ecExpectedCount) {
- ecActualCount = 0;
- for (String queue : zk.getChildren(rootPath + Constants.ZCOMPACTORS,
null)) {
- var qc = zk.getChildren(rootPath + Constants.ZCOMPACTORS + "/" +
queue, null);
- if (qc != null) {
- ecActualCount += qc.size();
- }
+ int ecActualCount = 0;
+ try {
+ while (ecActualCount < ecExpectedCount) {
+ ecActualCount = 0;
+ for (String queue : zk.getChildren(rootPath + Constants.ZCOMPACTORS,
null)) {
+ var qc = zk.getChildren(rootPath + Constants.ZCOMPACTORS + "/" +
queue, null);
+ if (qc != null) {
+ ecActualCount += qc.size();
}
- log.info(
- "Compactor " + ecActualCount + " of " + ecExpectedCount + "
present in ZooKeeper");
- Thread.sleep(500);
}
- } catch (KeeperException e) {
- throw new IllegalStateException("Unable to read Compactor information
from zookeeper.", e);
+ log.info("Compactor " + ecActualCount + " of " + ecExpectedCount + "
present in ZooKeeper");
+ Thread.sleep(500);
}
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Unable to read Compactor information
from zookeeper.", e);
+ }
- try {
- while (zk.getChildren(rootPath + Constants.ZMANAGER_LOCK,
null).isEmpty()) {
- log.info("Manager not yet present in ZooKeeper");
- Thread.sleep(500);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException("Unable to read Manager information
from zookeeper.", e);
+ try {
+ while (zk.getChildren(rootPath + Constants.ZMANAGER_LOCK,
null).isEmpty()) {
+ log.info("Manager not yet present in ZooKeeper");
+ Thread.sleep(500);
}
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Unable to read Manager information from
zookeeper.", e);
+ }
- try {
- while (zk.getChildren(rootPath + Constants.ZGC_LOCK, null).isEmpty()) {
- log.info("GC not yet present in ZooKeeper");
- Thread.sleep(500);
- }
- } catch (KeeperException e) {
- throw new IllegalStateException("Unable to read GC information from
zookeeper.", e);
+ try {
+ while (zk.getChildren(rootPath + Constants.ZGC_LOCK, null).isEmpty()) {
+ log.info("GC not yet present in ZooKeeper");
+ Thread.sleep(500);
}
-
+ } catch (KeeperException e) {
+ throw new IllegalStateException("Unable to read GC information from
zookeeper.", e);
}
+
}
private List<String> buildRemoteDebugParams(int port) {
@@ -954,6 +986,24 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
return;
}
+ if (miniLock != null) {
+ try {
+ miniLock.unlock();
+ } catch (InterruptedException | KeeperException e) {
+ log.error("Error unlocking ServiceLock for MiniAccumuloClusterImpl",
e);
+ }
+ miniLock = null;
+ this.getServerContext().clearServiceLock();
+ }
+ if (zk != null) {
+ zk.close();
+ zk = null;
+ }
+ if (client != null) {
+ client.close();
+ client = null;
+ }
+
MiniAccumuloClusterControl control = getClusterControl();
control.stop(ServerType.GARBAGE_COLLECTOR, null);
@@ -1092,4 +1142,5 @@ public class MiniAccumuloClusterImpl implements
AccumuloCluster {
public String getClientPropsPath() {
return config.getClientPropsFile().getAbsolutePath();
}
+
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index bf75821828..b9735a47bd 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -38,6 +38,7 @@ import java.util.TreeMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.apache.accumulo.core.Constants;
@@ -52,6 +53,7 @@ import org.apache.accumulo.core.data.NamespaceId;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.fate.zookeeper.ZooReader;
import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metrics.MetricsInfo;
import org.apache.accumulo.core.rpc.SslConnectionParams;
@@ -105,6 +107,7 @@ public class ServerContext extends ClientContext {
private final Supplier<AuditedSecurityOperation> securityOperation;
private final Supplier<CryptoServiceFactory> cryptoFactorySupplier;
private final Supplier<LowMemoryDetector> lowMemoryDetector;
+ private final AtomicReference<ServiceLock> serverLock = new
AtomicReference<>();
private final Supplier<MetricsInfo> metricsInfoSupplier;
public ServerContext(SiteConfiguration siteConfig) {
@@ -463,6 +466,21 @@ public class ServerContext extends ClientContext {
return lowMemoryDetector.get();
}
+ public void setServiceLock(ServiceLock lock) {
+ if (!serverLock.compareAndSet(null, lock)) {
+ throw new IllegalStateException("ServiceLock already set on
ServerContext");
+ }
+ }
+
+ public ServiceLock getServiceLock() {
+ return serverLock.get();
+ }
+
+ /** Intended to be called from MiniAccumuloClusterImpl only as can be
restarted **/
+ public void clearServiceLock() {
+ serverLock.set(null);
+ }
+
public MetricsInfo getMetricsInfo() {
return metricsInfoSupplier.get();
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
index a6393d4235..977aa98afb 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/AbstractTabletStateStore.java
@@ -23,7 +23,6 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -33,6 +32,7 @@ import
org.apache.accumulo.core.metadata.schema.TabletMetadata;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.Location;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.LocationType;
import org.apache.accumulo.core.tabletserver.log.LogEntry;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,7 @@ public abstract class AbstractTabletStateStore implements
TabletStateStore {
private final Ample ample;
- protected AbstractTabletStateStore(ClientContext context) {
+ protected AbstractTabletStateStore(ServerContext context) {
this.ample = context.getAmple();
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
index f69f280ed4..972d6e927e 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/MetaDataStateStore.java
@@ -31,6 +31,7 @@ import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.ConditionalResult.Status;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.server.ServerContext;
import com.google.common.base.Preconditions;
@@ -41,7 +42,7 @@ class MetaDataStateStore extends AbstractTabletStateStore
implements TabletState
private final Ample ample;
private final DataLevel level;
- protected MetaDataStateStore(DataLevel level, ClientContext context, String
targetTableName) {
+ protected MetaDataStateStore(DataLevel level, ServerContext context, String
targetTableName) {
super(context);
this.level = level;
this.context = context;
@@ -49,7 +50,7 @@ class MetaDataStateStore extends AbstractTabletStateStore
implements TabletState
this.targetTableName = targetTableName;
}
- MetaDataStateStore(DataLevel level, ClientContext context) {
+ MetaDataStateStore(DataLevel level, ServerContext context) {
this(level, context, AccumuloTable.METADATA.tableName());
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
index 91bde4fe20..254f655bea 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/manager/state/RootTabletStateStore.java
@@ -20,17 +20,17 @@ package org.apache.accumulo.server.manager.state;
import java.util.List;
-import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.manager.state.TabletManagement;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
+import org.apache.accumulo.server.ServerContext;
import com.google.common.base.Preconditions;
class RootTabletStateStore extends MetaDataStateStore {
- RootTabletStateStore(DataLevel level, ClientContext context) {
+ RootTabletStateStore(DataLevel level, ServerContext context) {
super(level, context, AccumuloTable.ROOT.tableName());
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
index 6995941089..3e87241c6d 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletMutatorImpl.java
@@ -41,6 +41,7 @@ import org.apache.accumulo.core.data.Condition;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.iterators.SortedFilesIterator;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
@@ -73,6 +74,8 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
private final BiConsumer<KeyExtent,Ample.RejectionHandler>
rejectionHandlerConsumer;
+ private final ServerContext context;
+ private final ServiceLock lock;
private final KeyExtent extent;
private boolean sawOperationRequirement = false;
@@ -87,6 +90,9 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
this.parent = parent;
this.rejectionHandlerConsumer = rejectionHandlerConsumer;
this.extent = extent;
+ this.context = context;
+ this.lock = this.context.getServiceLock();
+ Objects.requireNonNull(this.lock, "ServiceLock not set on ServerContext");
}
@Override
@@ -284,6 +290,7 @@ public class ConditionalTabletMutatorImpl extends
TabletMutatorBase<Ample.Condit
.setValue(encodePrevEndRow(extent.prevEndRow()).get());
mutation.addCondition(c);
}
+ this.putZooLock(context.getZooKeeperRoot(), lock);
getMutation();
mutationConsumer.accept(mutation);
rejectionHandlerConsumer.accept(extent, rejectionCheck);
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootTabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootTabletMutatorImpl.java
index 4448167fb6..04711b5607 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/RootTabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/RootTabletMutatorImpl.java
@@ -21,11 +21,13 @@ package org.apache.accumulo.server.metadata;
import static java.nio.charset.StandardCharsets.UTF_8;
import java.util.List;
+import java.util.Objects;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TabletId;
import org.apache.accumulo.core.data.constraints.Constraint;
import org.apache.accumulo.core.dataImpl.TabletIdImpl;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.RootTable;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.RootTabletMetadata;
@@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
public class RootTabletMutatorImpl extends
TabletMutatorBase<Ample.TabletMutator>
implements Ample.TabletMutator {
+
+ private final ServiceLock lock;
private final ServerContext context;
private static final Logger log =
LoggerFactory.getLogger(RootTabletMutatorImpl.class);
@@ -75,11 +79,14 @@ public class RootTabletMutatorImpl extends
TabletMutatorBase<Ample.TabletMutator
RootTabletMutatorImpl(ServerContext context) {
super(RootTable.EXTENT);
this.context = context;
+ this.lock = this.context.getServiceLock();
+ Objects.requireNonNull(this.lock, "ServiceLock not set on ServerContext");
}
@Override
public void mutate() {
+ this.putZooLock(this.context.getZooKeeperRoot(), lock);
Mutation mutation = getMutation();
MetadataConstraints metaConstraint = new MetadataConstraints();
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorImpl.java
index 36c0f4196a..0a976a82cc 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/TabletMutatorImpl.java
@@ -18,8 +18,11 @@
*/
package org.apache.accumulo.server.metadata;
+import java.util.Objects;
+
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMutatorBase;
import org.apache.accumulo.server.ServerContext;
@@ -27,16 +30,22 @@ import org.apache.accumulo.server.ServerContext;
class TabletMutatorImpl extends TabletMutatorBase<Ample.TabletMutator>
implements Ample.TabletMutator {
- private BatchWriter writer;
+ private final ServerContext context;
+ private final ServiceLock lock;
+ private final BatchWriter writer;
TabletMutatorImpl(ServerContext context, KeyExtent extent, BatchWriter
batchWriter) {
super(extent);
+ this.context = context;
+ this.lock = this.context.getServiceLock();
this.writer = batchWriter;
+ Objects.requireNonNull(this.lock, "ServiceLock not set on ServerContext");
}
@Override
public void mutate() {
try {
+ this.putZooLock(this.context.getZooKeeperRoot(), lock);
writer.addMutation(getMutation());
if (closeAfterMutate != null) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
index 53dc257ef9..ed8a835e34 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/MetadataTableUtil.java
@@ -146,7 +146,6 @@ public class MetadataTableUtil {
tablet.putBulkFile(tf, fateId);
newFiles.put(tf.insert(), dfv);
});
- tablet.putZooLock(context.getZooKeeperRoot(), zooLock);
tablet.mutate();
return newFiles;
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
index bbae7d3a21..7b52eaac19 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/manager/state/ZooTabletStateStoreTest.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -40,10 +41,13 @@ public class ZooTabletStateStoreTest {
@Test
public void testZooTabletStateStore() throws DistributedStoreException {
+
+ ServiceLock lock = EasyMock.createMock(ServiceLock.class);
ServerContext context = MockServerContext.get();
Ample ample = EasyMock.createMock(Ample.class);
expect(context.getAmple()).andReturn(ample).anyTimes();
- EasyMock.replay(context, ample);
+ expect(context.getServiceLock()).andReturn(lock).anyTimes();
+ EasyMock.replay(lock, context, ample);
ZooTabletStateStore tstore = new ZooTabletStateStore(DataLevel.ROOT,
context);
String sessionId = "this is my unique session data";
@@ -60,5 +64,8 @@ public class ZooTabletStateStoreTest {
final List<TabletMetadata> assignmentList1 = List.of(nonRootMeta);
assertThrows(IllegalArgumentException.class, () ->
tstore.unassign(assignmentList1, null));
+
+ EasyMock.verify(lock, context, ample);
+
}
}
diff --git
a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
index e4d6bd51ff..c141eba549 100644
---
a/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
+++
b/server/base/src/test/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImplTest.java
@@ -32,8 +32,11 @@ import
org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.ConditionalMutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil.LockID;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
+import org.apache.accumulo.server.ServerContext;
import org.apache.hadoop.io.Text;
import org.easymock.EasyMock;
import org.junit.jupiter.api.Test;
@@ -48,9 +51,10 @@ public class ConditionalTabletsMutatorImplTest {
private int attempt = 0;
- public
TestConditionalTabletsMutator(List<Function<Text,ConditionalWriter.Status>>
statuses,
+ public TestConditionalTabletsMutator(ServerContext context,
+ List<Function<Text,ConditionalWriter.Status>> statuses,
Map<KeyExtent,TabletMetadata> failedExtents) {
- super(null);
+ super(context);
this.statuses = statuses;
this.failedExtents = failedExtents;
}
@@ -88,6 +92,15 @@ public class ConditionalTabletsMutatorImplTest {
@Test
public void testRejectionHandler() {
+ ServerContext context = EasyMock.createMock(ServerContext.class);
+
EasyMock.expect(context.getZooKeeperRoot()).andReturn("/some/path").anyTimes();
+ ServiceLock lock = EasyMock.createMock(ServiceLock.class);
+ LockID lid = EasyMock.createMock(LockID.class);
+ EasyMock.expect(lock.getLockID()).andReturn(lid).anyTimes();
+
EasyMock.expect(lid.serialize("/some/path/")).andReturn("/some/path/1234").anyTimes();
+ EasyMock.expect(context.getServiceLock()).andReturn(lock).anyTimes();
+ EasyMock.replay(context, lock, lid);
+
// this test checks the handling of conditional mutations that return a
status of unknown and
// rejected
@@ -126,8 +139,8 @@ public class ConditionalTabletsMutatorImplTest {
var statuses2 = Map.of(ke1.toMetaRow(), ConditionalWriter.Status.REJECTED,
ke2.toMetaRow(),
ConditionalWriter.Status.REJECTED);
- try (var mutator =
- new TestConditionalTabletsMutator(List.of(statuses1::get,
statuses2::get), failedExtents)) {
+ try (var mutator = new TestConditionalTabletsMutator(context,
+ List.of(statuses1::get, statuses2::get), failedExtents)) {
mutator.mutateTablet(ke1).requireAbsentOperation().putDirName("dir1")
.submit(tmeta -> tmeta.getDirName().equals("dir1"));
@@ -149,6 +162,9 @@ public class ConditionalTabletsMutatorImplTest {
assertEquals(Ample.ConditionalResult.Status.REJECTED,
results.get(ke2).getStatus());
assertEquals(Ample.ConditionalResult.Status.ACCEPTED,
results.get(ke3).getStatus());
assertEquals(Ample.ConditionalResult.Status.ACCEPTED,
results.get(ke4).getStatus());
+
+ EasyMock.verify(context, lock, tm1, tm2, tm3, tm4, lid);
+
}
}
}
diff --git
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
index 39cb8697f0..a07b934f4c 100644
---
a/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
+++
b/server/compactor/src/main/java/org/apache/accumulo/compactor/Compactor.java
@@ -591,6 +591,7 @@ public class Compactor extends AbstractServer implements
MetricsProducer, Compac
} catch (KeeperException | InterruptedException e) {
throw new RuntimeException("Error registering compactor in ZooKeeper",
e);
}
+ this.getContext().setServiceLock(compactorLock);
MetricsInfo metricsInfo = getContext().getMetricsInfo();
metricsInfo.addServiceTags(getApplicationName(), clientAddress,
getResourceGroup());
diff --git
a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 0a40057890..430031d624 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -88,6 +88,7 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
private final GcCycleMetrics gcCycleMetrics = new GcCycleMetrics();
+ private ServiceLock gcLock;
private NanoTime lastCompactorCheck = NanoTime.now();
SimpleGarbageCollector(ConfigOpts opts, String[] args) {
@@ -165,6 +166,7 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
log.error("{}", ex.getMessage(), ex);
System.exit(1);
}
+ this.getContext().setServiceLock(gcLock);
MetricsInfo metricsInfo = getContext().getMetricsInfo();
metricsInfo.addServiceTags(getApplicationName(), address,
getResourceGroup());
@@ -381,9 +383,8 @@ public class SimpleGarbageCollector extends AbstractServer
implements Iface {
UUID zooLockUUID = UUID.randomUUID();
while (true) {
- ServiceLock lock =
- new ServiceLock(getContext().getZooReaderWriter().getZooKeeper(),
path, zooLockUUID);
- if (lock.tryLock(lockWatcher, new ServiceLockData(zooLockUUID,
addr.toString(),
+ gcLock = new
ServiceLock(getContext().getZooReaderWriter().getZooKeeper(), path,
zooLockUUID);
+ if (gcLock.tryLock(lockWatcher, new ServiceLockData(zooLockUUID,
addr.toString(),
ThriftService.GC, this.getResourceGroup()))) {
log.debug("Got GC ZooKeeper lock");
return;
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 8ec092f5fc..a7bf212243 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -978,6 +978,7 @@ public class Manager extends AbstractServer
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Exception getting manager lock", e);
}
+ this.getContext().setServiceLock(getManagerLock());
// If UpgradeStatus is not at complete by this moment, then things are
currently
// upgrading.
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
index 3e8f1fdc5b..18682789c5 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/TabletGroupWatcher.java
@@ -996,8 +996,6 @@ abstract class TabletGroupWatcher extends
AccumuloDaemonThread {
vr.filesToRemove.forEach(tabletMutator::deleteFile);
vr.filesToAdd.forEach(tabletMutator::putFile);
- tabletMutator.putZooLock(manager.getContext().getZooKeeperRoot(),
manager.getManagerLock());
-
tabletMutator.submit(
tm -> tm.getLogs().containsAll(vr.logsToAdd) &&
tm.getFiles().containsAll(vr.filesToAdd
.keySet().stream().map(ReferencedTabletFile::insert).collect(Collectors.toSet())));
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
index dd6a0986f6..08cfcd194f 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/create/PopulateMetadata.java
@@ -91,7 +91,6 @@ class PopulateMetadata extends ManagerRepo {
tabletMutator.putPrevEndRow(extent.prevEndRow());
tabletMutator.putDirName(dirName);
tabletMutator.putTime(new MetadataTime(0, tableInfo.getTimeType()));
- tabletMutator.putZooLock(context.getZooKeeperRoot(), lock);
tabletMutator.putTabletAvailability(tableInfo.getInitialTabletAvailability());
tabletMutator.mutate();
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
index 9c52e7f51e..8bd5120788 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/merge/MergeTabletsTest.java
@@ -62,6 +62,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.gc.ReferenceFile;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
import org.apache.accumulo.core.metadata.TServerInstance;
@@ -414,6 +415,9 @@ public class MergeTabletsTest {
EasyMock.mock(ConditionalTabletsMutatorImpl.class);
ConditionalTabletMutatorImpl tabletMutator =
EasyMock.mock(ConditionalTabletMutatorImpl.class);
+ ServiceLock managerLock = EasyMock.mock(ServiceLock.class);
+
EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes();
+
// setup reading the tablets
EasyMock.expect(manager.getContext()).andReturn(context).atLeastOnce();
EasyMock.expect(context.getAmple()).andReturn(ample).atLeastOnce();
@@ -456,11 +460,11 @@ public class MergeTabletsTest {
EasyMock.expectLastCall().once();
EasyMock.replay(manager, context, ample, tabletBuilder, tabletsMetadata,
tabletsMutator,
- tabletMutator, cr);
+ tabletMutator, cr, managerLock);
mergeTablets.call(fateId, manager);
EasyMock.verify(manager, context, ample, tabletBuilder, tabletsMetadata,
tabletsMutator,
- tabletMutator, cr);
+ tabletMutator, cr, managerLock);
}
}
diff --git
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
index ffb776f3bf..a317f8375a 100644
---
a/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
+++
b/server/manager/src/test/java/org/apache/accumulo/manager/tableOps/split/UpdateTabletsTest.java
@@ -38,6 +38,7 @@ import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
+import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.SuspendingTServer;
@@ -230,6 +231,9 @@ public class UpdateTabletsTest {
EasyMock.expect(splitter.getCachedFileInfo(tableId,
file4)).andReturn(newFileInfo("d", "j"));
EasyMock.expect(manager.getSplitter()).andReturn(splitter).atLeastOnce();
+ ServiceLock managerLock = EasyMock.mock(ServiceLock.class);
+
EasyMock.expect(context.getServiceLock()).andReturn(managerLock).anyTimes();
+
// Setup the metadata for the tablet that is going to split, set as many
columns as possible on
// it.
TabletMetadata tabletMeta = EasyMock.mock(TabletMetadata.class);
diff --git
a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
index 74ec7b89a5..51be4e13d8 100644
--- a/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
+++ b/server/monitor/src/main/java/org/apache/accumulo/monitor/Monitor.java
@@ -476,6 +476,7 @@ public class Monitor extends AbstractServer implements
HighlyAvailableService {
log.error("Failed to get Monitor ZooKeeper lock");
throw new RuntimeException(e);
}
+ this.getContext().setServiceLock(this.monitorLock);
String advertiseHost = getHostname();
if (advertiseHost.equals("0.0.0.0")) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 77db9e440f..d158d3adc9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -384,6 +384,7 @@ public class ScanServer extends AbstractServer
// We need to set the compaction manager so that we don't get an NPE in
CompactableImpl.close
ServiceLock lock = announceExistence();
+ this.getContext().setServiceLock(lock);
int threadPoolSize =
getConfiguration().getCount(Property.SSERV_WAL_SORT_MAX_CONCURRENT);
if (threadPoolSize > 0) {
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 93f3e58def..7f325c39fa 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -589,6 +589,7 @@ public class TabletServer extends AbstractServer implements
TabletHostingServer
metricsInfo.init();
announceExistence();
+ getContext().setServiceLock(tabletServerLock);
try {
walMarker.initWalMarker(getTabletSession());
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
index a0b00127af..88d08ec16c 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/ScanfileManager.java
@@ -27,7 +27,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
@@ -62,12 +61,11 @@ class ScanfileManager {
}
static void removeScanFiles(KeyExtent extent, Set<StoredTabletFile>
scanFiles,
- ServerContext context, Location currLocation, ServiceLock zooLock) {
+ ServerContext context, Location currLocation) {
try (var mutator = context.getAmple().conditionallyMutateTablets()) {
var tabletMutator =
mutator.mutateTablet(extent).requireLocation(currLocation);
scanFiles.forEach(tabletMutator::deleteScan);
- tabletMutator.putZooLock(context.getZooKeeperRoot(), zooLock);
tabletMutator
.submit(tabletMetadata -> Collections.disjoint(scanFiles,
tabletMetadata.getScans()));
@@ -138,8 +136,7 @@ class ScanfileManager {
log.debug("Removing scan refs from metadata {} {}",
tablet.getExtent(), filesToDelete);
var currLoc =
Location.current(tablet.getTabletServer().getTabletSession());
- removeScanFiles(tablet.getExtent(), filesToDelete,
tablet.getContext(), currLoc,
- tablet.getTabletServer().getLock());
+ removeScanFiles(tablet.getExtent(), filesToDelete,
tablet.getContext(), currLoc);
}
}
}
@@ -163,8 +160,7 @@ class ScanfileManager {
if (!filesToDelete.isEmpty()) {
log.debug("Removing scan refs from metadata {} {}", tablet.getExtent(),
filesToDelete);
- removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(),
location,
- tablet.getTabletServer().getLock());
+ removeScanFiles(tablet.getExtent(), filesToDelete, tablet.getContext(),
location);
}
}
}
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
index 38cd3cff18..573e3c49f0 100644
---
a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
+++
b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/Tablet.java
@@ -60,7 +60,6 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.file.FilePrefix;
import org.apache.accumulo.core.iterators.SortedKeyValueIterator;
import org.apache.accumulo.core.iteratorsImpl.system.SourceSwitchingIterator;
-import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.logging.TabletLogger;
import org.apache.accumulo.core.manager.state.tables.TableState;
import org.apache.accumulo.core.metadata.AccumuloTable;
@@ -296,13 +295,11 @@ public class Tablet extends TabletBase {
if (entriesUsedOnTablet.get() == 0) {
log.debug("No replayed mutations applied, removing unused walog
entries for {}", extent);
- final ServiceLock zooLock = tabletServer.getLock();
final Location expectedLocation =
Location.future(this.tabletServer.getTabletSession());
try (ConditionalTabletsMutator mutator =
getContext().getAmple().conditionallyMutateTablets()) {
ConditionalTabletMutator mut =
mutator.mutateTablet(extent).requireAbsentOperation()
- .requireLocation(expectedLocation)
- .putZooLock(getContext().getZooKeeperRoot(), zooLock);
+ .requireLocation(expectedLocation);
logEntries.forEach(mut::deleteWal);
mut.submit(tabletMetadata -> tabletMetadata.getLogs().isEmpty());
@@ -491,7 +488,6 @@ public class Tablet extends TabletBase {
.requireSame(lastTabletMetadata, ColumnType.FLUSH_ID);
tablet.putFlushId(tableFlushID);
- tablet.putZooLock(context.getZooKeeperRoot(),
getTabletServer().getLock());
tablet
.submit(tabletMetadata ->
tabletMetadata.getFlushId().orElse(-1) == tableFlushID);
@@ -1371,8 +1367,6 @@ public class Tablet extends TabletBase {
unusedWalLogs.forEach(tablet::deleteWal);
- tablet.putZooLock(getContext().getZooKeeperRoot(),
tabletServer.getLock());
-
// When trying to determine if write was successful, check if the
flush nonce was updated.
// Can not check if the new file exists because of two reasons. First,
it could be compacted
// away between the write and check. Second, some flushes do not
produce a file.
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
index 694f4ee755..ea533fdb48 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/AccumuloClientIT.java
@@ -145,11 +145,11 @@ public class AccumuloClientIT extends
AccumuloClusterHarness {
Scanner scanner;
- assertEquals(0, SingletonManager.getReservationCount());
+ assertEquals(1, SingletonManager.getReservationCount());
assertEquals(Mode.CLIENT, SingletonManager.getMode());
try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
- assertEquals(1, SingletonManager.getReservationCount());
+ assertEquals(2, SingletonManager.getReservationCount());
c.tableOperations().create(tableName);
@@ -165,10 +165,10 @@ public class AccumuloClientIT extends
AccumuloClusterHarness {
// scanner created from closed client should fail
expectClosed(() -> scanner.iterator().next());
- assertEquals(0, SingletonManager.getReservationCount());
+ assertEquals(1, SingletonManager.getReservationCount());
AccumuloClient c = Accumulo.newClient().from(getClientProps()).build();
- assertEquals(1, SingletonManager.getReservationCount());
+ assertEquals(2, SingletonManager.getReservationCount());
// ensure client created after everything was closed works
Scanner scanner2 = c.createScanner(tableName, Authorizations.EMPTY);
@@ -183,7 +183,7 @@ public class AccumuloClientIT extends
AccumuloClusterHarness {
c.close();
- assertEquals(0, SingletonManager.getReservationCount());
+ assertEquals(1, SingletonManager.getReservationCount());
expectClosed(() -> c.createScanner(tableName, Authorizations.EMPTY));
expectClosed(() -> c.createConditionalWriter(tableName));
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
index a94adfbc79..1a711ba8d9 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/SplitIT.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.test.functional;
import static java.util.Collections.singletonMap;
import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.LOCK_COLUMN;
import static
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.ServerColumnFamily.OPID_COLUMN;
import static org.apache.accumulo.core.util.LazySingletons.RANDOM;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -419,7 +420,10 @@ public class SplitIT extends AccumuloClusterHarness {
.stream().collect(MoreCollectors.onlyElement());
assertEquals(extent, tabletMetadata.getExtent());
- tabletMetadata.getKeyValues();
+ // remove the srv:lock column for tests as this will change
+ // because we are changing the metadata from the IT.
+ var original = new TreeMap<>(tabletMetadata.getKeyValues());
+ assertTrue(original.keySet().removeIf(LOCK_COLUMN::hasColumns));
// Split operation should fail because of the unexpected column.
var splits = new TreeSet<>(List.of(new Text("m")));
@@ -433,8 +437,9 @@ public class SplitIT extends AccumuloClusterHarness {
// tablet should have an operation id set, but nothing else changed
var kvCopy = new TreeMap<>(tabletMetadata2.getKeyValues());
+ assertTrue(kvCopy.keySet().removeIf(LOCK_COLUMN::hasColumns));
assertTrue(kvCopy.keySet().removeIf(OPID_COLUMN::hasColumns));
- assertEquals(tabletMetadata.getKeyValues(), kvCopy);
+ assertEquals(original, kvCopy);
// remove the offending columns
tabletMutator = ctx.getAmple().mutateTablet(extent);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
index fdf663e0f3..524464d9fa 100644
---
a/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/functional/SplitRecoveryIT.java
@@ -36,25 +36,17 @@ import java.util.SortedMap;
import java.util.TreeMap;
import java.util.UUID;
-import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.clientImpl.ScannerImpl;
-import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
-import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter;
-import org.apache.accumulo.core.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.core.file.rfile.RFile;
import org.apache.accumulo.core.lock.ServiceLock;
-import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
-import org.apache.accumulo.core.lock.ServiceLock.LockWatcher;
-import org.apache.accumulo.core.lock.ServiceLockData;
-import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
import org.apache.accumulo.core.metadata.StoredTabletFile;
@@ -80,8 +72,6 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.jupiter.api.Test;
-import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
-
public class SplitRecoveryIT extends ConfigurableMacBase {
@Override
@@ -94,33 +84,11 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
prevEndRow == null ? null : new Text(prevEndRow));
}
- private void run(ServerContext c) throws Exception {
- var zPath = ServiceLock.path(c.getZooKeeperRoot() + "/testLock");
- ZooReaderWriter zoo = c.getZooReaderWriter();
- zoo.putPersistentData(zPath.toString(), new byte[0],
NodeExistsPolicy.OVERWRITE);
- ServiceLock zl = new ServiceLock(zoo.getZooKeeper(), zPath,
UUID.randomUUID());
- boolean gotLock = zl.tryLock(new LockWatcher() {
-
- @SuppressFBWarnings(value = "DM_EXIT",
- justification = "System.exit() is a bad idea here, but okay for now,
since it's a test")
- @Override
- public void lostLock(LockLossReason reason) {
- System.exit(-1);
-
- }
+ @Test
+ public void run() throws Exception {
- @SuppressFBWarnings(value = "DM_EXIT",
- justification = "System.exit() is a bad idea here, but okay for now,
since it's a test")
- @Override
- public void unableToMonitorLockNode(Exception e) {
- System.exit(-1);
- }
- }, new ServiceLockData(UUID.randomUUID(), "foo", ThriftService.TSERV,
- Constants.DEFAULT_RESOURCE_GROUP_NAME));
-
- if (!gotLock) {
- System.err.println("Failed to get lock " + zPath);
- }
+ ServerContext c = getCluster().getServerContext();
+ ServiceLock zl = c.getServiceLock();
// run test for a table with one tablet
runSplitRecoveryTest(c, 0, "sp", 0, zl, nke("foo0", null, null));
@@ -164,7 +132,7 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
String dirName = "dir_" + i;
String tdir =
context.getTablesDirs().iterator().next() + "/" + extent.tableId() +
"/" + dirName;
- addTablet(extent, dirName, context, TimeType.LOGICAL, zl);
+ addTablet(extent, dirName, context, TimeType.LOGICAL);
SortedMap<ReferencedTabletFile,DataFileValue> dataFiles = new
TreeMap<>();
dataFiles.put(new ReferencedTabletFile(new Path(tdir + "/" +
RFile.EXTENSION + "_000_000")),
new DataFileValue(1000017 + i, 10000 + i));
@@ -366,27 +334,16 @@ public class SplitRecoveryIT extends ConfigurableMacBase {
}
}
- public static void main(String[] args) throws Exception {
- new SplitRecoveryIT().run(new ServerContext(SiteConfiguration.auto()));
- }
-
- @Test
- public void test() throws Exception {
- assertEquals(0, exec(SplitRecoveryIT.class).waitFor());
- }
-
- public static void addTablet(KeyExtent extent, String path, ServerContext
context,
- TimeType timeType, ServiceLock zooLock) {
+ public void addTablet(KeyExtent extent, String path, ServerContext context,
TimeType timeType) {
TabletMutator tablet = context.getAmple().mutateTablet(extent);
tablet.putPrevEndRow(extent.prevEndRow());
tablet.putDirName(path);
tablet.putTime(new MetadataTime(0, timeType));
- tablet.putZooLock(context.getZooKeeperRoot(), zooLock);
tablet.mutate();
}
- public static void addNewTablet(ServerContext context, KeyExtent extent,
String dirName,
+ public void addNewTablet(ServerContext context, KeyExtent extent, String
dirName,
TServerInstance tServerInstance, Map<StoredTabletFile,DataFileValue>
datafileSizes,
Map<FateId,? extends Collection<ReferencedTabletFile>> bulkLoadedFiles,
MetadataTime time,
long lastFlushID) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index b63aa5f415..16bfc441ca 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -26,7 +26,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
+import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.cli.Help;
import org.apache.accumulo.core.clientImpl.thrift.ClientService;
import org.apache.accumulo.core.clientImpl.thrift.TInfo;
@@ -52,6 +54,13 @@ import org.apache.accumulo.core.dataImpl.thrift.TRowRange;
import org.apache.accumulo.core.dataImpl.thrift.TSummaries;
import org.apache.accumulo.core.dataImpl.thrift.TSummaryRequest;
import org.apache.accumulo.core.dataImpl.thrift.UpdateErrors;
+import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
+import org.apache.accumulo.core.lock.ServiceLock;
+import org.apache.accumulo.core.lock.ServiceLock.AccumuloLockWatcher;
+import org.apache.accumulo.core.lock.ServiceLock.LockLossReason;
+import org.apache.accumulo.core.lock.ServiceLock.ServiceLockPath;
+import org.apache.accumulo.core.lock.ServiceLockData;
+import org.apache.accumulo.core.lock.ServiceLockData.ThriftService;
import org.apache.accumulo.core.manager.thrift.TabletServerStatus;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
@@ -78,6 +87,10 @@ import org.apache.accumulo.server.rpc.ThriftProcessorTypes;
import org.apache.accumulo.server.rpc.ThriftServerType;
import org.apache.thrift.TException;
import org.apache.thrift.TMultiplexedProcessor;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import com.beust.jcommander.Parameter;
import com.google.common.net.HostAndPort;
@@ -89,6 +102,8 @@ import com.google.common.net.HostAndPort;
*/
public class NullTserver {
+ private static final Logger LOG = LoggerFactory.getLogger(NullTserver.class);
+
public static class NullTServerTabletClientHandler
implements TabletServerClientService.Iface,
TabletScanClientService.Iface,
TabletIngestClientService.Iface, TabletManagementClientService.Iface {
@@ -307,29 +322,79 @@ public class NullTserver {
10 * 1024 * 1024, null, null, -1,
context.getConfiguration().getCount(Property.RPC_BACKLOG),
context.getMetricsInfo(), HostAndPort.fromParts("0.0.0.0", opts.port));
- HostAndPort addr =
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
+ AccumuloLockWatcher miniLockWatcher = new AccumuloLockWatcher() {
- TableId tableId = context.getTableId(opts.tableName);
+ @Override
+ public void lostLock(LockLossReason reason) {
+ LOG.warn("Lost lock: " + reason.toString());
+ }
- // read the locations for the table
- Range tableRange = new KeyExtent(tableId, null, null).toMetaRange();
- List<Assignment> assignments = new ArrayList<>();
- try (var tablets =
context.getAmple().readTablets().forLevel(DataLevel.USER).build()) {
- long randomSessionID = opts.port;
- TServerInstance instance = new TServerInstance(addr, randomSessionID);
- var s = tablets.iterator();
+ @Override
+ public void unableToMonitorLockNode(Exception e) {
+ LOG.warn("Unable to monitor lock: " + e.getMessage());
+ }
- while (s.hasNext()) {
- TabletMetadata next = s.next();
- assignments.add(new Assignment(next.getExtent(), instance,
next.getLast()));
+ @Override
+ public void acquiredLock() {
+ LOG.debug("Acquired ZooKeeper lock for NullTserver");
}
- }
- // point them to this server
- TabletStateStore store = TabletStateStore.getStoreForLevel(DataLevel.USER,
context);
- store.setLocations(assignments);
- while (true) {
- Thread.sleep(SECONDS.toMillis(10));
+ @Override
+ public void failedToAcquireLock(Exception e) {
+ LOG.warn("Failed to acquire ZK lock for NullTserver, msg: " +
e.getMessage());
+ }
+ };
+
+ ServiceLock miniLock = null;
+ try {
+ ZooKeeper zk = context.getZooReaderWriter().getZooKeeper();
+ UUID nullTServerUUID = UUID.randomUUID();
+ String miniZDirPath = context.getZooKeeperRoot() + "/mini";
+ String miniZInstancePath = miniZDirPath + "/" +
nullTServerUUID.toString();
+ try {
+ context.getZooReaderWriter().putPersistentData(miniZDirPath, new
byte[0],
+ ZooUtil.NodeExistsPolicy.SKIP);
+ context.getZooReaderWriter().putPersistentData(miniZInstancePath, new
byte[0],
+ ZooUtil.NodeExistsPolicy.SKIP);
+ } catch (KeeperException | InterruptedException e) {
+ throw new IllegalStateException("Error creating path in ZooKeeper", e);
+ }
+ ServiceLockPath path = ServiceLock.path(miniZInstancePath);
+ ServiceLockData sld = new ServiceLockData(nullTServerUUID, "localhost",
ThriftService.TSERV,
+ Constants.DEFAULT_RESOURCE_GROUP_NAME);
+ miniLock = new ServiceLock(zk, path, UUID.randomUUID());
+ miniLock.lock(miniLockWatcher, sld);
+ context.setServiceLock(miniLock);
+ HostAndPort addr =
HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);
+
+ TableId tableId = context.getTableId(opts.tableName);
+
+ // read the locations for the table
+ Range tableRange = new KeyExtent(tableId, null, null).toMetaRange();
+ List<Assignment> assignments = new ArrayList<>();
+ try (var tablets =
context.getAmple().readTablets().forLevel(DataLevel.USER).build()) {
+ long randomSessionID = opts.port;
+ TServerInstance instance = new TServerInstance(addr, randomSessionID);
+ var s = tablets.iterator();
+
+ while (s.hasNext()) {
+ TabletMetadata next = s.next();
+ assignments.add(new Assignment(next.getExtent(), instance,
next.getLast()));
+ }
+ }
+ // point them to this server
+ final ServiceLock lock = miniLock;
+ TabletStateStore store =
TabletStateStore.getStoreForLevel(DataLevel.USER, context);
+ store.setLocations(assignments);
+
+ while (true) {
+ Thread.sleep(SECONDS.toMillis(10));
+ }
+
+ } finally {
+ if (miniLock != null) {
+ miniLock.unlock();
+ }
}
}
}