This is an automated email from the ASF dual-hosted git repository.
kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 6407c31147 Makes seeding a fate transaction more efficient (#5122)
6407c31147 is described below
commit 6407c31147423f1eacc861477a9d176e746660b0
Author: Keith Turner <[email protected]>
AuthorDate: Mon Dec 9 19:21:28 2024 -0500
Makes seeding a fate transaction more efficient (#5122)
Modified fate to seed fate transaction in single conditional mutation
instead of multiple.
fixes #5097
Co-authored-by: Kevin Rathbun <[email protected]>
---
.../accumulo/core/fate/AbstractFateStore.java | 11 +
.../java/org/apache/accumulo/core/fate/Fate.java | 51 +----
.../org/apache/accumulo/core/fate/FateStore.java | 42 +++-
.../accumulo/core/fate/user/FateMutator.java | 23 ++
.../accumulo/core/fate/user/FateMutatorImpl.java | 38 +++-
.../accumulo/core/fate/user/RowExistsIterator.java | 46 ++++
.../accumulo/core/fate/user/UserFateStore.java | 140 ++++--------
.../core/fate/zookeeper/MetaFateStore.java | 54 ++++-
.../apache/accumulo/core/logging/FateLogger.java | 43 ++--
.../org/apache/accumulo/core/fate/TestStore.java | 11 +-
.../metadata/iterators/SetEncodingIterator.java | 2 +-
.../coordinator/CompactionCoordinator.java | 7 +-
.../accumulo/manager/split/SeedSplitTask.java | 16 +-
.../test/compaction/ExternalCompaction_1_IT.java | 24 +-
.../java/org/apache/accumulo/test/fate/FateIT.java | 4 +
.../org/apache/accumulo/test/fate/FateStoreIT.java | 252 ++++++++++++++++++---
16 files changed, 529 insertions(+), 235 deletions(-)
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
index ff5e45d310..3bc322c3c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/AbstractFateStore.java
@@ -69,6 +69,11 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
UUID txUUID = UUID.nameUUIDFromBytes(fateKey.getSerialized());
return FateId.from(instanceType, txUUID);
}
+
+ @Override
+ public FateId newRandomId(FateInstanceType instanceType) {
+ return FateId.from(instanceType, UUID.randomUUID());
+ }
};
// The ZooKeeper lock for the process that's running this store instance
@@ -402,6 +407,12 @@ public abstract class AbstractFateStore<T> implements
FateStore<T> {
public interface FateIdGenerator {
FateId fromTypeAndKey(FateInstanceType instanceType, FateKey fateKey);
+
+ FateId newRandomId(FateInstanceType instanceType);
+ }
+
+ protected void seededTx() {
+ unreservedRunnableCount.increment();
}
protected byte[] serializeTxInfo(Serializable so) {
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
index f46cc1aa43..fc405920d5 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/Fate.java
@@ -63,8 +63,6 @@ import org.apache.thrift.TApplicationException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.base.Preconditions;
-
/**
* Fault tolerant executor
*/
@@ -439,57 +437,16 @@ public class Fate<T> {
return store.create();
}
- public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
- boolean autoCleanUp, String goalMessage) {
-
- Optional<FateTxStore<T>> optTxStore = store.createAndReserve(fateKey);
-
- return optTxStore.map(txStore -> {
- var fateId = txStore.getID();
- try {
- Preconditions.checkState(txStore.getStatus() == NEW);
- seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage,
txStore);
- } finally {
- txStore.unreserve(Duration.ZERO);
- }
- return fateId;
- });
- }
-
- private void seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp,
- String goalMessage, FateTxStore<T> txStore) {
- if (txStore.top() == null) {
- try {
- log.info("Seeding {} {}", fateId, goalMessage);
- txStore.push(repo);
- } catch (StackOverflowException e) {
- // this should not happen
- throw new IllegalStateException(e);
- }
- }
-
- if (autoCleanUp) {
- txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
- }
-
- txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
-
- txStore.setStatus(SUBMITTED);
+ public void seedTransaction(String txName, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
+ store.seedTransaction(txName, fateKey, repo, autoCleanUp);
}
// start work in the transaction.. it is safe to call this
// multiple times for a transaction... but it will only seed once
public void seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp,
String goalMessage) {
- FateTxStore<T> txStore = store.reserve(fateId);
- try {
- if (txStore.getStatus() == NEW) {
- seedTransaction(txName, fateId, repo, autoCleanUp, goalMessage,
txStore);
- }
- } finally {
- txStore.unreserve(Duration.ZERO);
- }
-
+ log.info("Seeding {} {}", fateId, goalMessage);
+ store.seedTransaction(txName, fateId, repo, autoCleanUp);
}
// check on the transaction
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 09ee12dd94..d434770461 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -50,19 +50,41 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
FateId create();
/**
- * Creates and reserves a transaction using the given key. If something is
already running for the
- * given key, then Optional.empty() will be returned. When this returns a
non-empty id, it will be
- * in the new state.
+ * Seeds a transaction with the given repo if it does not exists. A fateId
will be derived from
+ * the fateKey. If seeded, sets the following data for the fateId in the
store.
*
- * <p>
- * In the case where a process dies in the middle of a call to this. If
later, another call is
- * made with the same key and its in the new state then the FateId for that
key will be returned.
- * </p>
+ * <ul>
+ * <li>Set the tx name</li>
+ * <li>Set the status to SUBMITTED</li>
+ * <li>Set the fate key</li>
+ * <li>Sets autocleanup only if true</li>
+ * <li>Sets the creation time</li>
+ * </ul>
*
- * @throws IllegalStateException when there is an unexpected collision. This
can occur if two key
- * hash to the same FateId or if a random FateId already exists.
+ * @return The return type is only intended for testing it may not be
correct in the face of
+ * failures. When there are no failures returns optional w/ the fate
id set if seeded and
+ * empty optional otherwise. If there was a failure this could
return an empty optional
+ * when it actually succeeded.
*/
- Optional<FateTxStore<T>> createAndReserve(FateKey fateKey);
+ Optional<FateId> seedTransaction(String txName, FateKey fateKey, Repo<T>
repo,
+ boolean autoCleanUp);
+
+ /**
+ * Seeds a transaction with the given repo if its current status is NEW and
it is currently
+ * unreserved. If seeded, sets the following data for the fateId in the
store.
+ *
+ * <ul>
+ * <li>Set the tx name</li>
+ * <li>Set the status to SUBMITTED</li>
+ * <li>Sets autocleanup only if true</li>
+ * <li>Sets the creation time</li>
+ * </ul>
+ *
+ * @return The return type is only intended for testing it may not be
correct in the face of
+ * failures. When there are no failures returns true if seeded and
false otherwise. If
+ * there was a failure this could return false when it actually
succeeded.
+ */
+ boolean seedTransaction(String txName, FateId fateId, Repo<T> repo, boolean
autoCleanUp);
/**
* An interface that allows read/write access to the data related to a
single fate operation.
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index d199a7463e..ac675d7fb9 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -33,6 +33,29 @@ public interface FateMutator<T> {
FateMutator<T> putCreateTime(long ctime);
+ /**
+ * Requires that nothing exists for this fate mutation.
+ */
+ FateMutator<T> requireAbsent();
+
+ /**
+ * Require that the transaction status is one of the given statuses. If no
statuses are provided,
+ * require that the status column is absent.
+ *
+ * @param statuses The statuses to check against.
+ */
+ FateMutator<T> requireStatus(TStatus... statuses);
+
+ /**
+ * Require the transaction has no reservation.
+ */
+ FateMutator<T> requireUnreserved();
+
+ /**
+ * Require the transaction has no fate key set.
+ */
+ FateMutator<T> requireAbsentKey();
+
/**
* Add a conditional mutation to {@link
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
* put the reservation if there is not already a reservation present
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index 5d99a8df3a..ea7dd85c57 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -29,6 +29,7 @@ import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.ConditionalWriter;
+import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.Scanner;
import org.apache.accumulo.core.client.TableNotFoundException;
@@ -48,12 +49,16 @@ import
org.apache.accumulo.core.fate.user.schema.FateSchema.TxInfoColumnFamily;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.hadoop.io.Text;
+import com.google.common.base.Preconditions;
+
public class FateMutatorImpl<T> implements FateMutator<T> {
private final ClientContext context;
private final String tableName;
private final FateId fateId;
private final ConditionalMutation mutation;
+ private boolean requiredUnreserved = false;
+ public static final int INITIAL_ITERATOR_PRIO = 1000000;
public FateMutatorImpl(ClientContext context, String tableName, FateId
fateId) {
this.context = Objects.requireNonNull(context);
@@ -81,10 +86,34 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
}
@Override
- public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+ public FateMutator<T> requireAbsent() {
+ IteratorSetting is = new IteratorSetting(INITIAL_ITERATOR_PRIO,
RowExistsIterator.class);
+ Condition c = new Condition("", "").setIterators(is);
+ mutation.addCondition(c);
+ return this;
+ }
+
+ @Override
+ public FateMutator<T> requireUnreserved() {
+ Preconditions.checkState(!requiredUnreserved);
Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
mutation.addCondition(condition);
+ requiredUnreserved = true;
+ return this;
+ }
+
+ @Override
+ public FateMutator<T> requireAbsentKey() {
+ Condition condition = new
Condition(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
+ TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
+ mutation.addCondition(condition);
+ return this;
+ }
+
+ @Override
+ public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
+ requireUnreserved();
TxColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.getSerialized()));
return this;
}
@@ -179,12 +208,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
return this;
}
- /**
- * Require that the transaction status is one of the given statuses. If no
statuses are provided,
- * require that the status column is absent.
- *
- * @param statuses The statuses to check against.
- */
+ @Override
public FateMutator<T> requireStatus(TStatus... statuses) {
Condition condition = StatusMappingIterator.createCondition(statuses);
mutation.addCondition(condition);
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java
new file mode 100644
index 0000000000..6095546b0d
--- /dev/null
+++
b/core/src/main/java/org/apache/accumulo/core/fate/user/RowExistsIterator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.core.fate.user;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Set;
+
+import org.apache.accumulo.core.data.ByteSequence;
+import org.apache.accumulo.core.data.Range;
+import org.apache.accumulo.core.iterators.WrappingIterator;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Iterator is used by conditional mutations to check if row exists.
+ */
+public class RowExistsIterator extends WrappingIterator {
+
+ @Override
+ public void seek(Range range, Collection<ByteSequence> columnFamilies,
boolean inclusive)
+ throws IOException {
+ Preconditions.checkState(range.getStartKey() != null && range.getEndKey()
!= null);
+ var startRow = range.getStartKey().getRow();
+ var endRow = range.getEndKey().getRow();
+ Preconditions.checkState(startRow.equals(endRow));
+ Range r = new Range(startRow);
+ super.seek(r, Set.of(), false);
+ }
+}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index efd0cbc62f..c134db1840 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.core.fate.user;
import java.io.IOException;
import java.io.Serializable;
-import java.time.Duration;
import java.util.EnumSet;
import java.util.List;
import java.util.Map.Entry;
@@ -30,6 +29,7 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.function.Function;
import java.util.function.Predicate;
+import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -106,7 +106,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
UtilWaitThread.sleep(100);
}
- var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW)
+ var status = newMutator(fateId).requireAbsent().putStatus(TStatus.NEW)
.putCreateTime(System.currentTimeMillis()).tryMutate();
switch (status) {
@@ -123,104 +123,62 @@ public class UserFateStore<T> extends
AbstractFateStore<T> {
}
public FateId getFateId() {
- return FateId.from(fateInstanceType, UUID.randomUUID());
+ return fateIdGenerator.newRandomId(type());
}
@Override
- public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
- final var reservation = FateReservation.from(lockID, UUID.randomUUID());
+ public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
+ boolean autoCleanUp) {
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
- Optional<FateTxStore<T>> txStore = Optional.empty();
- int maxAttempts = 5;
- FateMutator.Status status = null;
-
- // Only need to retry if it is UNKNOWN
- for (int attempt = 0; attempt < maxAttempts; attempt++) {
- status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
-
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
- if (status != FateMutator.Status.UNKNOWN) {
- break;
- }
- UtilWaitThread.sleep(100);
+ Supplier<FateMutator<T>> mutatorFactory = () ->
newMutator(fateId).requireAbsent()
+ .putKey(fateKey).putCreateTime(System.currentTimeMillis());
+ if (seedTransaction(mutatorFactory, fateKey + " " + fateId, txName, repo,
autoCleanUp)) {
+ return Optional.of(fateId);
+ } else {
+ return Optional.empty();
}
+ }
- switch (status) {
- case ACCEPTED:
- txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
- break;
- case REJECTED:
- // If the status is REJECTED, we need to check what about the mutation
was REJECTED:
- // 1) Possible something like the following occurred:
- // the first attempt was UNKNOWN but written, the next attempt would
be rejected
- // We return the FateTxStore in this case.
- // 2) If there is a collision with existing fate id, throw error
- // 3) If the fate id is already reserved, return an empty optional
- // 4) If the fate id is still NEW/unseeded and unreserved, we can try
to reserve it
- try (Scanner scanner = context.createScanner(tableName,
Authorizations.EMPTY)) {
- scanner.setRange(getRow(fateId));
- scanner.fetchColumn(TxColumnFamily.STATUS_COLUMN.getColumnFamily(),
- TxColumnFamily.STATUS_COLUMN.getColumnQualifier());
- scanner.fetchColumn(TxColumnFamily.TX_KEY_COLUMN.getColumnFamily(),
- TxColumnFamily.TX_KEY_COLUMN.getColumnQualifier());
-
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
- TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
- TStatus statusSeen = TStatus.UNKNOWN;
- Optional<FateKey> fateKeySeen = Optional.empty();
- Optional<FateReservation> reservationSeen = Optional.empty();
-
- for (Entry<Key,Value> entry : scanner) {
- Text colf = entry.getKey().getColumnFamily();
- Text colq = entry.getKey().getColumnQualifier();
- Value val = entry.getValue();
-
- switch (colq.toString()) {
- case TxColumnFamily.STATUS:
- statusSeen = TStatus.valueOf(val.toString());
- break;
- case TxColumnFamily.TX_KEY:
- fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
- break;
- case TxColumnFamily.RESERVATION:
- reservationSeen =
Optional.of(FateReservation.deserialize(val.get()));
- break;
- default:
- throw new IllegalStateException("Unexpected column seen: " +
colf + ":" + colq);
- }
- }
+ @Override
+ public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
+ Supplier<FateMutator<T>> mutatorFactory =
+ () ->
newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey();
+ return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo,
autoCleanUp);
+ }
- if (statusSeen == TStatus.NEW) {
- verifyFateKey(fateId, fateKeySeen, fateKey);
- // This will be the case if the mutation status is REJECTED but
the mutation was written
- if (reservationSeen.isPresent() &&
reservationSeen.orElseThrow().equals(reservation)) {
- txStore = Optional.of(new FateTxStoreImpl(fateId, reservation));
- } else if (reservationSeen.isEmpty()) {
- // NEW/unseeded transaction and not reserved, so we can allow it
to be reserved
- // we tryReserve() since another thread may have reserved it
since the scan
- txStore = tryReserve(fateId);
- // the status was known before reserving to be NEW,
- // however it could change so check after reserving to avoid
race conditions.
- var statusAfterReserve =
-
txStore.map(ReadOnlyFateTxStore::getStatus).orElse(TStatus.UNKNOWN);
- if (statusAfterReserve != TStatus.NEW) {
- txStore.ifPresent(txs -> txs.unreserve(Duration.ZERO));
- txStore = Optional.empty();
- }
- }
- } else {
- log.trace(
- "fate id {} tstatus {} fate key {} is reserved {} "
- + "has already been seeded with work (non-NEW status)",
- fateId, statusSeen, fateKeySeen.orElse(null),
reservationSeen.isPresent());
- }
- } catch (TableNotFoundException e) {
- throw new IllegalStateException(tableName + " not found!", e);
- }
- break;
- default:
- throw new IllegalStateException("Unknown or unexpected status " +
status);
+ private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory,
String logId,
+ String txName, Repo<T> repo, boolean autoCleanUp) {
+ int maxAttempts = 5;
+ for (int attempt = 0; attempt < maxAttempts; attempt++) {
+ var mutator = mutatorFactory.get();
+ mutator =
+ mutator.putName(serializeTxInfo(txName)).putRepo(1,
repo).putStatus(TStatus.SUBMITTED);
+ if (autoCleanUp) {
+ mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
+ }
+ var status = mutator.tryMutate();
+ if (status == FateMutator.Status.ACCEPTED) {
+ // signal to the super class that a new fate transaction was seeded
and is ready to run
+ seededTx();
+ log.trace("Attempt to seed {} returned {}", logId, status);
+ return true;
+ } else if (status == FateMutator.Status.REJECTED) {
+ log.debug("Attempt to seed {} returned {}", logId, status);
+ return false;
+ } else if (status == FateMutator.Status.UNKNOWN) {
+ // At this point can not reliably determine if the conditional
mutation was successful or
+ // not because no reservation was acquired. For example since no
reservation was acquired it
+ // is possible that seeding was a success and something immediately
picked it up and started
+ // operating on it and changing it. If scanning after that point can
not conclude success or
+ // failure. Another situation is that maybe the fateId already existed
in a seeded form
+ // prior to getting this unknown.
+ log.debug("Attempt to seed {} returned {} status, retrying", logId,
status);
+ UtilWaitThread.sleep(250);
+ }
}
- return txStore;
+ log.warn("Repeatedly received unknown status when attempting to seed {}",
logId);
+ return false;
}
@Override
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
index d6da05e844..28c0904ffa 100644
---
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
+++
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/MetaFateStore.java
@@ -20,12 +20,15 @@ package org.apache.accumulo.core.fate.zookeeper;
import static
com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.NEW;
+import static
org.apache.accumulo.core.fate.ReadOnlyFateStore.TStatus.SUBMITTED;
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
@@ -100,7 +103,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
public FateId create() {
while (true) {
try {
- FateId fateId = FateId.from(fateInstanceType, UUID.randomUUID());
+ FateId fateId = fateIdGenerator.newRandomId(fateInstanceType);
zk.putPersistentData(getTXPath(fateId), new NodeValue(TStatus.NEW,
null).serialize(),
NodeExistsPolicy.FAIL);
return fateId;
@@ -112,8 +115,7 @@ public class MetaFateStore<T> extends AbstractFateStore<T> {
}
}
- @Override
- public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
+ private Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
final var reservation = FateReservation.from(lockID, UUID.randomUUID());
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
@@ -161,6 +163,52 @@ public class MetaFateStore<T> extends AbstractFateStore<T>
{
}
}
+ @Override
+ public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
+ boolean autoCleanUp) {
+ return createAndReserve(fateKey).map(txStore -> {
+ try {
+ seedTransaction(txName, repo, autoCleanUp, txStore);
+ return txStore.getID();
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ });
+ }
+
+ @Override
+ public boolean seedTransaction(String txName, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
+ return tryReserve(fateId).map(txStore -> {
+ try {
+ if (txStore.getStatus() == NEW) {
+ seedTransaction(txName, repo, autoCleanUp, txStore);
+ return true;
+ }
+ return false;
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ }).orElse(false);
+ }
+
+ private void seedTransaction(String txName, Repo<T> repo, boolean
autoCleanUp,
+ FateTxStore<T> txStore) {
+ if (txStore.top() == null) {
+ try {
+ txStore.push(repo);
+ } catch (StackOverflowException e) {
+ // this should not happen
+ throw new IllegalStateException(e);
+ }
+ }
+
+ if (autoCleanUp) {
+ txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
+ }
+ txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
+ txStore.setStatus(SUBMITTED);
+ }
+
@Override
public Optional<FateTxStore<T>> tryReserve(FateId fateId) {
// uniquely identify this attempt to reserve the fate operation data
diff --git
a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
index 4a9f2517c0..9a5984f4ed 100644
--- a/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
+++ b/core/src/main/java/org/apache/accumulo/core/logging/FateLogger.java
@@ -149,6 +149,33 @@ public class FateLogger {
return fateId;
}
+ @Override
+ public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<T> repo,
+ boolean autoCleanUp) {
+ var optional = store.seedTransaction(txName, fateKey, repo,
autoCleanUp);
+ if (storeLog.isTraceEnabled()) {
+ optional.ifPresentOrElse(fateId -> {
+ storeLog.trace("{} seeded {} {} {}", fateId, fateKey,
toLogString.apply(repo),
+ autoCleanUp);
+ }, () -> {
+ storeLog.trace("Possibly unable to seed {} {} {}", fateKey,
toLogString.apply(repo),
+ autoCleanUp);
+ });
+ }
+ return optional;
+ }
+
+ @Override
+ public boolean seedTransaction(String txName, FateId fateId, Repo<T>
repo,
+ boolean autoCleanUp) {
+ boolean seeded = store.seedTransaction(txName, fateId, repo,
autoCleanUp);
+ if (storeLog.isTraceEnabled()) {
+ storeLog.trace("{} {} {} {}", fateId, seeded ? "seeded" : "unable to
seed",
+ toLogString.apply(repo), autoCleanUp);
+ }
+ return seeded;
+ }
+
@Override
public int getDeferredCount() {
return store.getDeferredCount();
@@ -164,22 +191,6 @@ public class FateLogger {
return store.isDeferredOverflow();
}
- @Override
- public Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
- Optional<FateTxStore<T>> txStore = store.createAndReserve(fateKey);
- if (storeLog.isTraceEnabled()) {
- if (txStore.isPresent()) {
- storeLog.trace("{} created and reserved fate transaction using key
: {}",
- txStore.orElseThrow().getID(), fateKey);
- } else {
- storeLog.trace(
- "fate transaction was not created using key : {}, existing
transaction exists",
- fateKey);
- }
- }
- return txStore;
- }
-
@Override
public Map<FateId,FateReservation> getActiveReservations() {
return store.getActiveReservations();
diff --git a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
index 2c54464663..40d0d755b1 100644
--- a/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
+++ b/core/src/test/java/org/apache/accumulo/core/fate/TestStore.java
@@ -53,8 +53,15 @@ public class TestStore implements FateStore<String> {
}
@Override
- public Optional<FateTxStore<String>> createAndReserve(FateKey key) {
- throw new UnsupportedOperationException();
+ public Optional<FateId> seedTransaction(String txName, FateKey fateKey,
Repo<String> repo,
+ boolean autoCleanUp) {
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean seedTransaction(String txName, FateId fateId, Repo<String>
repo,
+ boolean autoCleanUp) {
+ return false;
}
@Override
diff --git
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
index b0456afd32..ebe732049f 100644
---
a/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
+++
b/server/base/src/main/java/org/apache/accumulo/server/metadata/iterators/SetEncodingIterator.java
@@ -79,7 +79,7 @@ public class SetEncodingIterator implements
SortedKeyValueIterator<Key,Value> {
// expecting this range to cover a single metadata row, so validate the
range meets expectations
MetadataSchema.TabletsSection.validateRow(row);
Preconditions.checkArgument(row.equals(range.getEndKey().getRow()));
- return range.getStartKey().getRow();
+ return row;
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
index e06229c21b..772676a491 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/compaction/coordinator/CompactionCoordinator.java
@@ -747,11 +747,8 @@ public class CompactionCoordinator
// Start a fate transaction to commit the compaction.
CompactionMetadata ecm = tabletMeta.getExternalCompactions().get(ecid);
var renameOp = new RenameCompactionFile(new CompactionCommitData(ecid,
extent, ecm, stats));
- var txid = localFate.seedTransaction("COMMIT_COMPACTION",
FateKey.forCompactionCommit(ecid),
- renameOp, true, "Commit compaction " + ecid);
-
- txid.ifPresentOrElse(fateId -> LOG.debug("initiated compaction commit {}
{}", ecid, fateId),
- () -> LOG.debug("compaction commit already initiated for {}", ecid));
+ localFate.seedTransaction("COMMIT_COMPACTION",
FateKey.forCompactionCommit(ecid), renameOp,
+ true);
}
@Override
diff --git
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
index 78f08a9471..7b56ea4388 100644
---
a/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
+++
b/server/manager/src/main/java/org/apache/accumulo/manager/split/SeedSplitTask.java
@@ -18,10 +18,7 @@
*/
package org.apache.accumulo.manager.split;
-import java.util.Optional;
-
import org.apache.accumulo.core.dataImpl.KeyExtent;
-import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.manager.Manager;
@@ -44,17 +41,8 @@ public class SeedSplitTask implements Runnable {
public void run() {
try {
var fateInstanceType = FateInstanceType.fromTableId((extent.tableId()));
-
- Optional<FateId> optFateId =
- manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT",
FateKey.forSplit(extent),
- new FindSplits(extent), true, "System initiated split of tablet
" + extent);
-
- optFateId.ifPresentOrElse(fateId -> {
- log.trace("System initiated a split for : {} {}", extent, fateId);
- }, () -> {
- log.trace("System attempted to initiate a split but one was in
progress : {}", extent);
- });
-
+ manager.fate(fateInstanceType).seedTransaction("SYSTEM_SPLIT",
FateKey.forSplit(extent),
+ new FindSplits(extent), true);
} catch (Exception e) {
log.error("Failed to split {}", extent, e);
}
diff --git
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
index e8955e465a..c8905cd850 100644
---
a/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
+++
b/test/src/main/java/org/apache/accumulo/test/compaction/ExternalCompaction_1_IT.java
@@ -78,6 +78,7 @@ import org.apache.accumulo.core.fate.FateId;
import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
+import org.apache.accumulo.core.fate.Repo;
import org.apache.accumulo.core.fate.user.UserFateStore;
import org.apache.accumulo.core.fate.zookeeper.MetaFateStore;
import org.apache.accumulo.core.iterators.DevNull;
@@ -96,6 +97,7 @@ import
org.apache.accumulo.core.spi.compaction.SimpleCompactionDispatcher;
import org.apache.accumulo.harness.MiniClusterConfigurationCallback;
import org.apache.accumulo.harness.SharedMiniClusterBase;
import org.apache.accumulo.manager.Manager;
+import org.apache.accumulo.manager.tableOps.ManagerRepo;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl;
import org.apache.accumulo.server.util.FindCompactionTmpFiles;
@@ -332,6 +334,21 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
}
}
+ public static class FakeRepo extends ManagerRepo {
+
+ private static final long serialVersionUID = 1234L;
+
+ @Override
+ public long isReady(FateId fateId, Manager environment) throws Exception {
+ return 1000;
+ }
+
+ @Override
+ public Repo<Manager> call(FateId fateId, Manager environment) throws
Exception {
+ return null;
+ }
+ }
+
private FateId createCompactionCommitAndDeadMetadata(AccumuloClient c,
FateStore<Manager> fateStore, String tableName,
Map<TableId,List<ExternalCompactionId>> allCids) throws Exception {
@@ -345,10 +362,9 @@ public class ExternalCompaction_1_IT extends
SharedMiniClusterBase {
// Create a fate transaction for one of the compaction ids that is in the
new state, it
// should never run. Its purpose is to prevent the dead compaction detector
// from deleting the id.
- FateStore.FateTxStore<Manager> fateTx = fateStore
-
.createAndReserve(FateKey.forCompactionCommit(allCids.get(tableId).get(0))).orElseThrow();
- var fateId = fateTx.getID();
- fateTx.unreserve(Duration.ZERO);
+ Repo<Manager> repo = new FakeRepo();
+ var fateId = fateStore.seedTransaction("COMPACTION_COMMIT",
+ FateKey.forCompactionCommit(allCids.get(tableId).get(0)), repo,
true).orElseThrow();
// Read the tablet metadata
var tabletsMeta =
ctx.getAmple().readTablets().forTable(tableId).build().stream()
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
index e8a77bd330..f493292368 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
@@ -77,6 +77,10 @@ public abstract class FateIT extends SharedMiniClusterBase
implements FateTestRu
private final String data;
+ public TestRepo() {
+ this("test");
+ }
+
public TestRepo(String data) {
this.data = data;
}
diff --git a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
index 64607cab7b..1980dcf4ee 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/FateStoreIT.java
@@ -25,10 +25,10 @@ import static
org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
-import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -40,6 +40,7 @@ import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.accumulo.core.data.TableId;
@@ -47,6 +48,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.fate.AbstractFateStore;
import org.apache.accumulo.core.fate.Fate.TxInfo;
import org.apache.accumulo.core.fate.FateId;
+import org.apache.accumulo.core.fate.FateInstanceType;
import org.apache.accumulo.core.fate.FateKey;
import org.apache.accumulo.core.fate.FateStore;
import org.apache.accumulo.core.fate.FateStore.FateTxStore;
@@ -296,18 +298,20 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
FateKey fateKey2 =
FateKey.forCompactionCommit(ExternalCompactionId.generate(UUID.randomUUID()));
- FateTxStore<TestEnv> txStore1 =
store.createAndReserve(fateKey1).orElseThrow();
- FateTxStore<TestEnv> txStore2 =
store.createAndReserve(fateKey2).orElseThrow();
+ var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(),
true).orElseThrow();
+ var fateId2 = store.seedTransaction("TEST", fateKey2, new TestRepo(),
true).orElseThrow();
- assertNotEquals(txStore1.getID(), txStore2.getID());
+ assertNotEquals(fateId1, fateId2);
+ var txStore1 = store.reserve(fateId1);
+ var txStore2 = store.reserve(fateId2);
try {
assertTrue(txStore1.timeCreated() > 0);
- assertEquals(TStatus.NEW, txStore1.getStatus());
+ assertEquals(TStatus.SUBMITTED, txStore1.getStatus());
assertEquals(fateKey1, txStore1.getKey().orElseThrow());
assertTrue(txStore2.timeCreated() > 0);
- assertEquals(TStatus.NEW, txStore2.getStatus());
+ assertEquals(TStatus.SUBMITTED, txStore2.getStatus());
assertEquals(fateKey2, txStore2.getKey().orElseThrow());
assertEquals(2, store.list().count());
@@ -328,18 +332,17 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
KeyExtent ke =
new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
- // Creating with the same key should be fine if the status is NEW
- // A second call to createAndReserve() should just return an empty optional
- // since it's already in reserved and in progress
FateKey fateKey = FateKey.forSplit(ke);
- FateTxStore<TestEnv> txStore =
store.createAndReserve(fateKey).orElseThrow();
+ var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
// second call is empty
- assertTrue(store.createAndReserve(fateKey).isEmpty());
+ assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(),
true).isEmpty());
+ assertFalse(store.seedTransaction("TEST", fateId, new TestRepo(), true));
+ var txStore = store.reserve(fateId);
try {
assertTrue(txStore.timeCreated() > 0);
- assertEquals(TStatus.NEW, txStore.getStatus());
+ assertEquals(TStatus.SUBMITTED, txStore.getStatus());
assertEquals(fateKey, txStore.getKey().orElseThrow());
assertEquals(1, store.list().count());
} finally {
@@ -359,15 +362,16 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
FateKey fateKey = FateKey.forSplit(ke);
- FateTxStore<TestEnv> txStore =
store.createAndReserve(fateKey).orElseThrow();
+ var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ var txStore = store.reserve(fateId);
try {
assertTrue(txStore.timeCreated() > 0);
txStore.setStatus(TStatus.IN_PROGRESS);
// We have an existing transaction with the same key in progress
// so should return an empty Optional
- assertTrue(store.createAndReserve(fateKey).isEmpty());
+ assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(),
true).isEmpty());
assertEquals(TStatus.IN_PROGRESS, txStore.getStatus());
} finally {
txStore.setStatus(TStatus.SUCCESSFUL);
@@ -375,14 +379,20 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
txStore.unreserve(Duration.ZERO);
}
+ txStore = null;
+
try {
// After deletion, make sure we can create again with the same key
- txStore = store.createAndReserve(fateKey).orElseThrow();
+ var fateId2 = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ txStore = store.reserve(fateId);
+ assertEquals(fateId, fateId2);
assertTrue(txStore.timeCreated() > 0);
- assertEquals(TStatus.NEW, txStore.getStatus());
+ assertEquals(TStatus.SUBMITTED, txStore.getStatus());
} finally {
- txStore.delete();
- txStore.unreserve(Duration.ZERO);
+ if (txStore != null) {
+ txStore.delete();
+ txStore.unreserve(Duration.ZERO);
+ }
}
}
@@ -392,8 +402,18 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
// Replace the default hashing algorithm with one that always returns the
same tid so
// we can check duplicate detection with different keys
executeTest(this::testCreateWithKeyCollision,
AbstractFateStore.DEFAULT_MAX_DEFERRED,
- (instanceType, fateKey) -> FateId.from(instanceType,
- UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8))));
+ new AbstractFateStore.FateIdGenerator() {
+ @Override
+ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey
fateKey) {
+ return FateId.from(instanceType,
+ UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8)));
+ }
+
+ @Override
+ public FateId newRandomId(FateInstanceType instanceType) {
+ return FateId.from(instanceType, UUID.randomUUID());
+ }
+ });
}
protected void testCreateWithKeyCollision(FateStore<TestEnv> store,
ServerContext sctx) {
@@ -404,13 +424,10 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
FateKey fateKey1 = FateKey.forSplit(ke1);
FateKey fateKey2 = FateKey.forSplit(ke2);
- FateTxStore<TestEnv> txStore =
store.createAndReserve(fateKey1).orElseThrow();
+ var fateId1 = store.seedTransaction("TEST", fateKey1, new TestRepo(),
true).orElseThrow();
+ var txStore = store.reserve(fateId1);
try {
- var e = assertThrows(IllegalStateException.class, () ->
store.createAndReserve(fateKey2));
- assertEquals(
- "Collision detected for fate id "
- + FateId.from(store.type(), UUID.nameUUIDFromBytes("testing
uuid".getBytes(UTF_8))),
- e.getMessage());
+ assertTrue(store.seedTransaction("TEST", fateKey2, new TestRepo(),
true).isEmpty());
assertEquals(fateKey1, txStore.getKey().orElseThrow());
} finally {
txStore.delete();
@@ -430,26 +447,192 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
FateKey fateKey = FateKey.forSplit(ke);
- FateTxStore<TestEnv> txStore =
store.createAndReserve(fateKey).orElseThrow();
- FateId fateId = txStore.getID();
+ var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
- // After createAndReserve a fate transaction using a key we can simulate a
collision with
- // a random FateId by deleting the key out of Fate and calling
createAndReserve again to
+ // After seeding a fate transaction using a key we can simulate a
collision with
+ // a random FateId by deleting the key out of Fate and calling seed again
to
// verify it detects the key is missing. Then we can continue and see if
we can still use
// the existing transaction.
deleteKey(fateId, sctx);
- var e = assertThrows(IllegalStateException.class, () ->
store.createAndReserve(fateKey));
- assertEquals("fate key is missing from fate id " + fateId, e.getMessage());
+ assertTrue(store.seedTransaction("TEST", fateKey, new TestRepo(),
true).isEmpty());
+ var txStore = store.reserve(fateId);
// We should still be able to use the existing transaction
try {
assertTrue(txStore.timeCreated() > 0);
+ assertEquals(TStatus.SUBMITTED, txStore.getStatus());
+ } finally {
+ txStore.delete();
+ txStore.unreserve(Duration.ZERO);
+ }
+ }
+
+ public static final UUID DUPLICATE_UUID = UUID.randomUUID();
+
+ public static final List<UUID> UUIDS = List.of(DUPLICATE_UUID,
DUPLICATE_UUID, UUID.randomUUID());
+
+ @Test
+ public void testCreate() throws Exception {
+ AtomicInteger index = new AtomicInteger(0);
+ executeTest(this::testCreate, AbstractFateStore.DEFAULT_MAX_DEFERRED,
+ new AbstractFateStore.FateIdGenerator() {
+ @Override
+ public FateId fromTypeAndKey(FateInstanceType instanceType, FateKey
fateKey) {
+ return FateId.from(instanceType,
+ UUID.nameUUIDFromBytes("testing uuid".getBytes(UTF_8)));
+ }
+
+ @Override
+ public FateId newRandomId(FateInstanceType instanceType) {
+ return FateId.from(instanceType, UUIDS.get(index.getAndIncrement()
% UUIDS.size()));
+ }
+ });
+ }
+
+ protected void testCreate(FateStore<TestEnv> store, ServerContext sctx)
throws Exception {
+
+ var fateId1 = store.create();
+ assertEquals(UUIDS.get(0), fateId1.getTxUUID());
+
+ // This UUIDS[1] should collide with UUIDS[0] and then the code should
retry and end up UUIDS[2]
+ var fateId2 = store.create();
+ assertEquals(UUIDS.get(2), fateId2.getTxUUID());
+
+ for (var fateId : List.of(fateId1, fateId2)) {
+ var txStore = store.reserve(fateId);
+ try {
+ assertEquals(TStatus.NEW, txStore.getStatus());
+ assertTrue(txStore.timeCreated() > 0);
+ assertNull(txStore.top());
+ assertTrue(txStore.getKey().isEmpty());
+ assertEquals(fateId, txStore.getID());
+ assertTrue(txStore.getStack().isEmpty());
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ }
+
+ assertEquals(Set.of(fateId1, fateId2),
+ store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+ var txStore = store.reserve(fateId2);
+ try {
+ txStore.delete();
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+
+ assertEquals(Set.of(fateId1),
+ store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+ txStore = store.reserve(fateId1);
+ try {
+ txStore.setStatus(TStatus.SUBMITTED);
+ txStore.setStatus(TStatus.IN_PROGRESS);
+ txStore.push(new TestRepo());
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+
+ assertEquals(Set.of(fateId1),
+ store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+ // should collide again with the first fate id and go to the second
+ fateId2 = store.create();
+ assertEquals(UUIDS.get(2), fateId2.getTxUUID());
+
+ assertEquals(Set.of(fateId1, fateId2),
+ store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+ // ensure fateId1 was not altered in anyway by creating fateid2 when it
collided
+ txStore = store.reserve(fateId1);
+ try {
+ assertEquals(TStatus.IN_PROGRESS, txStore.getStatus());
+ assertNotNull(txStore.top());
+ txStore.forceDelete();
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+
+ assertEquals(Set.of(fateId2),
+ store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+ txStore = store.reserve(fateId2);
+ try {
assertEquals(TStatus.NEW, txStore.getStatus());
+ txStore.delete();
} finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+
+ // should be able to recreate something at the same id
+ fateId1 = store.create();
+ assertEquals(UUIDS.get(0), fateId1.getTxUUID());
+ txStore = store.reserve(fateId1);
+ try {
+ assertEquals(TStatus.NEW, txStore.getStatus());
+ assertTrue(txStore.timeCreated() > 0);
+ assertNull(txStore.top());
+ assertTrue(txStore.getKey().isEmpty());
+ assertEquals(fateId1, txStore.getID());
+ assertTrue(txStore.getStack().isEmpty());
txStore.delete();
+ } finally {
txStore.unreserve(Duration.ZERO);
}
+ assertEquals(Set.of(),
store.list().map(FateIdStatus::getFateId).collect(Collectors.toSet()));
+
+ }
+
+ @Test
+ public void testConcurrent() throws Exception {
+ executeTest(this::testConcurrent);
+ }
+
+ protected void testConcurrent(FateStore<TestEnv> store, ServerContext sctx)
throws Exception {
+ KeyExtent ke =
+ new KeyExtent(TableId.of(getUniqueNames(1)[0]), new Text("zzz"), new
Text("aaa"));
+ FateKey fateKey = FateKey.forSplit(ke);
+
+ var executor = Executors.newFixedThreadPool(10);
+ try {
+ // have 10 threads all try to seed the same fate key, only one should
succeed.
+ List<Future<Optional<FateId>>> futures = new ArrayList<>(10);
+ for (int i = 0; i < 10; i++) {
+ futures.add(
+ executor.submit(() -> store.seedTransaction("TEST", fateKey, new
TestRepo(), true)));
+ }
+
+ int idsSeen = 0;
+ for (var future : futures) {
+ if (future.get().isPresent()) {
+ idsSeen++;
+ }
+ }
+
+ assertEquals(1, idsSeen);
+ assertEquals(1, store.list(FateKey.FateKeyType.SPLIT).count());
+ assertEquals(0,
store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+
+ for (var future : futures) {
+ if (future.get().isPresent()) {
+ var txStore = store.reserve(future.get().orElseThrow());
+ try {
+ txStore.delete();
+ } finally {
+ txStore.unreserve(Duration.ZERO);
+ }
+ }
+ }
+
+ assertEquals(0, store.list(FateKey.FateKeyType.SPLIT).count());
+ assertEquals(0,
store.list(FateKey.FateKeyType.COMPACTION_COMMIT).count());
+
+ } finally {
+ executor.shutdown();
+ }
+
}
@Test
@@ -500,9 +683,8 @@ public abstract class FateStoreIT extends
SharedMiniClusterBase implements FateT
Map<FateKey,FateId> fateKeyIds = new HashMap<>();
for (FateKey fateKey : List.of(fateKey1, fateKey2, fateKey3, fateKey4)) {
- var fateTx = store.createAndReserve(fateKey).orElseThrow();
- fateKeyIds.put(fateKey, fateTx.getID());
- fateTx.unreserve(Duration.ZERO);
+ var fateId = store.seedTransaction("TEST", fateKey, new TestRepo(),
true).orElseThrow();
+ fateKeyIds.put(fateKey, fateId);
}
HashSet<FateId> allIds = new HashSet<>();