This is an automated email from the ASF dual-hosted git repository.
krathbun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 1e301e8bf4 Improvements to reservation column in FATE table and bug
fix (#4992)
1e301e8bf4 is described below
commit 1e301e8bf490152ce10e15a42d067f2f5b2c67ea
Author: Kevin Rathbun <[email protected]>
AuthorDate: Mon Oct 21 10:12:41 2024 -0400
Improvements to reservation column in FATE table and bug fix (#4992)
* Makes it so the reservation column is created on reservation and deleted
on unreservation (an unreserved value is no longer stored in the column)
* Addresses a bug with MultipleStoresIT.testDeadReservationsCleanup()
(ZooUtil.LockID was missing equals() and hashCode())
closes #4907
---
.../org/apache/accumulo/core/fate/FateStore.java | 18 -------------
.../accumulo/core/fate/user/FateMutator.java | 20 +-------------
.../accumulo/core/fate/user/FateMutatorImpl.java | 23 +---------------
.../accumulo/core/fate/user/UserFateStore.java | 24 +++++++----------
.../accumulo/core/fate/zookeeper/ZooUtil.java | 19 +++++++++++++
.../accumulo/test/fate/MultipleStoresIT.java | 10 +++----
.../accumulo/test/fate/user/FateMutatorImplIT.java | 31 +---------------------
7 files changed, 36 insertions(+), 109 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
index 6b7b68baf5..ae193d8df8 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/FateStore.java
@@ -29,7 +29,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
-import org.apache.accumulo.core.fate.user.FateMutatorImpl;
import org.apache.accumulo.core.fate.zookeeper.ZooUtil;
import org.apache.hadoop.io.DataInputBuffer;
@@ -147,19 +146,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T>
{
return new FateReservation(lockID, reservationUUID);
}
- /**
- * @param serializedFateRes the value present in the table for the
reservation column
- * @return true if the array represents a valid serialized FateReservation
object, false if it
- * represents an unreserved value, error otherwise
- */
- public static boolean isFateReservation(byte[] serializedFateRes) {
- if (Arrays.equals(serializedFateRes, FateMutatorImpl.NOT_RESERVED)) {
- return false;
- }
- deserialize(serializedFateRes);
- return true;
- }
-
public ZooUtil.LockID getLockID() {
return lockID;
}
@@ -195,10 +181,6 @@ public interface FateStore<T> extends ReadOnlyFateStore<T>
{
}
}
- public static boolean locksAreEqual(ZooUtil.LockID lockID1, ZooUtil.LockID
lockID2) {
- return lockID1.serialize("/").equals(lockID2.serialize("/"));
- }
-
@Override
public String toString() {
return lockID.serialize("/") + ":" + reservationUUID;
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
index 8c39e89700..d199a7463e 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutator.java
@@ -44,31 +44,13 @@ public interface FateMutator<T> {
/**
* Add a conditional mutation to {@link
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
- * put the reservation if the column doesn't exist yet. This should only be
used for
- * {@link UserFateStore#createAndReserve(FateKey)}
- *
- * @param reservation the reservation to attempt to put
- * @return the FateMutator with this added mutation
- */
- FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation
reservation);
-
- /**
- * Add a conditional mutation to {@link
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
- * remove the given reservation if it matches what is present in the column.
+ * delete the column if the column value matches the given reservation
*
* @param reservation the reservation to attempt to remove
* @return the FateMutator with this added mutation
*/
FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation);
- /**
- * Add a conditional mutation to {@link
FateSchema.TxColumnFamily#RESERVATION_COLUMN} that will
- * put the initial column value if it has not already been set yet
- *
- * @return the FateMutator with this added mutation
- */
- FateMutator<T> putInitReservationVal();
-
FateMutator<T> putName(byte[] data);
FateMutator<T> putAutoClean(byte[] data);
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
index fcb0e4f1f1..5d99a8df3a 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/FateMutatorImpl.java
@@ -18,7 +18,6 @@
*/
package org.apache.accumulo.core.fate.user;
-import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.core.fate.AbstractFateStore.serialize;
import static org.apache.accumulo.core.fate.user.UserFateStore.getRow;
import static org.apache.accumulo.core.fate.user.UserFateStore.getRowId;
@@ -51,8 +50,6 @@ import org.apache.hadoop.io.Text;
public class FateMutatorImpl<T> implements FateMutator<T> {
- public static final byte[] NOT_RESERVED = "".getBytes(UTF_8);
-
private final ClientContext context;
private final String tableName;
private final FateId fateId;
@@ -85,15 +82,6 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
@Override
public FateMutator<T> putReservedTx(FateStore.FateReservation reservation) {
- Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
-
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier()).setValue(NOT_RESERVED);
- mutation.addCondition(condition);
- TxColumnFamily.RESERVATION_COLUMN.put(mutation, new
Value(reservation.getSerialized()));
- return this;
- }
-
- @Override
- public FateMutator<T> putReservedTxOnCreation(FateStore.FateReservation
reservation) {
Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
mutation.addCondition(condition);
@@ -107,16 +95,7 @@ public class FateMutatorImpl<T> implements FateMutator<T> {
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier())
.setValue(reservation.getSerialized());
mutation.addCondition(condition);
- TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED));
- return this;
- }
-
- @Override
- public FateMutator<T> putInitReservationVal() {
- Condition condition = new
Condition(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
- TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
- mutation.addCondition(condition);
- TxColumnFamily.RESERVATION_COLUMN.put(mutation, new Value(NOT_RESERVED));
+ TxColumnFamily.RESERVATION_COLUMN.putDelete(mutation);
return this;
}
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
index e1cb4d6405..7446d1fafe 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/user/UserFateStore.java
@@ -107,7 +107,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
}
var status = newMutator(fateId).requireStatus().putStatus(TStatus.NEW)
-
.putCreateTime(System.currentTimeMillis()).putInitReservationVal().tryMutate();
+ .putCreateTime(System.currentTimeMillis()).tryMutate();
switch (status) {
case ACCEPTED:
@@ -137,8 +137,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
// Only need to retry if it is UNKNOWN
for (int attempt = 0; attempt < maxAttempts; attempt++) {
status =
newMutator(fateId).requireStatus().putStatus(TStatus.NEW).putKey(fateKey)
-
.putReservedTxOnCreation(reservation).putCreateTime(System.currentTimeMillis())
- .tryMutate();
+
.putReservedTx(reservation).putCreateTime(System.currentTimeMillis()).tryMutate();
if (status != FateMutator.Status.UNKNOWN) {
break;
}
@@ -182,9 +181,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
fateKeySeen = Optional.of(FateKey.deserialize(val.get()));
break;
case TxColumnFamily.RESERVATION:
- if (FateReservation.isFateReservation(val.get())) {
- reservationSeen =
Optional.of(FateReservation.deserialize(val.get()));
- }
+ reservationSeen =
Optional.of(FateReservation.deserialize(val.get()));
break;
default:
throw new IllegalStateException("Unexpected column seen: " +
colf + ":" + colq);
@@ -231,7 +228,9 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
// Create a unique FateReservation for this reservation attempt
FateReservation reservation = FateReservation.from(lockID,
UUID.randomUUID());
- FateMutator.Status status =
newMutator(fateId).putReservedTx(reservation).tryMutate();
+ // requiring any status prevents creating an entry if the fate id doesn't
exist
+ FateMutator.Status status =
+
newMutator(fateId).requireStatus(TStatus.values()).putReservedTx(reservation).tryMutate();
if (status.equals(FateMutator.Status.ACCEPTED)) {
return Optional.of(new FateTxStoreImpl(fateId, reservation));
} else if (status.equals(FateMutator.Status.UNKNOWN)) {
@@ -246,10 +245,9 @@ public class UserFateStore<T> extends AbstractFateStore<T>
{
scanner.setRange(getRow(fateId));
scanner.fetchColumn(TxColumnFamily.RESERVATION_COLUMN.getColumnFamily(),
TxColumnFamily.RESERVATION_COLUMN.getColumnQualifier());
- FateReservation persistedRes = scanner.stream()
- .filter(entry ->
FateReservation.isFateReservation(entry.getValue().get()))
- .map(entry ->
FateReservation.deserialize(entry.getValue().get())).findFirst()
- .orElse(null);
+ FateReservation persistedRes =
+ scanner.stream().map(entry ->
FateReservation.deserialize(entry.getValue().get()))
+ .findFirst().orElse(null);
if (persistedRes != null && persistedRes.equals(reservation)) {
return Optional.of(new FateTxStoreImpl(fateId, reservation));
}
@@ -318,9 +316,7 @@ public class UserFateStore<T> extends AbstractFateStore<T> {
status = TStatus.valueOf(val.toString());
break;
case TxColumnFamily.RESERVATION:
- if (FateReservation.isFateReservation(val.get())) {
- reservation = FateReservation.deserialize(val.get());
- }
+ reservation = FateReservation.deserialize(val.get());
break;
default:
throw new IllegalStateException("Unexpected column seen: " +
colf + ":" + colq);
diff --git
a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
index 51002ee574..263f17e440 100644
--- a/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
+++ b/core/src/main/java/org/apache/accumulo/core/fate/zookeeper/ZooUtil.java
@@ -28,6 +28,7 @@ import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;
import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.data.InstanceId;
@@ -92,6 +93,24 @@ public class ZooUtil {
public String toString() {
return " path = " + path + " node = " + node + " eid = " +
Long.toHexString(eid);
}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == this) {
+ return true;
+ }
+ if (obj instanceof LockID) {
+ LockID other = (LockID) obj;
+ return this.path.equals(other.path) && this.node.equals(other.node)
+ && this.eid == other.eid;
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(path, node, eid);
+ }
}
// Need to use Collections.unmodifiableList() instead of List.of() or
List.copyOf(), because
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
index f5e537394d..edd6a53859 100644
--- a/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/fate/MultipleStoresIT.java
@@ -421,8 +421,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
// Verify store1 has the transactions reserved and that they were reserved
with lock1
reservations = store1.getActiveReservations();
assertEquals(allIds, reservations.keySet());
- reservations.values().forEach(
- res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1,
res.getLockID())));
+ reservations.values().forEach(res -> assertEquals(lock1, res.getLockID()));
if (isUserStore) {
store2 = new UserFateStore<>(client, tableName, lock2, isLockHeld);
@@ -434,8 +433,7 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
// store1
reservations = store2.getActiveReservations();
assertEquals(allIds, reservations.keySet());
- reservations.values().forEach(
- res -> assertTrue(FateStore.FateReservation.locksAreEqual(lock1,
res.getLockID())));
+ reservations.values().forEach(res -> assertEquals(lock1, res.getLockID()));
// Simulate what would happen if the Manager using the Fate object (fate1)
died.
// isLockHeld would return false for the LockId of the Manager that died
(in this case, lock1)
@@ -455,8 +453,8 @@ public class MultipleStoresIT extends SharedMiniClusterBase
{
// the workers for fate1 are hung up
Wait.waitFor(() -> {
Map<FateId,FateStore.FateReservation> store2Reservations =
store2.getActiveReservations();
- boolean allReservedWithLock2 = store2Reservations.values().stream()
- .allMatch(entry ->
FateStore.FateReservation.locksAreEqual(entry.getLockID(), lock2));
+ boolean allReservedWithLock2 =
+ store2Reservations.values().stream().allMatch(entry ->
entry.getLockID().equals(lock2));
return store2Reservations.keySet().equals(allIds) &&
allReservedWithLock2;
}, fate1.getDeadResCleanupDelay().toMillis() * 2);
diff --git
a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
index fe16e2b014..1d80a5fb9b 100644
---
a/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
+++
b/test/src/main/java/org/apache/accumulo/test/fate/user/FateMutatorImplIT.java
@@ -181,29 +181,14 @@ public class FateMutatorImplIT extends
SharedMiniClusterBase {
ClientContext context = (ClientContext) client;
FateId fateId = FateId.from(FateInstanceType.USER, UUID.randomUUID());
- FateId fateId1 = FateId.from(FateInstanceType.USER, UUID.randomUUID());
ZooUtil.LockID lockID = new ZooUtil.LockID("/locks", "L1", 50);
FateStore.FateReservation reservation =
FateStore.FateReservation.from(lockID, UUID.randomUUID());
FateStore.FateReservation wrongReservation =
FateStore.FateReservation.from(lockID, UUID.randomUUID());
- // Ensure that we cannot do anything in the column until it is
initialized
- FateMutator.Status status =
- new FateMutatorImpl<>(context, table,
fateId).putReservedTx(reservation).tryMutate();
- assertEquals(REJECTED, status);
- status =
- new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(reservation).tryMutate();
- assertEquals(REJECTED, status);
-
- // Initialize the column and ensure we can't do it twice
- status = new FateMutatorImpl<>(context, table,
fateId).putInitReservationVal().tryMutate();
- assertEquals(ACCEPTED, status);
- status = new FateMutatorImpl<>(context, table,
fateId).putInitReservationVal().tryMutate();
- assertEquals(REJECTED, status);
-
// Ensure that reserving is the only thing we can do
- status =
+ FateMutator.Status status =
new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(reservation).tryMutate();
assertEquals(REJECTED, status);
status = new FateMutatorImpl<>(context, table,
fateId).putReservedTx(reservation).tryMutate();
@@ -226,20 +211,6 @@ public class FateMutatorImplIT extends
SharedMiniClusterBase {
status =
new FateMutatorImpl<>(context, table,
fateId).putUnreserveTx(reservation).tryMutate();
assertEquals(REJECTED, status);
-
- // Verify putReservedTxOnCreation works as expected
- status = new FateMutatorImpl<>(context, table,
fateId1).putReservedTxOnCreation(reservation)
- .tryMutate();
- assertEquals(ACCEPTED, status);
- status = new FateMutatorImpl<>(context, table,
fateId1).putReservedTxOnCreation(reservation)
- .tryMutate();
- assertEquals(REJECTED, status);
- status =
- new FateMutatorImpl<>(context, table,
fateId1).putUnreserveTx(reservation).tryMutate();
- assertEquals(ACCEPTED, status);
- status = new FateMutatorImpl<>(context, table,
fateId1).putReservedTxOnCreation(reservation)
- .tryMutate();
- assertEquals(REJECTED, status);
}
}