This is an automated email from the ASF dual-hosted git repository.
ddanielr pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new 17717d9c27 adds debugging context to shared writers in coordinator
(#5560)
17717d9c27 is described below
commit 17717d9c27a74111786406369a9f68d7a1d17f3c
Author: Keith Turner <[email protected]>
AuthorDate: Tue May 20 13:50:44 2025 -0400
adds debugging context to shared writers in coordinator (#5560)
---
.../accumulo/coordinator/CompactionFinalizer.java | 3 ++-
.../apache/accumulo/coordinator/SharedBatchWriter.java | 18 ++++++++++++------
2 files changed, 14 insertions(+), 7 deletions(-)
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 2efb9e718b..6b12c775c0 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -107,7 +107,8 @@ public class CompactionFinalizer {
private SharedBatchWriter getWriter(ExternalCompactionId ecid) {
return writers.computeIfAbsent(ecid.getFirstUUIDChar(),
- (i) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(),
context, queueSize));
+ (prefix) -> new SharedBatchWriter(Ample.DataLevel.USER.metaTable(),
prefix, context,
+ queueSize / 16));
}
public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent,
long fileSize,
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
index 012ee6c4ea..baf21be1d3 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
@@ -33,6 +33,8 @@ import org.apache.accumulo.server.ServerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.base.Preconditions;
+
/**
* This class supports the use case of many threads writing a single mutation
to a table. It avoids
* each thread creating its own batch writer which creates threads and makes 3
RPCs to write the
@@ -40,6 +42,7 @@ import org.slf4j.LoggerFactory;
*/
public class SharedBatchWriter {
private static final Logger log =
LoggerFactory.getLogger(SharedBatchWriter.class);
+ private final Character prefix;
private static class Work {
private final Mutation mutation;
@@ -55,12 +58,14 @@ public class SharedBatchWriter {
private final String table;
private final ServerContext context;
- public SharedBatchWriter(String table, ServerContext context, int queueSize)
{
+ public SharedBatchWriter(String table, Character prefix, ServerContext
context, int queueSize) {
+ Preconditions.checkArgument(queueSize > 0, "illegal queue size %s",
queueSize);
this.table = table;
+ this.prefix = prefix;
this.context = context;
this.mutations = new ArrayBlockingQueue<>(queueSize);
- var thread =
- Threads.createCriticalThread("shared batch writer for " + table,
this::processMutations);
+ var thread = Threads.createCriticalThread(
+ "shared batch writer for " + table + " prefix:" + prefix,
this::processMutations);
thread.start();
}
@@ -93,11 +98,12 @@ public class SharedBatchWriter {
writer.addMutation(work.mutation);
}
writer.flush();
- log.trace("Wrote {} mutations in {}ms", batch.size(),
timer.elapsed(TimeUnit.MILLISECONDS));
+ log.trace("Wrote {} mutations in {}ms for prefix {}", batch.size(),
+ timer.elapsed(TimeUnit.MILLISECONDS), prefix);
batch.forEach(work -> work.future.complete(null));
} catch (TableNotFoundException | MutationsRejectedException e) {
- log.debug("Failed to process {} mutations in {}ms", batch.size(),
- timer.elapsed(TimeUnit.MILLISECONDS), e);
+ log.debug("Failed to process {} mutations in {}ms for prefix {}",
batch.size(),
+ timer.elapsed(TimeUnit.MILLISECONDS), prefix, e);
batch.forEach(work -> work.future.completeExceptionally(e));
}
}