This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new cfae5e9abc fixes slow bulk import with many tablets and file (#5044)
cfae5e9abc is described below
commit cfae5e9abc7447aacc7136777f853fc0842c49de
Author: Keith Turner <[email protected]>
AuthorDate: Fri Nov 8 11:44:10 2024 -0500
fixes slow bulk import with many tablets and file (#5044)
* fixes slow bulk import with many tablets and file
The bulk import code was reading all tablets in the bulk import range
for each range being bulk imported. This resulted in O(N^2) metadata
table scans which made really large bulk imports really slow.
Added a new test that bulk imports thousands of files into thousands of
tablets. Running this test without the fixes in this PR, the following time
is seen for the fate step.
```
DEBUG: Running LoadFiles.isReady()
FATE:USER:6320e73d-e661-4c66-bf25-c0c27a0a79d5 took 289521 ms and returned 0
```
With the fix in this PR, the following times are seen for the new test,
so it goes from 290s to 1.2s.
```
DEBUG: Running LoadFiles.isReady()
FATE:USER:18e52fc2-5876-4b01-ba7b-3b3c099a82be took 1225 ms and returned 0
```
This bug does not seem to exist in 2.1 or 3.1. The test was not run there,
though; it may be worthwhile to backport the test.
---
.../manager/tableOps/bulkVer2/LoadFiles.java | 8 +++-
.../apache/accumulo/test/functional/BulkNewIT.java | 56 ++++++++++++++++++++++
2 files changed, 62 insertions(+), 2 deletions(-)
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
index fa657eb9d8..711387955d 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer2/LoadFiles.java
@@ -338,11 +338,15 @@ class LoadFiles extends ManagerRepo {
TabletsMetadata.builder(manager.getContext()).forTable(tableId).overlapping(startRow,
null)
.checkConsistency().fetch(PREV_ROW, LOCATION, LOADED,
TIME).build()) {
+ // The tablet iterator and load mapping iterator are both iterating over
data that is sorted
+ // in the same way. The two iterators are each independently advanced to
find common points in
+ // the sorted data.
+ var tabletIter = tabletsMetadata.iterator();
+
t1 = System.currentTimeMillis();
while (lmi.hasNext()) {
loadMapEntry = lmi.next();
- List<TabletMetadata> tablets =
- findOverlappingTablets(loadMapEntry.getKey(),
tabletsMetadata.iterator());
+ List<TabletMetadata> tablets =
findOverlappingTablets(loadMapEntry.getKey(), tabletIter);
loader.load(tablets, loadMapEntry.getValue());
}
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
index df256d2499..1a3439919a 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BulkNewIT.java
@@ -35,7 +35,9 @@ import java.nio.file.Paths;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -49,6 +51,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -829,6 +832,59 @@ public class BulkNewIT extends SharedMiniClusterBase {
}
}
+ @Test
+ public void testManyTabletAndFiles() throws Exception {
+ try (AccumuloClient c =
Accumulo.newClient().from(getClientProps()).build()) {
+ String dir = getDir("/testBulkFile-");
+ FileSystem fs = getCluster().getFileSystem();
+ fs.mkdirs(new Path(dir));
+
+ TreeSet<Text> splits = IntStream.range(1,
9000).mapToObj(BulkNewIT::row).map(Text::new)
+ .collect(Collectors.toCollection(TreeSet::new));
+ c.tableOperations().addSplits(tableName, splits);
+
+ var executor = Executors.newFixedThreadPool(16);
+ var futures = new ArrayList<Future<?>>();
+
+ var loadPlanBuilder = LoadPlan.builder();
+ var rowsExpected = new HashSet<>();
+ var imports = IntStream.range(2,
8999).boxed().collect(Collectors.toList());
+ // The order in which imports are added to the load plan should not
matter so test that.
+ Collections.shuffle(imports);
+ for (var data : imports) {
+ String filename = "f" + data + ".";
+ loadPlanBuilder.loadFileTo(filename + RFile.EXTENSION,
RangeType.TABLE, row(data - 1),
+ row(data));
+ var future = executor.submit(() -> {
+ writeData(dir + "/" + filename, aconf, data, data);
+ return null;
+ });
+ futures.add(future);
+ rowsExpected.add(row(data));
+ }
+
+ for (var future : futures) {
+ future.get();
+ }
+
+ executor.shutdown();
+
+ var loadPlan = loadPlanBuilder.build();
+
+
c.tableOperations().importDirectory(dir).to(tableName).plan(loadPlan).load();
+
+ // using a batch scanner can read from lots of tablets w/ less RPCs
+ try (var scanner = c.createBatchScanner(tableName)) {
+ // use a scan server so that tablets do not need to be hosted
+ scanner.setConsistencyLevel(ScannerBase.ConsistencyLevel.EVENTUAL);
+ scanner.setRanges(List.of(new Range()));
+ var rowsSeen = scanner.stream().map(e ->
e.getKey().getRowData().toString())
+ .collect(Collectors.toSet());
+ assertEquals(rowsExpected, rowsSeen);
+ }
+ }
+ }
+
/**
* @return Map w/ keys that are end rows of tablets and the value is a true
when the tablet has a
* current location.