This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/2.1 by this push:
new a0239d7a01 Avoids creating a batch writer per completed ext compaction
(#5543)
a0239d7a01 is described below
commit a0239d7a01693332399032f62280daad8d52fbc2
Author: Keith Turner <[email protected]>
AuthorDate: Mon May 12 10:55:01 2025 -0400
Avoids creating a batch writer per completed ext compaction (#5543)
The compaction coordinator was creating a batch writer per a completed
external compaction. The batch writer was used to write a single small
mutation. This caused a lot of thread creation and RPCs in the
coordinator. Changed the coordinator to use a single batch writer for
these mutations. Also update some logging in the coordinator to lower
levels to trace and add some timing information.
Co-authored-by: Daniel Roberts ddanielr <[email protected]>
---
.../org/apache/accumulo/core/conf/Property.java | 7 +-
.../accumulo/core/metadata/schema/Ample.java | 5 -
.../schema/ExternalCompactionFinalState.java | 8 ++
.../apache/accumulo/core/util/threads/Threads.java | 17 ++++
.../accumulo/server/metadata/ServerAmpleImpl.java | 15 ---
.../accumulo/coordinator/CompactionFinalizer.java | 85 ++++++++++++----
.../accumulo/coordinator/SharedBatchWriter.java | 107 +++++++++++++++++++++
.../TestCompactionCoordinatorForOfflineTable.java | 11 ++-
8 files changed, 213 insertions(+), 42 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index 97cc0319ec..ca73765984 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -1584,6 +1584,10 @@ public enum Property {
"The interval at which to check for external compaction final state
markers in the metadata table.",
"2.1.0"),
@Experimental
+ COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE(
+ "compaction.coordinator.compaction.finalizer.queue.size", "16384",
PropertyType.COUNT,
+ "The number of completed compactions to buffer in memory before
blocking.", "2.1.4"),
+ @Experimental
COMPACTION_COORDINATOR_TSERVER_COMPACTION_CHECK_INTERVAL(
"compaction.coordinator.tserver.check.interval", "1m",
PropertyType.TIMEDURATION,
"The interval at which to check the tservers for external compactions.",
"2.1.0"),
@@ -1928,7 +1932,8 @@ public enum Property {
COMPACTOR_MINTHREADS_TIMEOUT,
// others
- TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES,
MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME);
+ TSERV_NATIVEMAP_ENABLED, TSERV_SCAN_MAX_OPENFILES,
MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME,
+ COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);
/**
* Checks if the given property may be changed via Zookeeper, but not
recognized until the restart
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
index f7232d7865..60dc6bbbf5 100644
--- a/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
+++ b/core/src/main/java/org/apache/accumulo/core/metadata/schema/Ample.java
@@ -224,11 +224,6 @@ public interface Ample {
throw new UnsupportedOperationException();
}
- default void
-
putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState>
finalStates) {
- throw new UnsupportedOperationException();
- }
-
default Stream<ExternalCompactionFinalState>
getExternalCompactionFinalStates() {
throw new UnsupportedOperationException();
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
index 9212ccde9f..14c1df57b0 100644
---
a/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
+++
b/core/src/main/java/org/apache/accumulo/core/metadata/schema/ExternalCompactionFinalState.java
@@ -20,6 +20,7 @@ package org.apache.accumulo.core.metadata.schema;
import java.util.Base64;
+import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.util.TextUtil;
@@ -136,4 +137,11 @@ public class ExternalCompactionFinalState {
public String toString() {
return toJson();
}
+
+ public Mutation toMutation() {
+ String prefix = MetadataSchema.ExternalCompactionSection.getRowPrefix();
+ Mutation m = new Mutation(prefix + getExternalCompactionId().canonical());
+ m.put("", "", toJson());
+ return m;
+ }
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
index 76a4029ad3..3ff86913b0 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
@@ -22,9 +22,13 @@ import java.lang.Thread.UncaughtExceptionHandler;
import java.util.OptionalInt;
import org.apache.accumulo.core.trace.TraceUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class Threads {
+ private static final Logger log = LoggerFactory.getLogger(Threads.class);
+
public static final UncaughtExceptionHandler UEH = new
AccumuloUncaughtExceptionHandler();
public static class AccumuloDaemonThread extends Thread {
@@ -66,4 +70,17 @@ public class Threads {
return thread;
}
+ public static Thread createCriticalThread(String name, Runnable r) {
+ Runnable wrapped = () -> {
+ try {
+ r.run();
+ } catch (RuntimeException e) {
+ System.err.println("Critical thread " + name + " died");
+ e.printStackTrace();
+ Runtime.getRuntime().halt(-1);
+ }
+ };
+
+ return createThread(name, wrapped);
+ }
}
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index 472aa0a9d7..857853311c 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -294,21 +294,6 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
return delFlag;
}
- @Override
- public void
-
putExternalCompactionFinalStates(Collection<ExternalCompactionFinalState>
finalStates) {
- try (BatchWriter writer =
context.createBatchWriter(DataLevel.USER.metaTable())) {
- String prefix = ExternalCompactionSection.getRowPrefix();
- for (ExternalCompactionFinalState finalState : finalStates) {
- Mutation m = new Mutation(prefix +
finalState.getExternalCompactionId().canonical());
- m.put("", "", finalState.toJson());
- writer.addMutation(m);
- }
- } catch (MutationsRejectedException | TableNotFoundException e) {
- throw new RuntimeException(e);
- }
- }
-
@Override
public Stream<ExternalCompactionFinalState>
getExternalCompactionFinalStates() {
Scanner scanner;
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
index 3ca9791c70..d61f265c4c 100644
---
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionFinalizer.java
@@ -36,10 +36,13 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
import
org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -52,6 +55,7 @@ import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.TraceUtil;
+import org.apache.accumulo.core.util.Timer;
import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.thrift.TException;
@@ -67,10 +71,14 @@ public class CompactionFinalizer {
private final ExecutorService backgroundExecutor;
private final BlockingQueue<ExternalCompactionFinalState>
pendingNotifications;
private final long tserverCheckInterval;
+ private final SharedBatchWriter sharedBatchWriter;
protected CompactionFinalizer(ServerContext context,
ScheduledThreadPoolExecutor schedExecutor) {
this.context = context;
- this.pendingNotifications = new ArrayBlockingQueue<>(1000);
+ var queueSize =
+
context.getConfiguration().getCount(Property.COMPACTION_COORDINATOR_FINALIZER_QUEUE_SIZE);
+
+ this.pendingNotifications = new ArrayBlockingQueue<>(queueSize);
tserverCheckInterval = this.context.getConfiguration()
.getTimeInMillis(Property.COMPACTION_COORDINATOR_FINALIZER_COMPLETION_CHECK_INTERVAL);
@@ -91,6 +99,9 @@ public class CompactionFinalizer {
ThreadPools.watchCriticalScheduledTask(schedExecutor.scheduleWithFixedDelay(
this::notifyTservers, 0, tserverCheckInterval, TimeUnit.MILLISECONDS));
+
+ this.sharedBatchWriter =
+ new SharedBatchWriter(Ample.DataLevel.USER.metaTable(), context,
queueSize);
}
public void commitCompaction(ExternalCompactionId ecid, KeyExtent extent,
long fileSize,
@@ -99,41 +110,61 @@ public class CompactionFinalizer {
var ecfs =
new ExternalCompactionFinalState(ecid, extent, FinalState.FINISHED,
fileSize, fileEntries);
- LOG.debug("Initiating commit for external compaction: {}", ecfs);
+ LOG.trace("Initiating commit for external compaction: {} {}", ecid, ecfs);
// write metadata entry
- context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+ Timer timer = Timer.startNew();
+ sharedBatchWriter.write(ecfs.toMutation());
+ LOG.trace("{} metadata compation state write completed in {}ms", ecid,
+ timer.elapsed(TimeUnit.MILLISECONDS));
if (!pendingNotifications.offer(ecfs)) {
- LOG.debug("Queue full, notification to tablet server will happen later
{}.", ecfs);
+ LOG.trace("Queue full, notification to tablet server will happen later
{}.", ecid);
} else {
- LOG.debug("Queued tserver notification for completed external
compaction: {}", ecfs);
+ LOG.trace("Queued tserver notification for completed external
compaction: {}", ecid);
}
}
public void failCompactions(Map<ExternalCompactionId,KeyExtent>
compactionsToFail) {
+ if (compactionsToFail.size() == 1) {
+ var e = compactionsToFail.entrySet().iterator().next();
+ var ecfs =
+ new ExternalCompactionFinalState(e.getKey(), e.getValue(),
FinalState.FAILED, 0L, 0L);
+ sharedBatchWriter.write(ecfs.toMutation());
+ } else {
+ try (BatchWriter writer =
context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+ for (var e : compactionsToFail.entrySet()) {
+ var ecfs =
+ new ExternalCompactionFinalState(e.getKey(), e.getValue(),
FinalState.FAILED, 0L, 0L);
+ writer.addMutation(ecfs.toMutation());
+ }
+ } catch (MutationsRejectedException | TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
- var finalStates = compactionsToFail.entrySet().stream().map(
- e -> new ExternalCompactionFinalState(e.getKey(), e.getValue(),
FinalState.FAILED, 0L, 0L))
- .collect(Collectors.toList());
-
- context.getAmple().putExternalCompactionFinalStates(finalStates);
-
- finalStates.forEach(pendingNotifications::offer);
+ for (var e : compactionsToFail.entrySet()) {
+ var ecfs =
+ new ExternalCompactionFinalState(e.getKey(), e.getValue(),
FinalState.FAILED, 0L, 0L);
+ if (!pendingNotifications.offer(ecfs)) {
+ break;
+ }
+ }
}
private void notifyTserver(Location loc, ExternalCompactionFinalState ecfs) {
TabletClientService.Client client = null;
+ Timer timer = Timer.startNew();
try {
client = ThriftUtil.getClient(ThriftClientTypes.TABLET_SERVER,
loc.getHostAndPort(), context);
if (ecfs.getFinalState() == FinalState.FINISHED) {
- LOG.debug("Notifying tserver {} that compaction {} has finished.",
loc, ecfs);
+ LOG.trace("Notifying tserver {} that compaction {} has finished.",
loc, ecfs);
client.compactionJobFinished(TraceUtil.traceInfo(), context.rpcCreds(),
ecfs.getExternalCompactionId().canonical(),
ecfs.getExtent().toThrift(),
ecfs.getFileSize(), ecfs.getEntries());
} else if (ecfs.getFinalState() == FinalState.FAILED) {
- LOG.debug("Notifying tserver {} that compaction {} with {} has
failed.", loc,
+ LOG.trace("Notifying tserver {} that compaction {} with {} has
failed.", loc,
ecfs.getExternalCompactionId(), ecfs);
client.compactionJobFailed(TraceUtil.traceInfo(), context.rpcCreds(),
ecfs.getExternalCompactionId().canonical(),
ecfs.getExtent().toThrift());
@@ -145,6 +176,8 @@ public class CompactionFinalizer {
} finally {
ThriftUtil.returnClient(client, context);
}
+ LOG.trace("Tserver {} notification of {} {} took {}ms", loc,
ecfs.getExternalCompactionId(),
+ ecfs, timer.elapsed(TimeUnit.MILLISECONDS));
}
private void processPending() {
@@ -155,16 +188,21 @@ public class CompactionFinalizer {
batch.add(pendingNotifications.take());
pendingNotifications.drainTo(batch);
+ LOG.trace("Processing pending of batch size {}", batch.size());
+
List<Future<?>> futures = new ArrayList<>();
List<ExternalCompactionId> statusesToDelete = new ArrayList<>();
Map<KeyExtent,TabletMetadata> tabletsMetadata;
var extents =
batch.stream().map(ExternalCompactionFinalState::getExtent).collect(toList());
+ Timer timer = Timer.startNew();
try (TabletsMetadata tablets =
context.getAmple().readTablets().forTablets(extents)
.fetch(ColumnType.LOCATION, ColumnType.PREV_ROW,
ColumnType.ECOMP).build()) {
tabletsMetadata =
tablets.stream().collect(toMap(TabletMetadata::getExtent, identity()));
}
+ LOG.trace("Metadata scan completed in {}ms for batch size {}, found
{}",
+ timer.elapsed(TimeUnit.MILLISECONDS), batch.size(),
tabletsMetadata.size());
for (ExternalCompactionFinalState ecfs : batch) {
@@ -190,10 +228,14 @@ public class CompactionFinalizer {
}
if (!statusesToDelete.isEmpty()) {
- LOG.info(
- "Deleting unresolvable completed external compactions from
metadata table, ids: {}",
- statusesToDelete);
+ timer.restart();
context.getAmple().deleteExternalCompactionFinalStates(statusesToDelete);
+ LOG.info(
+ "Deleted unresolvable completed external compactions from
metadata table, ids: {} in {}ms",
+ statusesToDelete.size(), timer.elapsed(TimeUnit.MILLISECONDS));
+ for (var ecid : statusesToDelete) {
+ LOG.debug("Deleted unresolvable completed external compaction {}",
ecid);
+ }
}
for (Future<?> future : futures) {
@@ -214,14 +256,19 @@ public class CompactionFinalizer {
}
private void notifyTservers() {
+ Timer timer = Timer.startNew();
try (var finalStatesStream =
context.getAmple().getExternalCompactionFinalStates()) {
+ int count = 0;
Iterator<ExternalCompactionFinalState> finalStates =
finalStatesStream.iterator();
while (finalStates.hasNext()) {
ExternalCompactionFinalState state = finalStates.next();
- LOG.debug("Found external compaction in final state: {}, queueing for
tserver notification",
+ count++;
+ LOG.trace("Found external compaction in final state: {}, queueing for
tserver notification",
state);
pendingNotifications.put(state);
}
+ LOG.trace("Added {} final compaction states to notification queue in
{}ms", count,
+ timer.elapsed(TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
diff --git
a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
new file mode 100644
index 0000000000..b25fd5f64e
--- /dev/null
+++
b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/SharedBatchWriter.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.coordinator;
+
+import java.util.ArrayList;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.accumulo.core.client.BatchWriterConfig;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
+import org.apache.accumulo.core.data.Mutation;
+import org.apache.accumulo.core.util.Timer;
+import org.apache.accumulo.core.util.threads.Threads;
+import org.apache.accumulo.server.ServerContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class supports the use case of many threads writing a single mutation
to a table. It avoids
+ * each thread creating its own batch writer which creates threads and makes 3
RPCs to write the
+ * single mutation. Using this class results in much less thread creation and
RPCs.
+ */
+public class SharedBatchWriter {
+ private static final Logger log =
LoggerFactory.getLogger(SharedBatchWriter.class);
+
+ private static class Work {
+ private final Mutation mutation;
+ private final CompletableFuture<Void> future;
+
+ private Work(Mutation mutation) {
+ this.mutation = mutation;
+ this.future = new CompletableFuture<>();
+ }
+ }
+
+ private final BlockingQueue<Work> mutations;
+ private final String table;
+ private final ServerContext context;
+
+ public SharedBatchWriter(String table, ServerContext context, int queueSize)
{
+ this.table = table;
+ this.context = context;
+ this.mutations = new ArrayBlockingQueue<>(queueSize);
+ var thread =
+ Threads.createCriticalThread("shared batch writer for " + table,
this::processMutations);
+ thread.start();
+ }
+
+ public void write(Mutation m) {
+ try {
+ var work = new Work(m);
+ mutations.put(work);
+ work.future.join();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private void processMutations() {
+ Timer timer = Timer.startNew();
+ while (true) {
+ ArrayList<Work> batch = new ArrayList<>();
+ try {
+ batch.add(mutations.take());
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new IllegalStateException(e);
+ }
+
+ var config = new BatchWriterConfig().setMaxWriteThreads(16);
+ try (var writer = context.createBatchWriter(table, config)) {
+ mutations.drainTo(batch);
+ timer.restart();
+ for (var work : batch) {
+ writer.addMutation(work.mutation);
+ }
+ writer.flush();
+ log.trace("Wrote {} mutations in {}ms", batch.size(),
timer.elapsed(TimeUnit.MILLISECONDS));
+ batch.forEach(work -> work.future.complete(null));
+ } catch (TableNotFoundException | MutationsRejectedException e) {
+ log.debug("Failed to process {} mutations in {}ms", batch.size(),
+ timer.elapsed(TimeUnit.MILLISECONDS), e);
+ batch.forEach(work -> work.future.completeExceptionally(e));
+ }
+ }
+ }
+}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
index 48e86a5b67..a3cc78905e 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/TestCompactionCoordinatorForOfflineTable.java
@@ -18,13 +18,16 @@
*/
package org.apache.accumulo.test.compaction;
-import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.accumulo.coordinator.CompactionCoordinator;
import org.apache.accumulo.coordinator.CompactionFinalizer;
+import org.apache.accumulo.core.client.BatchWriter;
+import org.apache.accumulo.core.client.MutationsRejectedException;
+import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.compaction.thrift.CompactionCoordinatorService;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.core.metadata.schema.Ample;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState;
import
org.apache.accumulo.core.metadata.schema.ExternalCompactionFinalState.FinalState;
import org.apache.accumulo.core.metadata.schema.ExternalCompactionId;
@@ -55,7 +58,11 @@ public class TestCompactionCoordinatorForOfflineTable
extends CompactionCoordina
// write metadata entry
LOG.info("Writing completed external compaction to metadata table: {}",
ecfs);
- context.getAmple().putExternalCompactionFinalStates(List.of(ecfs));
+ try (BatchWriter writer =
context.createBatchWriter(Ample.DataLevel.USER.metaTable())) {
+ writer.addMutation(ecfs.toMutation());
+ } catch (MutationsRejectedException | TableNotFoundException e) {
+ throw new RuntimeException(e);
+ }
// queue RPC if queue is not full
LOG.info("Skipping tserver notification for completed external
compaction: {}", ecfs);