This is an automated email from the ASF dual-hosted git repository.
ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo-testing.git
The following commit(s) were added to refs/heads/main by this push:
new ed97a2f fixes sorting in bulk batch writer (#289)
ed97a2f is described below
commit ed97a2f81e617428158e24552ad3a96e0460a875
Author: Keith Turner <[email protected]>
AuthorDate: Tue Dec 10 15:32:30 2024 -0500
fixes sorting in bulk batch writer (#289)
BulkBatchWriter was sorting only on the mutation row. Occasionally
continuous ingest would generate the same 64-bit id twice in the same
batch and if its columns were not sorted properly then the write to the
rfile would fail.
This fixes the problem by sorting on the entire key instead of only the
row.
---
.../testing/continuous/BulkBatchWriter.java | 49 ++++++++++++++--------
1 file changed, 32 insertions(+), 17 deletions(-)
diff --git
a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
index 6e763b3..b5b68f9 100644
--- a/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
+++ b/src/main/java/org/apache/accumulo/testing/continuous/BulkBatchWriter.java
@@ -18,8 +18,11 @@
*/
package org.apache.accumulo.testing.continuous;
+import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
@@ -32,6 +35,7 @@ import
org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.rfile.RFile;
import org.apache.accumulo.core.client.rfile.RFileWriter;
import org.apache.accumulo.core.data.Key;
+import org.apache.accumulo.core.data.KeyValue;
import org.apache.accumulo.core.data.LoadPlan;
import org.apache.accumulo.core.data.Mutation;
import org.apache.hadoop.fs.FileSystem;
@@ -47,7 +51,7 @@ public class BulkBatchWriter implements BatchWriter {
private static final Logger log =
LoggerFactory.getLogger(BulkBatchWriter.class);
- private final List<Mutation> mutations = new ArrayList<>();
+ private final Deque<Mutation> mutations = new ArrayDeque<>();
private final AccumuloClient client;
private final String tableName;
private final FileSystem fileSystem;
@@ -72,7 +76,7 @@ public class BulkBatchWriter implements BatchWriter {
public synchronized void addMutation(Mutation mutation) throws
MutationsRejectedException {
Preconditions.checkState(!closed);
mutation = new Mutation(mutation);
- mutations.add(mutation);
+ mutations.addLast(mutation);
memUsed += mutation.estimatedMemoryUsed();
if (memUsed > memLimit) {
flush();
@@ -96,7 +100,27 @@ public class BulkBatchWriter implements BatchWriter {
Path tmpDir = new Path(workPath, UUID.randomUUID().toString());
fileSystem.mkdirs(tmpDir);
- mutations.sort((m1, m2) -> Arrays.compare(m1.getRow(), m2.getRow()));
+
+ List<KeyValue> keysValues = new ArrayList<>(mutations.size());
+
+ // remove mutations from the dequeue as we convert them to Keys making
the Mutation objects
+ // available for garbage collection
+ Mutation mutation;
+ while ((mutation = mutations.pollFirst()) != null) {
+ for (var columnUpdate : mutation.getUpdates()) {
+ var builder = Key.builder(false).row(mutation.getRow())
+
.family(columnUpdate.getColumnFamily()).qualifier(columnUpdate.getColumnQualifier())
+ .visibility(columnUpdate.getColumnVisibility());
+ if (columnUpdate.hasTimestamp()) {
+ builder = builder.timestamp(columnUpdate.getTimestamp());
+ }
+ Key key = builder.deleted(columnUpdate.isDeleted()).build();
+ keysValues.add(new KeyValue(key, columnUpdate.getValue()));
+ }
+ }
+
+ Comparator<KeyValue> kvComparator = (kv1, kv2) ->
kv1.getKey().compareTo(kv2.getKey());
+ keysValues.sort(kvComparator);
RFileWriter writer = null;
byte[] currEndRow = null;
@@ -104,14 +128,15 @@ public class BulkBatchWriter implements BatchWriter {
var loadPlanBuilder = LoadPlan.builder();
- for (var mutation : mutations) {
+ for (var keyValue : keysValues) {
+ var key = keyValue.getKey();
if (writer == null
- || (currEndRow != null && Arrays.compare(mutation.getRow(),
currEndRow) > 0)) {
+ || (currEndRow != null &&
Arrays.compare(key.getRowData().toArray(), currEndRow) > 0)) {
if (writer != null) {
writer.close();
}
- var row = new Text(mutation.getRow());
+ var row = key.getRow();
var headSet = splits.headSet(row);
var tabletPrevRow = headSet.isEmpty() ? null : headSet.last();
var tailSet = splits.tailSet(row);
@@ -126,17 +151,7 @@ public class BulkBatchWriter implements BatchWriter {
log.debug("Created new file {} for range {} {}", filename,
tabletPrevRow, tabletEndRow);
}
- for (var colUpdate : mutation.getUpdates()) {
- var key = new Key(mutation.getRow(), colUpdate.getColumnFamily(),
- colUpdate.getColumnQualifier(), colUpdate.getColumnVisibility());
- if (colUpdate.hasTimestamp()) {
- key.setTimestamp(colUpdate.getTimestamp());
- }
- if (colUpdate.isDeleted()) {
- key.setDeleted(true);
- }
- writer.append(key, colUpdate.getValue());
- }
+ writer.append(key, keyValue.getValue());
}
if (writer != null) {