This is an automated email from the ASF dual-hosted git repository.
cshannon pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 5d3ccec0df Enable background refresh for the scan server tablet
metadata cache (#4551)
5d3ccec0df is described below
commit 5d3ccec0dfed42667608a574ae2cb4b12ffa1987
Author: Christopher L. Shannon <[email protected]>
AuthorDate: Sat May 25 08:07:35 2024 -0400
Enable background refresh for the scan server tablet metadata cache (#4551)
This adds a property to configure the scan server tablet metadata
Caffeine cache to refresh cached tablet metadata in the background on
cache hits after the refresh time has passed. The refresh time is
expressed as a percentage of the expiration time. This allows the cached
entries to refresh before expiration if they are frequently used so that
scans will not be blocked waiting on a refresh on expiration. Entries
still expire if no cache hit occurs after the refresh time and before the
expiration time passes.
See: https://github.com/ben-manes/caffeine/wiki/Refresh
This closes #4544
---
.../org/apache/accumulo/core/conf/Property.java | 11 +++-
.../org/apache/accumulo/tserver/ScanServer.java | 37 +++++++++++-
.../test/ScanServerConcurrentTabletScanIT.java | 69 ++++++++++++++++++++--
3 files changed, 107 insertions(+), 10 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index fc5a52f239..fe4b8f4c18 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -446,9 +446,18 @@ public enum Property {
"Specifies a default blocksize for the scan server caches.", "2.1.0"),
@Experimental
SSERV_CACHED_TABLET_METADATA_EXPIRATION("sserver.cache.metadata.expiration",
"5m",
- PropertyType.TIMEDURATION, "The time after which cached tablet metadata
will be refreshed.",
+ PropertyType.TIMEDURATION,
+ "The time after which cached tablet metadata will be expired if not
previously refreshed.",
"2.1.0"),
@Experimental
+
SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT("sserver.cache.metadata.refresh.percent",
".75",
+ PropertyType.FRACTION,
+ "The time after which cached tablet metadata will be refreshed,
expressed as a "
+ + "percentage of the expiration time. Cache hits after this time,
but before the "
+ + "expiration time, will trigger a background refresh for future
hits. "
+ + "Value must be less than 100%. Set to 0 will disable refresh.",
+ "2.1.3"),
+ @Experimental
SSERV_PORTSEARCH("sserver.port.search", "true", PropertyType.BOOLEAN,
"if the ports above are in use, search higher ports until one is
available.", "2.1.0"),
@Experimental
diff --git
a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
index 2ddb76e2cb..3b52ecf0fd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
@@ -39,6 +39,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
@@ -179,6 +180,7 @@ public class ScanServer extends AbstractServer
private UUID serverLockUUID;
private final TabletMetadataLoader tabletMetadataLoader;
private final LoadingCache<KeyExtent,TabletMetadata> tabletMetadataCache;
+ private final ThreadPoolExecutor tmCacheExecutor;
// tracks file reservations that are in the process of being added or
removed from the metadata
// table
private final Set<StoredTabletFile> influxFiles = new HashSet<>();
@@ -242,14 +244,38 @@ public class ScanServer extends AbstractServer
if (cacheExpiration == 0L) {
LOG.warn("Tablet metadata caching disabled, may cause excessive scans on
metadata table.");
tabletMetadataCache = null;
+ tmCacheExecutor = null;
} else {
if (cacheExpiration < 60000) {
LOG.warn(
"Tablet metadata caching less than one minute, may cause excessive
scans on metadata table.");
}
- tabletMetadataCache =
- Caffeine.newBuilder().expireAfterWrite(cacheExpiration,
TimeUnit.MILLISECONDS)
-
.scheduler(Scheduler.systemScheduler()).recordStats().build(tabletMetadataLoader);
+
+ // Get the cache refresh percentage property
+ // The fraction must be less than 1 (100%) as 100% or over would effectively disable
it
+ double cacheRefreshPercentage =
+
getConfiguration().getFraction(Property.SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT);
+ Preconditions.checkArgument(cacheRefreshPercentage < 1,
+ "Tablet metadata cache refresh percentage is '%s' but must be less
than 1",
+ cacheRefreshPercentage);
+
+ tmCacheExecutor =
context.threadPools().getPoolBuilder("scanServerTmCache").numCoreThreads(8)
+ .enableThreadPoolMetrics().build();
+ var builder = Caffeine.newBuilder().expireAfterWrite(cacheExpiration,
TimeUnit.MILLISECONDS)
+
.scheduler(Scheduler.systemScheduler()).executor(tmCacheExecutor).recordStats();
+ if (cacheRefreshPercentage > 0) {
+ // Compute the refresh time as a percentage of the expiration time
+ // Cache hits after this time, but before expiration, will trigger a
background
+ // non-blocking refresh of the entry so future cache hits get an
updated entry
+ // without having to block for a refresh
+ long cacheRefresh = (long) (cacheExpiration * cacheRefreshPercentage);
+ LOG.debug("Tablet metadata refresh percentage set to {}, refresh time
set to {} ms",
+ cacheRefreshPercentage, cacheRefresh);
+ builder.refreshAfterWrite(cacheRefresh, TimeUnit.MILLISECONDS);
+ } else {
+ LOG.warn("Tablet metadata cache refresh disabled, may cause blocking
on cache expiration.");
+ }
+ tabletMetadataCache = builder.build(tabletMetadataLoader);
}
delegate = newThriftScanClientHandler(new WriteTracker());
@@ -413,6 +439,11 @@ public class ScanServer extends AbstractServer
LOG.warn("Failed to close filesystem : {}", e.getMessage(), e);
}
+ if (tmCacheExecutor != null) {
+ LOG.debug("Shutting down TabletMetadataCache executor");
+ tmCacheExecutor.shutdownNow();
+ }
+
gcLogger.logGCInfo(getConfiguration());
LOG.info("stop requested. exiting ... ");
try {
diff --git
a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
index beea10ba26..8e22b9e203 100644
---
a/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/ScanServerConcurrentTabletScanIT.java
@@ -46,6 +46,7 @@ import
org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
+import org.apache.accumulo.test.util.Wait;
import org.apache.zookeeper.KeeperException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -81,7 +82,7 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
SharedMiniClusterBase.stopMiniCluster();
}
- private void startScanServer(boolean cacheEnabled)
+ private void startScanServer(String cacheExpiration, String cacheRefresh)
throws IOException, KeeperException, InterruptedException {
String zooRoot = getCluster().getServerContext().getZooKeeperRoot();
@@ -91,8 +92,8 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
SharedMiniClusterBase.getCluster().getClusterControl().stop(ServerType.SCAN_SERVER);
Map<String,String> overrides = new HashMap<>();
- overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(),
- cacheEnabled ? "300m" : "0m");
+ overrides.put(Property.SSERV_CACHED_TABLET_METADATA_EXPIRATION.getKey(),
cacheExpiration);
+
overrides.put(Property.SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT.getKey(),
cacheRefresh);
SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER,
overrides,
1);
while (zrw.getChildren(scanServerRoot).size() == 0) {
@@ -102,9 +103,31 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
}
@Test
- public void testScanSameTabletDifferentDataTabletMetadataCacheEnabled()
throws Exception {
+ public void
testScanSameTabletDifferentDataTmCacheEnabledRefreshNotTriggered() throws
Exception {
+ // Set the cache time to 10 minutes so it won't expire
+ // Set the cache refresh to 50%, which is 5 minutes so a refresh won't be
triggered
+ startScanServer("10m", ".5");
+ testScanSameTabletDifferentDataTabletMetadataCacheEnabled(false);
+ }
+
+ @Test
+ public void testScanSameTabletDifferentDataTmCacheEnabledRefreshTriggered()
throws Exception {
+ // Set the cache time to 10 minutes so it won't expire
+ // Set the cache refresh to 6ms, so a second hit after 6ms will trigger a
background refresh
+ // .00001 * 10m (600000ms) = 6 ms
+ startScanServer("10m", ".00001");
+ testScanSameTabletDifferentDataTabletMetadataCacheEnabled(true);
+ }
+
+ @Test
+ public void testScanSameTabletDifferentDataTmCacheEnabledRefreshDisabled()
throws Exception {
+ // Set the cache time to 5 minutes so it won't expire and disable the
refresh entirely
+ startScanServer("5m", "0");
+ testScanSameTabletDifferentDataTabletMetadataCacheEnabled(false);
+ }
- startScanServer(true);
+ private void
testScanSameTabletDifferentDataTabletMetadataCacheEnabled(boolean shouldRefresh)
+ throws Exception {
Properties clientProperties = getClientProps();
clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");
@@ -135,7 +158,15 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
// Ingest another 100 k/v with a different column family
final int secondBatchOfEntriesCount = ingest(client, tableName, 10, 10,
0, "COLB", true);
+ // Add a sleep that is long enough that the configured refresh interval
passes if
+ // the test has been set to use one.
+ Thread.sleep(1000);
+
// iter2 should read 1000 k/v because the tablet metadata is cached.
+ // This call will trigger a cache refresh for the entry in the
background if refresh is
+ // enabled.
+ // This current iter2 scan will still return the old data (1000 k/v) as
the new value
+ // won't be visible until the reload finishes
Iterator<Entry<Key,Value>> iter2 = scanner1.iterator();
int count2 = 0;
boolean useIter1 = true;
@@ -157,6 +188,22 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
assertEquals(firstBatchOfEntriesCount, count1);
assertEquals(firstBatchOfEntriesCount, count2);
+ // If a refresh was done this should see 1100 entries
+ // Keep scanning until updated value is seen
+ if (shouldRefresh) {
+ // Count the number of entries for the third iterator to test if
+ // refresh worked depending on configs
+ Wait.waitFor(
+ () -> countEntries(scanner1) == firstBatchOfEntriesCount +
secondBatchOfEntriesCount,
+ 10000, 500);
+ } else {
+ // There's not a great way to test the case of things not refreshing
as the value
+ // should just be the same, so sleep for a period of time that should
be longer than
+ // the refresh
+ Thread.sleep(1000);
+ assertEquals(firstBatchOfEntriesCount, countEntries(scanner1));
+ }
+
scanner1.close();
// A new scan should read all 1100 entries
@@ -167,10 +214,20 @@ public class ScanServerConcurrentTabletScanIT extends
SharedMiniClusterBase {
}
}
+ private int countEntries(Scanner scanner) {
+ int count = 0;
+ Iterator<Entry<Key,Value>> iter = scanner.iterator();
+ while (iter.hasNext()) {
+ iter.next();
+ count++;
+ }
+ return count;
+ }
+
@Test
public void testScanSameTabletDifferentDataTabletMetadataCacheDisabled()
throws Exception {
- startScanServer(false);
+ startScanServer("0m", "0");
Properties clientProperties = getClientProps();
clientProperties.put(ClientProperty.SCANNER_BATCH_SIZE.getKey(), "100");