This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 0b220c1d86 Move Scan Server File refs to their own table (#4690)
0b220c1d86 is described below
commit 0b220c1d86e32123697d94e704309fef48c42dbf
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Fri Jun 21 16:48:12 2024 -0400
Move Scan Server File refs to their own table (#4690)
This change moves scan server file references out of the metadata table
and into their own table in order to improve performance. The row prefix
was dropped because nothing else is stored in the new table.
This is a backport to 3.1 of the change that was made in elasticity in
#4650.
---
.../accumulo/core/metadata/AccumuloTable.java | 2 +-
.../accumulo/core/metadata/ScanServerRefStore.java | 64 ++++++++++++
.../core/metadata/ScanServerRefTabletFile.java | 4 +-
.../accumulo/core/metadata/schema/Ample.java | 44 +-------
.../core/metadata/schema/MetadataSchema.java | 13 ---
.../MiniAccumuloClusterImplTest.java | 5 +-
.../server/init/FileSystemInitializer.java | 27 ++++-
.../accumulo/server/init/InitialConfiguration.java | 51 ++++++----
.../accumulo/server/init/ZooKeeperInitializer.java | 11 +-
.../server/metadata/ScanServerRefStoreImpl.java | 113 +++++++++++++++++++++
.../accumulo/server/metadata/ServerAmpleImpl.java | 77 ++------------
.../server/util/ScanServerMetadataEntries.java | 8 +-
.../main/java/org/apache/accumulo/gc/GCRun.java | 2 +-
.../org/apache/accumulo/tserver/ScanServer.java | 8 +-
.../java/org/apache/accumulo/test/MetaSplitIT.java | 4 +-
.../org/apache/accumulo/test/NamespacesIT.java | 2 +-
.../test/ScanServerMetadataEntriesCleanIT.java | 8 +-
.../accumulo/test/ScanServerMetadataEntriesIT.java | 38 ++++---
.../apache/accumulo/test/ScanServerShutdownIT.java | 6 +-
.../accumulo/test/functional/WALSunnyDayIT.java | 6 +-
20 files changed, 294 insertions(+), 199 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java
b/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java
index 14b8b0cf30..929444fb3c 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/AccumuloTable.java
@@ -26,7 +26,7 @@ import org.apache.accumulo.core.data.TableId;
*/
public enum AccumuloTable {
- ROOT("root", "+r"), METADATA("metadata", "!0");
+ ROOT("root", "+r"), METADATA("metadata", "!0"), SCAN_REF("scanref",
"+scanref");
private final String name;
private final TableId tableId;
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefStore.java
b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefStore.java
new file mode 100644
index 0000000000..461427fcd2
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefStore.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.metadata;
+
+import java.util.Collection;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+public interface ScanServerRefStore {
+
+ /**
+ * Insert ScanServer references to Tablet files
+ *
+ * @param scanRefs collection of scan server ref tablet file objects
+ */
+ default void put(Collection<ScanServerRefTabletFile> scanRefs) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Get ScanServer references to Tablet files
+ *
+ * @return stream of scan server references
+ */
+ default Stream<ScanServerRefTabletFile> list() {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Delete the given scan server references
+ *
+ * @param refsToDelete collection of scan server references to delete
+ */
+ default void delete(Collection<ScanServerRefTabletFile> refsToDelete) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Delete scan server references for this server
+ *
+ * @param serverAddress address of server, cannot be null
+ * @param serverSessionId server session id, cannot be null
+ */
+ default void delete(String serverAddress, UUID serverSessionId) {
+ throw new UnsupportedOperationException();
+ }
+
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
index 291343e2e7..50076eaa6f 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/ScanServerRefTabletFile.java
@@ -44,7 +44,7 @@ public class ScanServerRefTabletFile extends
ReferencedTabletFile {
this.colq = colq;
}
- public String getRowSuffix() {
+ public String getRow() {
return this.getNormalizedPathStr();
}
@@ -86,7 +86,7 @@ public class ScanServerRefTabletFile extends
ReferencedTabletFile {
@Override
public String toString() {
- return "ScanServerRefTabletFile [file=" + this.getRowSuffix() + ", server
address=" + colf
+ return "ScanServerRefTabletFile [file=" + this.getRow() + ", server
address=" + colf
+ ", server lock uuid=" + colq + "]";
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index 931e415774..b212356a4a 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.core.metadata.schema;
import java.util.Collection;
import java.util.Iterator;
-import java.util.UUID;
import java.util.stream.Stream;
import org.apache.accumulo.core.data.Mutation;
@@ -31,7 +30,7 @@ import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.lock.ServiceLock;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
-import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.ScanServerRefStore;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.TServerInstance;
import org.apache.accumulo.core.metadata.schema.TabletMetadata.ColumnType;
@@ -324,43 +323,6 @@ public interface Ample {
void mutate();
}
- /**
- * Insert ScanServer references to Tablet files
- *
- * @param scanRefs set of scan server ref table file objects
- */
- default void putScanServerFileReferences(Collection<ScanServerRefTabletFile>
scanRefs) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Get ScanServer references to Tablet files
- *
- * @return stream of scan server references
- */
- default Stream<ScanServerRefTabletFile> getScanServerFileReferences() {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Delete the set of scan server references
- *
- * @param refsToDelete set of scan server references to delete
- */
- default void
deleteScanServerFileReferences(Collection<ScanServerRefTabletFile>
refsToDelete) {
- throw new UnsupportedOperationException();
- }
-
- /**
- * Delete scan server references for this server
- *
- * @param serverAddress address of server, cannot be null
- * @param serverSessionId server session id, cannot be null
- */
- default void deleteScanServerFileReferences(String serverAddress, UUID
serverSessionId) {
- throw new UnsupportedOperationException();
- }
-
/**
* Create a Bulk Load In Progress flag in the metadata table
*
@@ -380,6 +342,10 @@ public interface Ample {
throw new UnsupportedOperationException();
}
+ default ScanServerRefStore scanServerRefs() {
+ throw new UnsupportedOperationException();
+ }
+
/**
* Remove all the Bulk Load transaction ids from a given table's metadata
*
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
index 0fd991296c..e4356c5247 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/MetadataSchema.java
@@ -443,17 +443,4 @@ public class MetadataSchema {
return section.getRowPrefix();
}
}
-
- public static class ScanServerFileReferenceSection {
- private static final Section section =
- new Section(RESERVED_PREFIX + "sserv", true, RESERVED_PREFIX +
"sserx", false);
-
- public static Range getRange() {
- return section.getRange();
- }
-
- public static String getRowPrefix() {
- return section.getRowPrefix();
- }
- }
}
diff --git
a/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java
b/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java
index c8cf747fe9..457d816b98 100644
---
a/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java
+++
b/minicluster/src/test/java/org/apache/accumulo/miniclusterImpl/MiniAccumuloClusterImplTest.java
@@ -106,7 +106,8 @@ public class MiniAccumuloClusterImplTest {
@Timeout(60)
public void saneMonitorInfo() throws Exception {
ManagerMonitorInfo stats;
- int expectedNumTables = 3;
+ // Expecting default AccumuloTables + TEST_TABLE
+ int expectedNumTables = AccumuloTable.values().length + 1;
while (true) {
stats = accumulo.getManagerMonitorInfo();
if (stats.tableMap.size() < expectedNumTables) {
@@ -127,6 +128,8 @@ public class MiniAccumuloClusterImplTest {
"root table should exist in " + stats.tableMap.keySet());
assertTrue(stats.tableMap.containsKey(AccumuloTable.METADATA.tableId().canonical()),
"meta table should exist in " + stats.tableMap.keySet());
+
assertTrue(stats.tableMap.containsKey(AccumuloTable.SCAN_REF.tableId().canonical()),
+ "scan ref table should exist in " + stats.tableMap.keySet());
assertTrue(stats.tableMap.containsKey(testTableID),
"our test table should exist in " + stats.tableMap.keySet());
assertNotNull(stats.tServerInfo, "there should be tservers.");
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
index 8fdfa9219e..9db54c8f12 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/init/FileSystemInitializer.java
@@ -31,6 +31,7 @@ import java.util.TreeMap;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.admin.TimeType;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
+import org.apache.accumulo.core.conf.DefaultConfiguration;
import org.apache.accumulo.core.conf.SiteConfiguration;
import org.apache.accumulo.core.crypto.CryptoFactoryLoader;
import org.apache.accumulo.core.data.InstanceId;
@@ -42,6 +43,7 @@ import org.apache.accumulo.core.file.FileOperations;
import org.apache.accumulo.core.file.FileSKVWriter;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ReferencedTabletFile;
+import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.metadata.schema.MetadataTime;
@@ -100,6 +102,15 @@ class FileSystemInitializer {
String tableMetadataTabletDirUri =
fs.choose(chooserEnv, context.getBaseUris()) +
Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ AccumuloTable.METADATA.tableId() + Path.SEPARATOR +
tableMetadataTabletDirName;
+
+ chooserEnv = new
VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT,
+ AccumuloTable.SCAN_REF.tableId(), null, context);
+ String scanRefTableDefaultTabletDirName =
+
MetadataSchema.TabletsSection.ServerColumnFamily.DEFAULT_TABLET_DIR_NAME;
+ String scanRefTableDefaultTabletDirUri =
+ fs.choose(chooserEnv, context.getBaseUris()) +
Constants.HDFS_TABLES_DIR + Path.SEPARATOR
+ + AccumuloTable.SCAN_REF.tableId() + Path.SEPARATOR +
scanRefTableDefaultTabletDirName;
+
chooserEnv = new
VolumeChooserEnvironmentImpl(VolumeChooserEnvironment.Scope.INIT,
AccumuloTable.METADATA.tableId(), null, context);
String defaultMetadataTabletDirName =
@@ -109,11 +120,20 @@ class FileSystemInitializer {
+ AccumuloTable.METADATA.tableId() + Path.SEPARATOR +
defaultMetadataTabletDirName;
// create table and default tablets directories
- createDirectories(fs, rootTabletDirUri, tableMetadataTabletDirUri,
defaultMetadataTabletDirUri);
+ createDirectories(fs, rootTabletDirUri, tableMetadataTabletDirUri,
defaultMetadataTabletDirUri,
+ scanRefTableDefaultTabletDirUri);
+
+ String ext =
FileOperations.getNewFileExtension(DefaultConfiguration.getInstance());
+
+ // populate the metadata tablet with info about scan ref tablets
+ String metadataFileName = tableMetadataTabletDirUri + Path.SEPARATOR +
"0_1." + ext;
+ Tablet scanRefTablet =
+ new Tablet(AccumuloTable.SCAN_REF.tableId(),
scanRefTableDefaultTabletDirName, null, null);
+ createMetadataFile(fs, metadataFileName, siteConfig, scanRefTablet);
// populate the root tablet with info about the metadata table's two
initial tablets
- Tablet tablesTablet =
- new Tablet(AccumuloTable.METADATA.tableId(),
tableMetadataTabletDirName, null, splitPoint);
+ Tablet tablesTablet = new Tablet(AccumuloTable.METADATA.tableId(),
tableMetadataTabletDirName,
+ null, splitPoint, StoredTabletFile.of(new
Path(metadataFileName)).getMetadata());
Tablet defaultTablet = new Tablet(AccumuloTable.METADATA.tableId(),
defaultMetadataTabletDirName, splitPoint, null);
createMetadataFile(fs, rootTabletFileUri, siteConfig, tablesTablet,
defaultTablet);
@@ -144,6 +164,7 @@ class FileSystemInitializer {
setTableProperties(context, AccumuloTable.ROOT.tableId(),
initConfig.getRootMetaConf());
setTableProperties(context, AccumuloTable.METADATA.tableId(),
initConfig.getRootMetaConf());
setTableProperties(context, AccumuloTable.METADATA.tableId(),
initConfig.getMetaTableConf());
+ setTableProperties(context, AccumuloTable.SCAN_REF.tableId(),
initConfig.getScanRefTableConf());
}
private void setTableProperties(final ServerContext context, TableId tableId,
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
index 77773b1859..52ba4a2eda 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/init/InitialConfiguration.java
@@ -42,38 +42,44 @@ class InitialConfiguration {
private final HashMap<String,String> initialRootMetaConf = new HashMap<>();
// config for only metadata table
private final HashMap<String,String> initialMetaConf = new HashMap<>();
+ // config for only scan ref table
+ private final HashMap<String,String> initialScanRefTableConf = new
HashMap<>();
private final Configuration hadoopConf;
private final SiteConfiguration siteConf;
InitialConfiguration(Configuration hadoopConf, SiteConfiguration siteConf) {
this.hadoopConf = hadoopConf;
this.siteConf = siteConf;
+
+ // config common to all Accumulo tables
+ Map<String,String> commonConfig = new HashMap<>();
+ commonConfig.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
"32K");
+ commonConfig.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
+ commonConfig.put(Property.TABLE_DURABILITY.getKey(), "sync");
+ commonConfig.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
+ commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "scan.vers",
+ "10," + VersioningIterator.class.getName());
+ commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"scan.vers.opt.maxVersions", "1");
+ commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "minc.vers",
+ "10," + VersioningIterator.class.getName());
+ commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"minc.vers.opt.maxVersions", "1");
+ commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() + "majc.vers",
+ "10," + VersioningIterator.class.getName());
+ commonConfig.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"majc.vers.opt.maxVersions", "1");
+ commonConfig.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
+ commonConfig.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(), "");
+ commonConfig.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(), "true");
+ commonConfig.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(), "true");
+
+ initialRootMetaConf.putAll(commonConfig);
initialRootConf.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
SimpleCompactionDispatcher.class.getName());
initialRootConf.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() +
"service", "root");
-
-
initialRootMetaConf.put(Property.TABLE_FILE_COMPRESSED_BLOCK_SIZE.getKey(),
"32K");
- initialRootMetaConf.put(Property.TABLE_FILE_REPLICATION.getKey(), "5");
- initialRootMetaConf.put(Property.TABLE_DURABILITY.getKey(), "sync");
- initialRootMetaConf.put(Property.TABLE_MAJC_RATIO.getKey(), "1");
initialRootMetaConf.put(Property.TABLE_SPLIT_THRESHOLD.getKey(), "64M");
initialRootMetaConf.put(Property.TABLE_CONSTRAINT_PREFIX.getKey() + "1",
MetadataConstraints.class.getName());
- initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"scan.vers",
- "10," + VersioningIterator.class.getName());
- initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"scan.vers.opt.maxVersions",
- "1");
- initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"minc.vers",
- "10," + VersioningIterator.class.getName());
- initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"minc.vers.opt.maxVersions",
- "1");
- initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"majc.vers",
- "10," + VersioningIterator.class.getName());
- initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"majc.vers.opt.maxVersions",
- "1");
initialRootMetaConf.put(Property.TABLE_ITERATOR_PREFIX.getKey() +
"majc.bulkLoadFilter",
"20," + MetadataBulkLoadFilter.class.getName());
- initialRootMetaConf.put(Property.TABLE_FAILURES_IGNORE.getKey(), "false");
initialRootMetaConf.put(Property.TABLE_LOCALITY_GROUP_PREFIX.getKey() +
"tablet",
String.format("%s,%s",
MetadataSchema.TabletsSection.TabletColumnFamily.NAME,
MetadataSchema.TabletsSection.CurrentLocationColumnFamily.NAME));
@@ -83,14 +89,13 @@ class InitialConfiguration {
MetadataSchema.TabletsSection.ServerColumnFamily.NAME,
MetadataSchema.TabletsSection.FutureLocationColumnFamily.NAME));
initialRootMetaConf.put(Property.TABLE_LOCALITY_GROUPS.getKey(),
"tablet,server");
-
initialRootMetaConf.put(Property.TABLE_DEFAULT_SCANTIME_VISIBILITY.getKey(),
"");
- initialRootMetaConf.put(Property.TABLE_INDEXCACHE_ENABLED.getKey(),
"true");
- initialRootMetaConf.put(Property.TABLE_BLOCKCACHE_ENABLED.getKey(),
"true");
initialMetaConf.put(Property.TABLE_COMPACTION_DISPATCHER.getKey(),
SimpleCompactionDispatcher.class.getName());
initialMetaConf.put(Property.TABLE_COMPACTION_DISPATCHER_OPTS.getKey() +
"service", "meta");
+ initialScanRefTableConf.putAll(commonConfig);
+
int max = hadoopConf.getInt("dfs.replication.max", 512);
// Hadoop 0.23 switched the min value configuration name
int min = Math.max(hadoopConf.getInt("dfs.replication.min", 1),
@@ -130,6 +135,10 @@ class InitialConfiguration {
return initialMetaConf;
}
+ HashMap<String,String> getScanRefTableConf() {
+ return initialScanRefTableConf;
+ }
+
Configuration getHadoopConf() {
return hadoopConf;
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
index e861f31452..97b14c26e0 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/init/ZooKeeperInitializer.java
@@ -118,12 +118,11 @@ public class ZooKeeperInitializer {
TableManager.prepareNewNamespaceState(context, Namespace.ACCUMULO.id(),
Namespace.ACCUMULO.name(), ZooUtil.NodeExistsPolicy.FAIL);
- TableManager.prepareNewTableState(context, AccumuloTable.ROOT.tableId(),
- Namespace.ACCUMULO.id(), AccumuloTable.ROOT.tableName(),
TableState.ONLINE,
- ZooUtil.NodeExistsPolicy.FAIL);
- TableManager.prepareNewTableState(context,
AccumuloTable.METADATA.tableId(),
- Namespace.ACCUMULO.id(), AccumuloTable.METADATA.tableName(),
TableState.ONLINE,
- ZooUtil.NodeExistsPolicy.FAIL);
+ for (AccumuloTable table : AccumuloTable.values()) {
+ TableManager.prepareNewTableState(context, table.tableId(),
Namespace.ACCUMULO.id(),
+ table.tableName(), TableState.ONLINE, ZooUtil.NodeExistsPolicy.FAIL);
+ }
+
zoo.putPersistentData(zkInstanceRoot + Constants.ZTSERVERS,
EMPTY_BYTE_ARRAY,
ZooUtil.NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZPROBLEMS,
EMPTY_BYTE_ARRAY,
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ScanServerRefStoreImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ScanServerRefStoreImpl.java
new file mode 100644
index 0000000000..1763b9d099
--- /dev/null
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ScanServerRefStoreImpl.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.metadata;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.Scanner;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.clientImpl.ClientContext;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.metadata.ScanServerRefStore;
+import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.security.Authorizations;
+import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ScanServerRefStoreImpl implements ScanServerRefStore {
+
+ private static Logger log =
LoggerFactory.getLogger(ScanServerRefStoreImpl.class);
+
+ private final ClientContext context;
+ private final String tableName;
+
+ public ScanServerRefStoreImpl(ClientContext context, String tableName) {
+ this.context = context;
+ this.tableName = tableName;
+ }
+
+ @Override
+ public void put(Collection<ScanServerRefTabletFile> scanRefs) {
+ try (BatchWriter writer = context.createBatchWriter(tableName)) {
+ for (ScanServerRefTabletFile ref : scanRefs) {
+ Mutation m = new Mutation(ref.getRow());
+ m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue());
+ writer.addMutation(m);
+ }
+ } catch (MutationsRejectedException | TableNotFoundException e) {
+ throw new IllegalStateException(
+ "Error inserting scan server file references into " + tableName, e);
+ }
+ }
+
+ @Override
+ public Stream<ScanServerRefTabletFile> list() {
+ try {
+ Scanner scanner = context.createScanner(tableName, Authorizations.EMPTY);
+ return scanner.stream().onClose(scanner::close)
+ .map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString(),
+ e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException(tableName + " not found!", e);
+ }
+ }
+
+ @Override
+ public void delete(String serverAddress, UUID scanServerLockUUID) {
+ Objects.requireNonNull(serverAddress, "Server address must be supplied");
+ Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied");
+ try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
+ scanner.fetchColumn(new Text(serverAddress), new
Text(scanServerLockUUID.toString()));
+
+ Set<ScanServerRefTabletFile> refsToDelete =
StreamSupport.stream(scanner.spliterator(), false)
+ .map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString(),
+ e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()))
+ .collect(Collectors.toSet());
+
+ if (!refsToDelete.isEmpty()) {
+ this.delete(refsToDelete);
+ }
+ } catch (TableNotFoundException e) {
+ throw new IllegalStateException(tableName + " not found!", e);
+ }
+ }
+
+ @Override
+ public void delete(Collection<ScanServerRefTabletFile> refsToDelete) {
+ try (BatchWriter writer = context.createBatchWriter(tableName)) {
+ for (ScanServerRefTabletFile ref : refsToDelete) {
+ Mutation m = new Mutation(ref.getRow());
+ m.putDelete(ref.getServerAddress(), ref.getServerLockUUID());
+ writer.addMutation(m);
+ }
+ log.debug("Deleted scan server file reference entries for files: {}",
refsToDelete);
+ } catch (MutationsRejectedException | TableNotFoundException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 195540276c..c11585aff8 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -26,13 +26,8 @@ import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.UUID;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.IsolatedScanner;
@@ -50,7 +45,7 @@ import org.apache.accumulo.core.gc.GcCandidate;
import org.apache.accumulo.core.gc.ReferenceFile;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.RootTable;
-import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
+import org.apache.accumulo.core.metadata.ScanServerRefStore;
import org.apache.accumulo.core.metadata.StoredTabletFile;
import org.apache.accumulo.core.metadata.ValidationUtil;
import org.apache.accumulo.core.metadata.schema.Ample;
@@ -61,7 +56,6 @@ import
org.apache.accumulo.core.metadata.schema.MetadataSchema.BlipSection;
import org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.DeletesSection.SkewedKeyValue;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ExternalCompactionSection;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import
org.apache.accumulo.core.metadata.schema.MetadataSchema.TabletsSection.BulkFileColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.server.ServerContext;
@@ -77,10 +71,13 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
private static Logger log = LoggerFactory.getLogger(ServerAmpleImpl.class);
private ServerContext context;
+ private final ScanServerRefStore scanServerRefStore;
public ServerAmpleImpl(ServerContext context) {
super(context);
this.context = context;
+ this.scanServerRefStore =
+ new ScanServerRefStoreImpl(context,
AccumuloTable.SCAN_REF.tableName());
}
@Override
@@ -344,70 +341,8 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
}
@Override
- public void putScanServerFileReferences(Collection<ScanServerRefTabletFile>
scanRefs) {
- try (BatchWriter writer =
context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ScanServerFileReferenceSection.getRowPrefix();
- for (ScanServerRefTabletFile ref : scanRefs) {
- Mutation m = new Mutation(prefix + ref.getRowSuffix());
- m.put(ref.getServerAddress(), ref.getServerLockUUID(), ref.getValue());
- writer.addMutation(m);
- }
- } catch (MutationsRejectedException | TableNotFoundException e) {
- throw new IllegalStateException(
- "Error inserting scan server file references into " +
DataLevel.USER.metaTable(), e);
- }
- }
-
- @Override
- public Stream<ScanServerRefTabletFile> getScanServerFileReferences() {
- try {
- Scanner scanner = context.createScanner(DataLevel.USER.metaTable(),
Authorizations.EMPTY);
- scanner.setRange(ScanServerFileReferenceSection.getRange());
- int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
- return scanner.stream().onClose(scanner::close)
- .map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
- e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()));
- } catch (TableNotFoundException e) {
- throw new IllegalStateException(DataLevel.USER.metaTable() + " not
found!", e);
- }
- }
-
- @Override
- public void deleteScanServerFileReferences(String serverAddress, UUID
scanServerLockUUID) {
- Objects.requireNonNull(serverAddress, "Server address must be supplied");
- Objects.requireNonNull(scanServerLockUUID, "Server uuid must be supplied");
- try (
- Scanner scanner = context.createScanner(DataLevel.USER.metaTable(),
Authorizations.EMPTY)) {
- scanner.setRange(ScanServerFileReferenceSection.getRange());
- scanner.fetchColumn(new Text(serverAddress), new
Text(scanServerLockUUID.toString()));
-
- int pLen = ScanServerFileReferenceSection.getRowPrefix().length();
- Set<ScanServerRefTabletFile> refsToDelete =
StreamSupport.stream(scanner.spliterator(), false)
- .map(e -> new
ScanServerRefTabletFile(e.getKey().getRowData().toString().substring(pLen),
- e.getKey().getColumnFamily(), e.getKey().getColumnQualifier()))
- .collect(Collectors.toSet());
-
- if (!refsToDelete.isEmpty()) {
- this.deleteScanServerFileReferences(refsToDelete);
- }
- } catch (TableNotFoundException e) {
- throw new IllegalStateException(DataLevel.USER.metaTable() + " not
found!", e);
- }
- }
-
- @Override
- public void
deleteScanServerFileReferences(Collection<ScanServerRefTabletFile>
refsToDelete) {
- try (BatchWriter writer =
context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ScanServerFileReferenceSection.getRowPrefix();
- for (ScanServerRefTabletFile ref : refsToDelete) {
- Mutation m = new Mutation(prefix + ref.getRowSuffix());
- m.putDelete(ref.getServerAddress(), ref.getServerLockUUID());
- writer.addMutation(m);
- }
- log.debug("Deleted scan server file reference entries for files: {}",
refsToDelete);
- } catch (MutationsRejectedException | TableNotFoundException e) {
- throw new IllegalStateException(e);
- }
+ public ScanServerRefStore scanServerRefs() {
+ return scanServerRefStore;
}
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
index 61e3244c9a..71c2b0dd40 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/util/ScanServerMetadataEntries.java
@@ -39,7 +39,7 @@ public class ScanServerMetadataEntries {
Set<UUID> uuidsToDelete = new HashSet<>();
// collect all uuids that are currently in the metadata table
- context.getAmple().getScanServerFileReferences().forEach(ssrtf -> {
+ context.getAmple().scanServerRefs().list().forEach(ssrtf -> {
uuidsToDelete.add(UUID.fromString(ssrtf.getServerLockUUID().toString()));
});
@@ -55,19 +55,19 @@ public class ScanServerMetadataEntries {
if (!uuidsToDelete.isEmpty()) {
final Set<ScanServerRefTabletFile> refsToDelete = new HashSet<>();
- context.getAmple().getScanServerFileReferences().forEach(ssrtf -> {
+ context.getAmple().scanServerRefs().list().forEach(ssrtf -> {
var uuid = UUID.fromString(ssrtf.getServerLockUUID().toString());
if (uuidsToDelete.contains(uuid)) {
refsToDelete.add(ssrtf);
if (refsToDelete.size() > 5000) {
- context.getAmple().deleteScanServerFileReferences(refsToDelete);
+ context.getAmple().scanServerRefs().delete(refsToDelete);
refsToDelete.clear();
}
}
});
- context.getAmple().deleteScanServerFileReferences(refsToDelete);
+ context.getAmple().scanServerRefs().delete(refsToDelete);
}
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
index 4cb6f3b94c..aff958b010 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/GCRun.java
@@ -212,7 +212,7 @@ public class GCRun implements GarbageCollectionEnvironment {
return fileStream;
});
- var scanServerRefs = context.getAmple().getScanServerFileReferences()
+ var scanServerRefs = context.getAmple().scanServerRefs().list()
.map(sfr -> ReferenceFile.forScan(sfr.getTableId(), sfr));
return Stream.concat(tabletReferences, scanServerRefs);
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index cd02951c63..3ba3a1c306 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -429,7 +429,7 @@ public class ScanServer extends AbstractServer
address.server.stop();
LOG.info("Removing server scan references");
-
this.getContext().getAmple().deleteScanServerFileReferences(clientAddress.toString(),
+
this.getContext().getAmple().scanServerRefs().delete(clientAddress.toString(),
serverLockUUID);
try {
@@ -638,7 +638,7 @@ public class ScanServer extends AbstractServer
if (!filesToReserve.isEmpty()) {
scanServerMetrics.recordWriteOutReservationTime(
- () -> getContext().getAmple().putScanServerFileReferences(refs));
+ () -> getContext().getAmple().scanServerRefs().put(refs));
// After we insert the scan server refs we need to check and see if
the tablet is still
// using the file. As long as the tablet is still using the files then
the Accumulo GC
@@ -671,7 +671,7 @@ public class ScanServer extends AbstractServer
if (!filesToReserve.isEmpty()) {
LOG.info("RFFS {} tablet files changed while attempting to reference
files {}",
myReservationId, filesToReserve);
- getContext().getAmple().deleteScanServerFileReferences(refs);
+ getContext().getAmple().scanServerRefs().delete(refs);
scanServerMetrics.incrementReservationConflictCount();
return null;
}
@@ -848,7 +848,7 @@ public class ScanServer extends AbstractServer
if (!confirmed.isEmpty()) {
try {
// Do this metadata operation is done w/o holding the lock
- getContext().getAmple().deleteScanServerFileReferences(refsToDelete);
+ getContext().getAmple().scanServerRefs().delete(refsToDelete);
if (LOG.isTraceEnabled()) {
confirmed.forEach(refToDelete -> LOG.trace(
"RFFS referenced files has not been used recently, removing
reference {}",
diff --git a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
index 2133e4df5e..c9aed19795 100644
--- a/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/MetaSplitIT.java
@@ -191,8 +191,8 @@ public class MetaSplitIT extends AccumuloClusterHarness {
var expectedExtents = tables.entrySet().stream()
.filter(e ->
!e.getKey().startsWith("accumulo.")).map(Map.Entry::getValue).map(TableId::of)
.map(tid -> new KeyExtent(tid, null,
null)).collect(Collectors.toSet());
- // Verify we have 10 tablets for metadata
- assertEquals(10, expectedExtents.size());
+ // Verify we have 11 tablets for metadata
+ assertEquals(11, expectedExtents.size());
// Scan each tablet to verify data exists
var ample = ((ClientContext) client).getAmple();
diff --git a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
index 3665c48ddc..80d47f23d3 100644
--- a/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/NamespacesIT.java
@@ -134,7 +134,7 @@ public class NamespacesIT extends SharedMiniClusterBase {
c.tableOperations().delete(t);
}
}
- assertEquals(2, c.tableOperations().list().size());
+ assertEquals(3, c.tableOperations().list().size());
for (String n : c.namespaceOperations().list()) {
if (!n.equals(Namespace.ACCUMULO.name()) &&
!n.equals(Namespace.DEFAULT.name())) {
c.namespaceOperations().delete(n);
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
index a11385d955..7e06a43872 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesCleanIT.java
@@ -63,14 +63,14 @@ public class ScanServerMetadataEntriesCleanIT extends
SharedMiniClusterBase {
ServerContext ctx = getCluster().getServerContext();
- ctx.getAmple().putScanServerFileReferences(scanRefs);
- assertEquals(scanRefs.size(),
ctx.getAmple().getScanServerFileReferences().count());
+ ctx.getAmple().scanServerRefs().put(scanRefs);
+ assertEquals(scanRefs.size(),
ctx.getAmple().scanServerRefs().list().count());
Set<ScanServerRefTabletFile> scanRefs2 =
-
ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet());
+ ctx.getAmple().scanServerRefs().list().collect(Collectors.toSet());
assertEquals(scanRefs, scanRefs2);
ScanServerMetadataEntries.clean(ctx);
-
assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent());
+ assertFalse(ctx.getAmple().scanServerRefs().list().findAny().isPresent());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
index ab92e5700f..99d1ce80a8 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerMetadataEntriesIT.java
@@ -51,7 +51,6 @@ import org.apache.accumulo.core.gc.Reference;
import org.apache.accumulo.core.metadata.AccumuloTable;
import org.apache.accumulo.core.metadata.ScanServerRefTabletFile;
import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
-import
org.apache.accumulo.core.metadata.schema.MetadataSchema.ScanServerFileReferenceSection;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.gc.GCRun;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
@@ -118,24 +117,24 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
ServerContext ctx = getCluster().getServerContext();
- ctx.getAmple().putScanServerFileReferences(scanRefs);
- assertEquals(scanRefs.size(),
ctx.getAmple().getScanServerFileReferences().count());
+ ctx.getAmple().scanServerRefs().put(scanRefs);
+ assertEquals(scanRefs.size(),
ctx.getAmple().scanServerRefs().list().count());
Set<ScanServerRefTabletFile> scanRefs2 =
-
ctx.getAmple().getScanServerFileReferences().collect(Collectors.toSet());
+ ctx.getAmple().scanServerRefs().list().collect(Collectors.toSet());
assertEquals(scanRefs, scanRefs2);
// attempt to delete file references then make sure they were deleted
- ctx.getAmple().deleteScanServerFileReferences(server.toString(),
serverLockUUID);
-
assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent());
+ ctx.getAmple().scanServerRefs().delete(server.toString(), serverLockUUID);
+ assertFalse(ctx.getAmple().scanServerRefs().list().findAny().isPresent());
- ctx.getAmple().putScanServerFileReferences(scanRefs);
- assertEquals(scanRefs.size(),
ctx.getAmple().getScanServerFileReferences().count());
+ ctx.getAmple().scanServerRefs().put(scanRefs);
+ assertEquals(scanRefs.size(),
ctx.getAmple().scanServerRefs().list().count());
// attempt to delete file references then make sure they were deleted
- ctx.getAmple().deleteScanServerFileReferences(scanRefs);
-
assertFalse(ctx.getAmple().getScanServerFileReferences().findAny().isPresent());
+ ctx.getAmple().scanServerRefs().delete(scanRefs);
+ assertFalse(ctx.getAmple().scanServerRefs().list().findAny().isPresent());
}
@Test
@@ -162,12 +161,12 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
assertTrue(iter.hasNext());
assertNotNull(iter.next());
- assertEquals(fileCount,
ctx.getAmple().getScanServerFileReferences().count());
+ assertEquals(fileCount,
ctx.getAmple().scanServerRefs().list().count());
}
// close happens asynchronously. Let the test fail by timeout
- while
(ctx.getAmple().getScanServerFileReferences().findAny().isPresent()) {
+ while (ctx.getAmple().scanServerRefs().list().findAny().isPresent()) {
Thread.sleep(1000);
}
}
@@ -196,12 +195,12 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
assertTrue(iter.hasNext());
assertNotNull(iter.next());
- assertEquals(fileCount,
ctx.getAmple().getScanServerFileReferences().count());
+ assertEquals(fileCount,
ctx.getAmple().scanServerRefs().list().count());
}
// close happens asynchronously. Let the test fail by timeout
- while
(ctx.getAmple().getScanServerFileReferences().findAny().isPresent()) {
+ while (ctx.getAmple().scanServerRefs().list().findAny().isPresent()) {
Thread.sleep(1000);
}
}
@@ -235,8 +234,7 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
List<Entry<Key,Value>> metadataEntries = null;
try (Scanner scanner2 =
- client.createScanner(AccumuloTable.METADATA.tableName(),
Authorizations.EMPTY)) {
- scanner2.setRange(ScanServerFileReferenceSection.getRange());
+ client.createScanner(AccumuloTable.SCAN_REF.tableName(),
Authorizations.EMPTY)) {
metadataEntries =
scanner2.stream().distinct().collect(Collectors.toList());
}
assertEquals(fileCount, metadataEntries.size());
@@ -244,14 +242,12 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
Set<String> metadataScanFileRefs = new HashSet<>();
metadataEntries.forEach(m -> {
- String row = m.getKey().getRow().toString();
- assertTrue(row.startsWith("~sserv"));
- String file =
row.substring(ScanServerFileReferenceSection.getRowPrefix().length());
+ String file = m.getKey().getRow().toString();
metadataScanFileRefs.add(file);
});
assertEquals(fileCount, metadataScanFileRefs.size());
- assertEquals(fileCount,
ctx.getAmple().getScanServerFileReferences().count());
+ assertEquals(fileCount,
ctx.getAmple().scanServerRefs().list().count());
List<Reference> refs;
try (Stream<Reference> references = gc.getReferences()) {
refs = references.collect(Collectors.toList());
@@ -275,7 +271,7 @@ public class ScanServerMetadataEntriesIT extends
SharedMiniClusterBase {
client.tableOperations().delete(tableName);
}
// close happens asynchronously. Let the test fail by timeout
- while (ctx.getAmple().getScanServerFileReferences().findAny().isPresent())
{
+ while (ctx.getAmple().scanServerRefs().list().findAny().isPresent()) {
Thread.sleep(1000);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
index 6c2d7aac6b..5633474aae 100644
--- a/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/ScanServerShutdownIT.java
@@ -107,7 +107,7 @@ public class ScanServerShutdownIT extends
SharedMiniClusterBase {
for (int i = 0; i < fileCount; i++) {
ScanServerIT.ingest(client, tableName, 10, 10, 0, "colf", true);
}
- assertEquals(0, ctx.getAmple().getScanServerFileReferences().count());
+ assertEquals(0, ctx.getAmple().scanServerRefs().list().count());
for (int i = 0; i < 3; i++) {
try (BatchScanner scanner = client.createBatchScanner(tableName,
Authorizations.EMPTY)) {
@@ -118,7 +118,7 @@ public class ScanServerShutdownIT extends
SharedMiniClusterBase {
assertTrue(iter.hasNext());
assertNotNull(iter.next());
- assertEquals(fileCount,
ctx.getAmple().getScanServerFileReferences().count());
+ assertEquals(fileCount,
ctx.getAmple().scanServerRefs().list().count());
}
}
@@ -127,7 +127,7 @@ public class ScanServerShutdownIT extends
SharedMiniClusterBase {
Wait.waitFor(() -> ((ClientContext) client).getScanServers().size() ==
0);
// The ScanServer should clean up the references on normal shutdown
- Wait.waitFor(() -> ctx.getAmple().getScanServerFileReferences().count()
== 0);
+ Wait.waitFor(() -> ctx.getAmple().scanServerRefs().list().count() == 0);
} finally {
getCluster().getClusterControl().stopAllServers(ServerType.SCAN_SERVER);
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
index 1f3eea1415..2d168f392b 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/WALSunnyDayIT.java
@@ -148,8 +148,10 @@ public class WALSunnyDayIT extends ConfigurableMacBase {
Thread.sleep(SECONDS.toMillis(5));
Map<KeyExtent,List<String>> markers = getRecoveryMarkers(c);
// log.debug("markers " + markers);
- assertEquals(1, markers.size(), "one tablet should have markers");
- assertEquals("1",
markers.keySet().iterator().next().tableId().canonical(),
+ // There should be markers for the created table and also the ScanRef
table
+ assertEquals(2, markers.size(), "two tablets should have markers");
+ assertTrue(
+ markers.keySet().stream().anyMatch(extent ->
extent.tableId().canonical().equals("1")),
"tableId of the keyExtent should be 1");
// put some data in the WAL