This is an automated email from the ASF dual-hosted git repository.
domgarguilo pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 7636a8b975 Parallelize split seeding in TableOperationsImpl.addSplits()
(#4700)
7636a8b975 is described below
commit 7636a8b975a24e12e1b5f96c0c83e03b59f512a6
Author: Dom G <[email protected]>
AuthorDate: Thu Jun 27 15:30:01 2024 -0400
Parallelize split seeding in TableOperationsImpl.addSplits() (#4700)
* parallelize the starting and waiting fate operation steps
---
.../core/clientImpl/TableOperationsImpl.java | 100 ++++++++++++---------
1 file changed, 57 insertions(+), 43 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
index 213e8ba291..780177d68e 100644
---
a/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
+++
b/core/src/main/java/org/apache/accumulo/core/clientImpl/TableOperationsImpl.java
@@ -67,6 +67,8 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
@@ -484,7 +486,14 @@ public class TableOperationsImpl extends
TableOperationsHelper {
ClientTabletCache tabLocator = ClientTabletCache.getInstance(context,
tableId);
- SortedSet<Text> splitsTodo = new TreeSet<>(splits);
+ SortedSet<Text> splitsTodo = Collections.synchronizedSortedSet(new
TreeSet<>(splits));
+
+ final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+
+ ExecutorService startExecutor =
+
context.threadPools().getPoolBuilder("addSplitsStart").numCoreThreads(16).build();
+ ExecutorService waitExecutor =
+
context.threadPools().getPoolBuilder("addSplitsWait").numCoreThreads(16).build();
while (!splitsTodo.isEmpty()) {
@@ -493,59 +502,64 @@ public class TableOperationsImpl extends
TableOperationsHelper {
Map<KeyExtent,List<Text>> tabletSplits =
mapSplitsToTablets(tableName, tableId, tabLocator, splitsTodo);
- ArrayList<Pair<TFateId,List<Text>>> opids = new
ArrayList<>(tabletSplits.size());
-
- final ByteBuffer EMPTY = ByteBuffer.allocate(0);
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
// begin the fate operation for each tablet without waiting for the
operation to complete
for (Entry<KeyExtent,List<Text>> splitsForTablet :
tabletSplits.entrySet()) {
- var extent = splitsForTablet.getKey();
+ CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
+ var extent = splitsForTablet.getKey();
- List<ByteBuffer> args = new ArrayList<>();
-
args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
- args.add(extent.endRow() == null ? EMPTY :
TextUtil.getByteBuffer(extent.endRow()));
- args.add(extent.prevEndRow() == null ? EMPTY :
TextUtil.getByteBuffer(extent.prevEndRow()));
- splitsForTablet.getValue().forEach(split ->
args.add(TextUtil.getByteBuffer(split)));
+ List<ByteBuffer> args = new ArrayList<>();
+
args.add(ByteBuffer.wrap(extent.tableId().canonical().getBytes(UTF_8)));
+ args.add(extent.endRow() == null ? EMPTY :
TextUtil.getByteBuffer(extent.endRow()));
+ args.add(
+ extent.prevEndRow() == null ? EMPTY :
TextUtil.getByteBuffer(extent.prevEndRow()));
+ splitsForTablet.getValue().forEach(split ->
args.add(TextUtil.getByteBuffer(split)));
- try {
- handleFateOperation(() -> {
- TFateInstanceType t =
FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
- TFateId opid = beginFateOperation(t);
- executeFateOperation(opid, FateOperation.TABLE_SPLIT, args,
Map.of(), false);
- opids.add(new Pair<>(opid, splitsForTablet.getValue()));
- return null;
- }, tableName);
- } catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
- // after all operations have been started, wait for them to complete
- for (Pair<TFateId,List<Text>> entry : opids) {
- final TFateId opid = entry.getFirst();
- final List<Text> completedSplits = entry.getSecond();
+ try {
+ return handleFateOperation(() -> {
+ TFateInstanceType t =
FateInstanceType.fromNamespaceOrTableName(tableName).toThrift();
+ TFateId opid = beginFateOperation(t);
+ executeFateOperation(opid, FateOperation.TABLE_SPLIT, args,
Map.of(), false);
+ return new Pair<>(opid, splitsForTablet.getValue());
+ }, tableName);
+ } catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException
+ | AccumuloSecurityException | TableNotFoundException |
AccumuloException e) {
+ throw new RuntimeException(e);
+ }
+ // wait for the fate operation to complete in a separate thread pool
+ }, startExecutor).thenApplyAsync(pair -> {
+ final TFateId opid = pair.getFirst();
+ final List<Text> completedSplits = pair.getSecond();
- try {
- String status = handleFateOperation(() ->
waitForFateOperation(opid), tableName);
+ try {
+ String status = handleFateOperation(() ->
waitForFateOperation(opid), tableName);
- if (SPLIT_SUCCESS_MSG.equals(status)) {
- completedSplits.forEach(splitsTodo::remove);
- }
- } catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException e) {
- throw new RuntimeException(e);
- } finally {
- context.clearTableListCache();
- // always finish table op, even when exception
- if (opid != null) {
- try {
- finishFateOperation(opid);
- } catch (Exception e) {
- log.warn("Exception thrown while finishing fate table
operation", e);
+ if (SPLIT_SUCCESS_MSG.equals(status)) {
+ completedSplits.forEach(splitsTodo::remove);
+ }
+ } catch (TableExistsException | NamespaceExistsException |
NamespaceNotFoundException
+ | AccumuloSecurityException | TableNotFoundException |
AccumuloException e) {
+ throw new RuntimeException(e);
+ } finally {
+ // always finish table op, even when exception
+ if (opid != null) {
+ try {
+ finishFateOperation(opid);
+ } catch (Exception e) {
+ log.warn("Exception thrown while finishing fate table
operation", e);
+ }
}
}
- }
+ return null;
+ }, waitExecutor);
+ futures.add(future);
}
+
+ futures.forEach(CompletableFuture::join);
}
+ startExecutor.shutdown();
+ waitExecutor.shutdown();
}
private Map<KeyExtent,List<Text>> mapSplitsToTablets(String tableName,
TableId tableId,