This is an automated email from the ASF dual-hosted git repository. ifesdjeen pushed a commit to branch ex-sp-pl in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 1a9b54b22199f8828b4eaaa4c4255d3ecc76a868 Author: Benedict Elliott Smith <[email protected]> AuthorDate: Sun Sep 29 11:15:55 2024 +0100 refine Cleanup logic to support C* cleanup --- .../src/main/java/accord/local/Cleanup.java | 92 ++++++++++++++++++---- .../src/main/java/accord/local/Commands.java | 1 + .../main/java/accord/local/RedundantBefore.java | 5 ++ .../main/java/accord/utils/ReducingRangeMap.java | 18 ++++- 4 files changed, 100 insertions(+), 16 deletions(-) diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java index 888fe0ba..765fddeb 100644 --- a/accord-core/src/main/java/accord/local/Cleanup.java +++ b/accord-core/src/main/java/accord/local/Cleanup.java @@ -19,7 +19,11 @@ package accord.local; import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import accord.api.VisibleForImplementation; +import accord.primitives.FullRoute; +import accord.primitives.Route; import accord.primitives.SaveStatus; import accord.primitives.Status.Durability; import accord.primitives.TxnId; @@ -52,7 +56,9 @@ public enum Cleanup TRUNCATE(TruncatedApply), INVALIDATE(Invalidated), VESTIGIAL(ErasedOrVestigial), - ERASE(Erased); + ERASE(Erased), + // we can stop storing the record entirely + EXPUNGE(Erased); public final SaveStatus appliesIfNot; @@ -84,20 +90,24 @@ public enum Cleanup public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) { - return shouldCleanup(txnId, status, durability, participants, redundantBefore, durableBefore, true); + return shouldCleanupInternal(txnId, status, durability, participants, redundantBefore, durableBefore).filter(status); } - public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore, boolean enforceInvariants) + @VisibleForImplementation + public static Cleanup shouldCleanupPartial(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) { - return shouldCleanupInternal(txnId, status, durability, participants, redundantBefore, durableBefore, enforceInvariants).filter(status); + return shouldCleanupPartialInternal(txnId, status, durability, participants, redundantBefore, durableBefore).filter(status); } - private static Cleanup shouldCleanupInternal(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore, boolean enforceInvariants) + private static Cleanup shouldCleanupInternal(TxnId txnId, SaveStatus saveStatus, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) { if (txnId.kind() == EphemeralRead) return NO; // TODO (required): clean-up based on timeout - if (status == Uninitialised) + if (expunge(txnId, saveStatus, durableBefore, redundantBefore)) + return EXPUNGE; + + if (saveStatus == Uninitialised) { if (!redundantBefore.isAnyOnAnyEpoch(txnId, participants.touches, SHARD_REDUNDANT)) return NO; @@ -116,22 +126,60 @@ public enum Cleanup return cleanup; } - if (!status.hasBeen(PreCommitted) && redundantBefore.isAnyOnCoordinationEpoch(txnId, participants.owns, SHARD_REDUNDANT)) - return Cleanup.INVALIDATE; - if (!participants.hasFullRoute()) { - if (status == Invalidated && durableBefore.min(txnId) == UniversalOrInvalidated) - return Cleanup.ERASE; + if (!saveStatus.hasBeen(PreCommitted) && redundantBefore.isAnyOnCoordinationEpoch(txnId, participants.owns, SHARD_REDUNDANT)) + return Cleanup.INVALIDATE; return Cleanup.NO; } + return cleanupWithFullRoute(false, participants, txnId, saveStatus, durability, redundantBefore, durableBefore); + } + + private static Cleanup shouldCleanupPartialInternal(TxnId txnId, SaveStatus status, @Nullable Durability durability, @Nullable StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore) + { + if (txnId.kind() == EphemeralRead) + return NO; // TODO (required): clean-up based on timeout + + if (expunge(txnId, status, durableBefore, redundantBefore)) + return EXPUNGE; + + if (participants == null) + return NO; + + if (!participants.hasFullRoute()) + { + if (!redundantBefore.isAnyOnCoordinationEpoch(txnId, participants.owns, SHARD_REDUNDANT)) + return NO; + + // we only need to keep the outcome if we have it; otherwise we can expunge + switch (status) + { + case TruncatedApply: + case TruncatedApplyWithOutcome: + case Invalidated: + return NO; + case PreApplied: + case Applied: + case Applying: + return TRUNCATE_WITH_OUTCOME; + default: + return EXPUNGE; + } + } + + return cleanupWithFullRoute(true, participants, txnId, status, durability, redundantBefore, durableBefore); + } + + private static Cleanup cleanupWithFullRoute(boolean isPartial, StoreParticipants participants, TxnId txnId, SaveStatus saveStatus, Durability durability, RedundantBefore redundantBefore, DurableBefore durableBefore) + { // We first check if the command is redundant locally, i.e. whether it has been applied to all non-faulty replicas of the local shard // If not, we don't want to truncate its state else we may make catching up for these other replicas much harder - RedundantStatus redundant = redundantBefore.status(txnId, participants.route); + FullRoute<?> route = Route.castToFullRoute(participants.route); + RedundantStatus redundant = redundantBefore.status(txnId, route); if (redundant == NOT_OWNED) - illegalState("Command " + txnId + " that is being loaded is not owned by this shard on route " + participants.route); + illegalState("Command " + txnId + " that is being loaded is not owned by this shard on route " + route); switch (redundant) { @@ -143,12 +191,12 @@ public enum Cleanup case LOCALLY_REDUNDANT: return Cleanup.NO; case SHARD_REDUNDANT: - if (enforceInvariants && status.hasBeen(PreCommitted) && !status.hasBeen(Applied) && redundantBefore.preBootstrapOrStale(txnId, participants.owns) != FULLY) + if (!isPartial && saveStatus.hasBeen(PreCommitted) && !saveStatus.hasBeen(Applied) && redundantBefore.preBootstrapOrStale(txnId, participants.owns) != FULLY) illegalState("Loading redundant command that has been PreCommitted but not Applied."); case WAS_OWNED: } - if (!status.hasBeen(PreCommitted)) + if (!isPartial && !saveStatus.hasBeen(PreCommitted)) return INVALIDATE; Durability min = durableBefore.min(txnId, participants.route); @@ -172,4 +220,18 @@ public enum Cleanup return Cleanup.ERASE; } } + + private static boolean expunge(TxnId txnId, SaveStatus saveStatus, DurableBefore durableBefore, RedundantBefore redundantBefore) + { + if (durableBefore.min(txnId) != UniversalOrInvalidated) + return false; + + if (saveStatus == Invalidated) + return true; + + // TODO (required): we should perhaps weaken this to separately account whether remotely and locally redundant? + // i.e., if we know that the shard is remotely durable and we know we don't need it locally (e.g. due to bootstrap) + // then we can safely erase. Revisit as part of rationalising RedundantBefore registers. + return redundantBefore.shardStatus(txnId) == SHARD_REDUNDANT; + } } diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index 2ddd2dbf..637d791f 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -847,6 +847,7 @@ public class Commands break; case ERASE: + case EXPUNGE: Invariants.checkState(command.saveStatus().compareTo(Erased) < 0); result = erased(command); break; diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java index 1ba6e207..638d7043 100644 --- a/accord-core/src/main/java/accord/local/RedundantBefore.java +++ b/accord-core/src/main/java/accord/local/RedundantBefore.java @@ -522,6 +522,11 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry> return Entry.get(entry, txnId, executeAt); } + public RedundantStatus shardStatus(TxnId txnId) + { + return foldl(Entry::getAndMerge, NOT_OWNED, txnId, null, i -> i == LIVE); + } + public RedundantStatus status(TxnId txnId, Participants<?> participants) { // TODO (required): consider how the use of txnId for executeAt affects exclusive sync points for cleanup // may want to issue synthetic sync points for local evaluation in later epochs diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java index 4e2854c8..59fabf70 100644 --- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java +++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java @@ -122,7 +122,7 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> } } - public <V2, P1, P2> V2 foldl(AbstractKeys<?> keys, ReduceFunction<V, V2, P1, P2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate) + public <A, P1, P2> A foldl(AbstractKeys<?> keys, ReduceFunction<V, A, P1, P2> fold, A accumulator, P1 p1, P2 p2, Predicate<A> terminate) { if (values.length == 0) return accumulator; @@ -156,6 +156,22 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V> return accumulator; } + public <A, P1, P2> A foldl(QuadFunction<V, A, P1, P2, A> fold, A accumulator, P1 p1, P2 p2, Predicate<A> terminate) + { + if (values.length == 0) + return accumulator; + + for (V value : values) + { + if (value == null) continue; + accumulator = fold.apply(value, accumulator, p1, p2); + if (terminate.test(accumulator)) + return accumulator; + } + + return accumulator; + } + public <V2, P1, P2> V2 foldl(AbstractRanges ranges, ReduceFunction<V, V2, P1, P2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate) { if (values.length == 0) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
