cshannon commented on code in PR #5404:
URL: https://github.com/apache/accumulo/pull/5404#discussion_r2007458472
##########
core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java:
##########
@@ -393,6 +387,113 @@ public FateInstanceType type() {
return fateInstanceType;
}
+ private class BatchSeeder implements Seeder<T> {
+ private final AtomicBoolean closed = new AtomicBoolean(false);
+
+ private final
Map<FateId,Pair<FateMutator<T>,CompletableFuture<Optional<FateId>>>> pending =
+ new HashMap<>();
+
+ @Override
+ public CompletableFuture<Optional<FateId>>
attemptToSeedTransaction(FateOperation fateOp,
+ FateKey fateKey, Repo<T> repo, boolean autoCleanUp) {
+ Preconditions.checkState(!closed.get(), "Can't attempt to seed with a
closed seeder.");
+
+ final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
+ // If not already submitted, add to the pending list and return the
future
+ // or the existing future if duplicate. The pending map will store the
mutator
+ // to be processed on close in a one batch.
+ return pending.computeIfAbsent(fateId, id -> {
+ FateMutator<T> mutator = seedTransaction(fateOp, fateKey, fateId,
repo, autoCleanUp);
+ CompletableFuture<Optional<FateId>> future = new CompletableFuture<>();
+ return new Pair<>(mutator, future);
+ }).getSecond();
+ }
+
+ @Override
+ public void close() {
+ closed.set(true);
+
+ int maxAttempts = 5;
+
+ // This loop will submit all the pending mutations as one batch
+ // to a conditional writer and any known results will be removed
+ // from the pending map. Unknown results will be re-attempted up
+ // to the maxAttempts count
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
Review Comment:
Good catch, i'll fix that
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]