This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch elasticity
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/elasticity by this push:
new 262d96cc65 always uses conditional writer interceptor in test ample
(#4792)
262d96cc65 is described below
commit 262d96cc654e7b9270a9c59d1196ae9c1d502778
Author: Keith Turner <[email protected]>
AuthorDate: Fri Aug 9 12:26:33 2024 -0400
always uses conditional writer interceptor in test ample (#4792)
Ample has two ways to write conditional mutations. TestAmple was
only intercepting conditional mutations for one of these methods.
This commit updates TestAmple to intercept both.
Also made TestAmple set more aggressive retry times for failed
conditional mutations. This speeds up all of the FlakyAmple ITs.
This change is important because more conditional mutations are
failing with these changes and those tests were running
slower.
---
.../AsyncConditionalTabletsMutatorImpl.java | 21 ++++++++-------------
.../metadata/ConditionalTabletsMutatorImpl.java | 10 +++++++---
.../accumulo/server/metadata/ServerAmpleImpl.java | 3 ++-
.../accumulo/test/ample/metadata/TestAmple.java | 13 +++++++++++--
4 files changed, 28 insertions(+), 19 deletions(-)
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
index fb3a43dcd7..ac5d186356 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/AsyncConditionalTabletsMutatorImpl.java
@@ -25,38 +25,33 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Consumer;
-import java.util.function.Function;
+import java.util.function.Supplier;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.metadata.schema.Ample;
-import org.apache.accumulo.core.metadata.schema.Ample.DataLevel;
import org.apache.accumulo.core.util.threads.Threads;
-import org.apache.accumulo.server.ServerContext;
import com.google.common.annotations.VisibleForTesting;
public class AsyncConditionalTabletsMutatorImpl implements
Ample.AsyncConditionalTabletsMutator {
private final Consumer<Ample.ConditionalResult> resultsConsumer;
private final ExecutorService executor;
+ private final Supplier<Ample.ConditionalTabletsMutator> mutatorFactory;
private Future<Map<KeyExtent,Ample.ConditionalResult>> backgroundProcessing
= null;
- private ConditionalTabletsMutatorImpl bufferingMutator;
- private final ServerContext context;
+ private Ample.ConditionalTabletsMutator bufferingMutator;
private long mutatedTablets = 0;
public static final int BATCH_SIZE = 1000;
- private final Function<DataLevel,String> tableMapper;
@VisibleForTesting
- public AsyncConditionalTabletsMutatorImpl(ServerContext context,
- Function<DataLevel,String> tableMapper,
Consumer<Ample.ConditionalResult> resultsConsumer) {
+ public AsyncConditionalTabletsMutatorImpl(Consumer<Ample.ConditionalResult>
resultsConsumer,
+ Supplier<Ample.ConditionalTabletsMutator> mutatorFactory) {
this.resultsConsumer = Objects.requireNonNull(resultsConsumer);
- this.context = context;
- this.bufferingMutator = new ConditionalTabletsMutatorImpl(context,
tableMapper);
+ this.mutatorFactory = mutatorFactory;
+ this.bufferingMutator = mutatorFactory.get();
var creatorId = Thread.currentThread().getId();
this.executor = Executors.newSingleThreadExecutor(runnable ->
Threads.createThread(
"Async conditional tablets mutator background thread, created by : #"
+ creatorId,
runnable));
- this.tableMapper = Objects.requireNonNull(tableMapper);
-
}
@Override
@@ -80,7 +75,7 @@ public class AsyncConditionalTabletsMutatorImpl implements
Ample.AsyncConditiona
return result;
});
- bufferingMutator = new ConditionalTabletsMutatorImpl(context,
tableMapper);
+ bufferingMutator = mutatorFactory.get();
mutatedTablets = 0;
}
mutatedTablets++;
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
index 11412e2e2a..3dbdfb5842 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ConditionalTabletsMutatorImpl.java
@@ -163,6 +163,12 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
}
}
+ protected Retry createUnknownRetry() {
+ return Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
+
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
+ .logInterval(Duration.ofMinutes(3)).createRetry();
+ }
+
private Iterator<ConditionalWriter.Result> writeMutations(ConditionalWriter
conditionalWriter) {
var results = conditionalWriter.write(mutations.iterator());
@@ -175,9 +181,7 @@ public class ConditionalTabletsMutatorImpl implements
Ample.ConditionalTabletsMu
while (!unknownResults.isEmpty()) {
try {
if (retry == null) {
- retry =
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(100))
-
.incrementBy(Duration.ofMillis(100)).maxWait(Duration.ofSeconds(2)).backOffFactor(1.5)
- .logInterval(Duration.ofMinutes(3)).createRetry();
+ retry = createUnknownRetry();
}
retry.waitForNextAttempt(log, "handle conditional mutations with
unknown status");
} catch (InterruptedException e) {
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
index c136e4503e..7cc3dd1de4 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/ServerAmpleImpl.java
@@ -99,7 +99,8 @@ public class ServerAmpleImpl extends AmpleImpl implements
Ample {
@Override
public AsyncConditionalTabletsMutator
conditionallyMutateTablets(Consumer<ConditionalResult> resultsConsumer) {
- return new AsyncConditionalTabletsMutatorImpl(context, getTableMapper(),
resultsConsumer);
+ return new AsyncConditionalTabletsMutatorImpl(resultsConsumer,
+ () -> new ConditionalTabletsMutatorImpl(context, getTableMapper()));
}
private void mutateRootGcCandidates(Consumer<RootGcCandidates> mutator) {
diff --git
a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
index 99dc332d3a..0b1c2b9e84 100644
--- a/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
+++ b/test/src/main/java/org/apache/accumulo/test/ample/metadata/TestAmple.java
@@ -18,6 +18,7 @@
*/
package org.apache.accumulo.test.ample.metadata;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
@@ -56,6 +57,7 @@ import
org.apache.accumulo.core.metadata.schema.TabletsMetadata;
import org.apache.accumulo.core.metadata.schema.TabletsMetadata.TableOptions;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.util.ColumnFQ;
+import org.apache.accumulo.core.util.Retry;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.metadata.AsyncConditionalTabletsMutatorImpl;
import org.apache.accumulo.server.metadata.ConditionalTabletsMutatorImpl;
@@ -122,8 +124,8 @@ public class TestAmple {
@Override
public AsyncConditionalTabletsMutator
conditionallyMutateTablets(Consumer<ConditionalResult>
resultsConsumer) {
- return new AsyncConditionalTabletsMutatorImpl(getContext(),
getTableMapper(),
- resultsConsumer);
+ return new AsyncConditionalTabletsMutatorImpl(resultsConsumer,
+ () -> conditionallyMutateTablets(cwInterceptor.get()));
}
@Override
@@ -200,6 +202,13 @@ public class TestAmple {
return new ConditionalTabletsMutatorImpl(getContext(), tables::get) {
+ @Override
+ protected Retry createUnknownRetry() {
+ return
Retry.builder().infiniteRetries().retryAfter(Duration.ofMillis(3))
+
.incrementBy(Duration.ofMillis(3)).maxWait(Duration.ofMillis(50)).backOffFactor(1.5)
+ .logInterval(Duration.ofMinutes(3)).createRetry();
+ }
+
@Override
protected ConditionalWriter createConditionalWriter(Ample.DataLevel
dataLevel)
throws TableNotFoundException {