This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new f208a886c5 avoids unneeded work when splitting tablets (#5071)
f208a886c5 is described below
commit f208a886c54bf1307cb7afc1af9efd8d54aadc59
Author: Keith Turner <[email protected]>
AuthorDate: Sat Nov 16 18:09:29 2024 -0500
avoids unneeded work when splitting tablets (#5071)
When tablets needed to split they were queued in a thread pool to seed
a fate operation. When the tablet group watcher ran again if the tablet
was still in queue it would add it again. This would not cause any
correctness issues, but it would cause a lot of wasted work. Also if one
tablet was queued multiple times it could prevent another tablet that
was not queued at all from being added. This made splitting very
inefficient when a large number of tablets needed to split.
---
.../org/apache/accumulo/core/conf/Property.java | 6 ++--
.../accumulo/manager/split/SeedSplitTask.java | 4 +++
.../apache/accumulo/manager/split/Splitter.java | 42 +++++++++++++++-------
3 files changed, 38 insertions(+), 14 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 6e2fed76af..98412a97d7 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -447,8 +447,10 @@ public enum Property {
+ "indefinitely. Default is 0 to block indefinitely. Only valid when
tserver available "
+ "threshold is set greater than 0.",
"1.10.0"),
- MANAGER_SPLIT_WORKER_THREADS("manager.split.inspection.threadpool.size",
"8", PropertyType.COUNT,
- "The number of threads used to inspect tablets files to find split
points.", "4.0.0"),
+ MANAGER_SPLIT_WORKER_THREADS("manager.split.seed.threadpool.size", "8",
PropertyType.COUNT,
+ "The number of threads used to seed fate split task, the actual split
work is done by fate"
+ + " threads.",
+ "4.0.0"),
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_INITIAL_SIZE(
"manager.compaction.major.service.queue.initial.size", "10000",
PropertyType.COUNT,
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
index 95904229aa..78f08a9471 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
@@ -59,4 +59,8 @@ public class SeedSplitTask implements Runnable {
log.error("Failed to split {}", extent, e);
}
}
+
+ public KeyExtent getExtent() {
+ return extent;
+ }
}
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
index 9a99465820..85b841d1cf 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/split/Splitter.java
@@ -23,8 +23,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -50,6 +50,8 @@ public class Splitter {
private static final Logger LOG = LoggerFactory.getLogger(Splitter.class);
private final ThreadPoolExecutor splitExecutor;
+ // tracks which tablets are queued in splitExecutor
+ private final Set<Text> queuedTablets = ConcurrentHashMap.newKeySet();
public static class FileInfo {
final Text firstRow;
@@ -151,17 +153,10 @@ public class Splitter {
public Splitter(ServerContext context) {
int numThreads =
context.getConfiguration().getCount(Property.MANAGER_SPLIT_WORKER_THREADS);
- // Set up thread pool that constrains the amount of task it queues and
when full discards task.
- // The purpose of this is to avoid reading lots of data into memory if
lots of tablets need to
- // split.
- BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(10000);
+
this.splitExecutor = context.threadPools().getPoolBuilder("split_seeder")
.numCoreThreads(numThreads).numMaxThreads(numThreads).withTimeOut(0L,
TimeUnit.MILLISECONDS)
- .withQueue(queue).enableThreadPoolMetrics().build();
-
- // Discard task when the queue is full, this allows the TGW to continue
processing task other
- // than splits.
- this.splitExecutor.setRejectedExecutionHandler(new
ThreadPoolExecutor.DiscardPolicy());
+ .enableThreadPoolMetrics().build();
Weigher<CacheKey,
FileInfo> weigher = (key, info) -> key.tableId.canonical().length()
@@ -191,6 +186,29 @@ public class Splitter {
}
public void initiateSplit(SeedSplitTask seedSplitTask) {
- splitExecutor.execute(seedSplitTask);
+ // Want to avoid queuing the same tablet multiple times, it would not
cause bugs but would waste
+ // work. Use the metadata row to identify a tablet because the KeyExtent
also includes the prev
+ // end row which may change when splits happen. The metaRow is
conceptually tableId+endRow and
+ // that does not change for a split.
+ Text metaRow = seedSplitTask.getExtent().toMetaRow();
+ int qsize = queuedTablets.size();
+ if (qsize < 10_000 && queuedTablets.add(metaRow)) {
+ Runnable taskWrapper = () -> {
+ try {
+ seedSplitTask.run();
+ } finally {
+ queuedTablets.remove(metaRow);
+ }
+ };
+
+ try {
+ splitExecutor.execute(taskWrapper);
+ } catch (RejectedExecutionException rje) {
+ queuedTablets.remove(metaRow);
+ throw rje;
+ }
+ } else {
+ LOG.trace("Did not add {} to split queue {}", metaRow, qsize);
+ }
}
}