This is an automated email from the ASF dual-hosted git repository.

ifesdjeen pushed a commit to branch ex-sp-pl
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 1a9b54b22199f8828b4eaaa4c4255d3ecc76a868
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Sun Sep 29 11:15:55 2024 +0100

    refine Cleanup logic to support C* cleanup
---
 .../src/main/java/accord/local/Cleanup.java        | 92 ++++++++++++++++++----
 .../src/main/java/accord/local/Commands.java       |  1 +
 .../main/java/accord/local/RedundantBefore.java    |  5 ++
 .../main/java/accord/utils/ReducingRangeMap.java   | 18 ++++-
 4 files changed, 100 insertions(+), 16 deletions(-)

diff --git a/accord-core/src/main/java/accord/local/Cleanup.java b/accord-core/src/main/java/accord/local/Cleanup.java
index 888fe0ba..765fddeb 100644
--- a/accord-core/src/main/java/accord/local/Cleanup.java
+++ b/accord-core/src/main/java/accord/local/Cleanup.java
@@ -19,7 +19,11 @@
 package accord.local;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
+import accord.api.VisibleForImplementation;
+import accord.primitives.FullRoute;
+import accord.primitives.Route;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status.Durability;
 import accord.primitives.TxnId;
@@ -52,7 +56,9 @@ public enum Cleanup
     TRUNCATE(TruncatedApply),
     INVALIDATE(Invalidated),
     VESTIGIAL(ErasedOrVestigial),
-    ERASE(Erased);
+    ERASE(Erased),
+    // we can stop storing the record entirely
+    EXPUNGE(Erased);
 
     public final SaveStatus appliesIfNot;
 
@@ -84,20 +90,24 @@ public enum Cleanup
 
     public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        return shouldCleanup(txnId, status, durability, participants, redundantBefore, durableBefore, true);
+        return shouldCleanupInternal(txnId, status, durability, participants, redundantBefore, durableBefore).filter(status);
     }
 
-    public static Cleanup shouldCleanup(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore, boolean enforceInvariants)
+    @VisibleForImplementation
+    public static Cleanup shouldCleanupPartial(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
-        return shouldCleanupInternal(txnId, status, durability, participants, redundantBefore, durableBefore, enforceInvariants).filter(status);
+        return shouldCleanupPartialInternal(txnId, status, durability, participants, redundantBefore, durableBefore).filter(status);
     }
 
-    private static Cleanup shouldCleanupInternal(TxnId txnId, SaveStatus status, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore, boolean enforceInvariants)
+    private static Cleanup shouldCleanupInternal(TxnId txnId, SaveStatus saveStatus, Durability durability, StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
     {
         if (txnId.kind() == EphemeralRead)
             return NO; // TODO (required): clean-up based on timeout
 
-        if (status == Uninitialised)
+        if (expunge(txnId, saveStatus, durableBefore, redundantBefore))
+            return EXPUNGE;
+
+        if (saveStatus == Uninitialised)
         {
             if (!redundantBefore.isAnyOnAnyEpoch(txnId, participants.touches, SHARD_REDUNDANT))
                 return NO;
@@ -116,22 +126,60 @@ public enum Cleanup
             return cleanup;
         }
 
-        if (!status.hasBeen(PreCommitted) && redundantBefore.isAnyOnCoordinationEpoch(txnId, participants.owns, SHARD_REDUNDANT))
-            return Cleanup.INVALIDATE;
-
         if (!participants.hasFullRoute())
         {
-            if (status == Invalidated && durableBefore.min(txnId) == UniversalOrInvalidated)
-                return Cleanup.ERASE;
+            if (!saveStatus.hasBeen(PreCommitted) && redundantBefore.isAnyOnCoordinationEpoch(txnId, participants.owns, SHARD_REDUNDANT))
+                return Cleanup.INVALIDATE;
 
             return Cleanup.NO;
         }
 
+        return cleanupWithFullRoute(false, participants, txnId, saveStatus, durability, redundantBefore, durableBefore);
+    }
+
+    private static Cleanup shouldCleanupPartialInternal(TxnId txnId, SaveStatus status, @Nullable Durability durability, @Nullable StoreParticipants participants, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
+        if (txnId.kind() == EphemeralRead)
+            return NO; // TODO (required): clean-up based on timeout
+
+        if (expunge(txnId, status, durableBefore, redundantBefore))
+            return EXPUNGE;
+
+        if (participants == null)
+            return NO;
+
+        if (!participants.hasFullRoute())
+        {
+            if (!redundantBefore.isAnyOnCoordinationEpoch(txnId, participants.owns, SHARD_REDUNDANT))
+                return NO;
+
+            // we only need to keep the outcome if we have it; otherwise we can expunge
+            switch (status)
+            {
+                case TruncatedApply:
+                case TruncatedApplyWithOutcome:
+                case Invalidated:
+                    return NO;
+                case PreApplied:
+                case Applied:
+                case Applying:
+                    return TRUNCATE_WITH_OUTCOME;
+                default:
+                    return EXPUNGE;
+            }
+        }
+
+        return cleanupWithFullRoute(true, participants, txnId, status, durability, redundantBefore, durableBefore);
+    }
+
+    private static Cleanup cleanupWithFullRoute(boolean isPartial, StoreParticipants participants, TxnId txnId, SaveStatus saveStatus, Durability durability, RedundantBefore redundantBefore, DurableBefore durableBefore)
+    {
         // We first check if the command is redundant locally, i.e. whether it has been applied to all non-faulty replicas of the local shard
         // If not, we don't want to truncate its state else we may make catching up for these other replicas much harder
-        RedundantStatus redundant = redundantBefore.status(txnId, participants.route);
+        FullRoute<?> route = Route.castToFullRoute(participants.route);
+        RedundantStatus redundant = redundantBefore.status(txnId, route);
         if (redundant == NOT_OWNED)
-            illegalState("Command " + txnId + " that is being loaded is not owned by this shard on route " + participants.route);
+            illegalState("Command " + txnId + " that is being loaded is not owned by this shard on route " + route);
 
         switch (redundant)
         {
@@ -143,12 +191,12 @@ public enum Cleanup
             case LOCALLY_REDUNDANT:
                 return Cleanup.NO;
             case SHARD_REDUNDANT:
-                if (enforceInvariants && status.hasBeen(PreCommitted) && !status.hasBeen(Applied) && redundantBefore.preBootstrapOrStale(txnId, participants.owns) != FULLY)
+                if (!isPartial && saveStatus.hasBeen(PreCommitted) && !saveStatus.hasBeen(Applied) && redundantBefore.preBootstrapOrStale(txnId, participants.owns) != FULLY)
                     illegalState("Loading redundant command that has been PreCommitted but not Applied.");
             case WAS_OWNED:
         }
 
-        if (!status.hasBeen(PreCommitted))
+        if (!isPartial && !saveStatus.hasBeen(PreCommitted))
             return INVALIDATE;
 
         Durability min = durableBefore.min(txnId, participants.route);
@@ -172,4 +220,18 @@ public enum Cleanup
                 return Cleanup.ERASE;
         }
     }
+
+    private static boolean expunge(TxnId txnId, SaveStatus saveStatus, DurableBefore durableBefore, RedundantBefore redundantBefore)
+    {
+        if (durableBefore.min(txnId) != UniversalOrInvalidated)
+            return false;
+
+        if (saveStatus == Invalidated)
+            return true;
+
+        // TODO (required): we should perhaps weaken this to separately account whether remotely and locally redundant?
+        //  i.e., if we know that the shard is remotely durable and we know we don't need it locally (e.g. due to bootstrap)
+        //  then we can safely erase. Revisit as part of rationalising RedundantBefore registers.
+        return redundantBefore.shardStatus(txnId) == SHARD_REDUNDANT;
+    }
 }
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index 2ddd2dbf..637d791f 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -847,6 +847,7 @@ public class Commands
                 break;
 
             case ERASE:
+            case EXPUNGE:
                 Invariants.checkState(command.saveStatus().compareTo(Erased) < 0);
                 result = erased(command);
                 break;
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java 
b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 1ba6e207..638d7043 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -522,6 +522,11 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
         return Entry.get(entry, txnId, executeAt);
     }
 
+    public RedundantStatus shardStatus(TxnId txnId)
+    {
+        return foldl(Entry::getAndMerge, NOT_OWNED, txnId, null, i -> i == LIVE);
+    }
+
     public RedundantStatus status(TxnId txnId, Participants<?> participants)
     {   // TODO (required): consider how the use of txnId for executeAt affects exclusive sync points for cleanup
         //    may want to issue synthetic sync points for local evaluation in later epochs
diff --git a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
index 4e2854c8..59fabf70 100644
--- a/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
+++ b/accord-core/src/main/java/accord/utils/ReducingRangeMap.java
@@ -122,7 +122,7 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V>
         }
     }
 
-    public <V2, P1, P2> V2 foldl(AbstractKeys<?> keys, ReduceFunction<V, V2, P1, P2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate)
+    public <A, P1, P2> A foldl(AbstractKeys<?> keys, ReduceFunction<V, A, P1, P2> fold, A accumulator, P1 p1, P2 p2, Predicate<A> terminate)
     {
         if (values.length == 0)
             return accumulator;
@@ -156,6 +156,22 @@ public class ReducingRangeMap<V> extends ReducingIntervalMap<RoutingKey, V>
         return accumulator;
     }
 
+    public <A, P1, P2> A foldl(QuadFunction<V, A, P1, P2, A> fold, A accumulator, P1 p1, P2 p2, Predicate<A> terminate)
+    {
+        if (values.length == 0)
+            return accumulator;
+
+        for (V value : values)
+        {
+            if (value == null) continue;
+            accumulator = fold.apply(value, accumulator, p1, p2);
+            if (terminate.test(accumulator))
+                return accumulator;
+        }
+
+        return accumulator;
+    }
+
     public <V2, P1, P2> V2 foldl(AbstractRanges ranges, ReduceFunction<V, V2, P1, P2> fold, V2 accumulator, P1 p1, P2 p2, Predicate<V2> terminate)
     {
         if (values.length == 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to