This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 08ee5ce1 support filtering by GC commands before loading
08ee5ce1 is described below

commit 08ee5ce1c6301201ccaf7d580a6af289ab4c5765
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Fri Oct 4 13:04:57 2024 +0100

    support filtering by GC commands before loading
    
    special case recovery, to avoid loading range commands except where necessary
---
 .../src/main/java/accord/impl/InMemoryCommandStore.java  |  1 +
 .../java/accord/impl/progresslog/DefaultProgressLog.java |  5 ++++-
 accord-core/src/main/java/accord/local/CommandStore.java |  3 ++-
 accord-core/src/main/java/accord/local/KeyHistory.java   |  1 +
 .../src/main/java/accord/local/RedundantBefore.java      | 16 ++++++++++++++++
 .../src/main/java/accord/messages/BeginRecovery.java     |  2 +-
 .../src/main/java/accord/utils/PersistentField.java      |  6 +-----
 7 files changed, 26 insertions(+), 8 deletions(-)

diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 385fbd16..67bd325f 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -465,6 +465,7 @@ public abstract class InMemoryCommandStore extends CommandStore
                         case NONE:
                             continue;
                         case COMMANDS:
+                        case RECOVERY:
                             commandsForKey.put(key, commandsForKey((RoutingKey) key).createSafeReference());
                             break;
                         case TIMESTAMPS:
diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index 31ae8adc..9d1aea56 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -43,6 +43,7 @@ import accord.primitives.Participants;
 import accord.primitives.ProgressToken;
 import accord.primitives.Ranges;
 import accord.primitives.Route;
+import accord.primitives.Txn;
 import accord.primitives.TxnId;
 import accord.utils.Invariants;
 import accord.utils.LogGroupTimers;
@@ -355,7 +356,9 @@ public class DefaultProgressLog implements ProgressLog, Runnable
         if (update != command)
             command = blockedBy.updateAttributes(safeStore, update);
 
-        Invariants.checkState(safeStore.ranges().allSince(command.txnId().epoch()).intersects(command.participants().hasTouched()));
+        // TODO (required): tighten up ExclusiveSyncPoint range bounds
+        Invariants.checkState((command.txnId().is(Txn.Kind.ExclusiveSyncPoint) ? safeStore.ranges().all()
+                                                                               : safeStore.ranges().allSince(command.txnId().epoch())).intersects(command.participants().hasTouched()));
 
         // TODO (consider): consider triggering a preemption of existing coordinator (if any) in some circumstances;
         //                  today, an LWT can pre-empt more efficiently (i.e. instantly) a failed operation whereas Accord will
diff --git a/accord-core/src/main/java/accord/local/CommandStore.java b/accord-core/src/main/java/accord/local/CommandStore.java
index 726d13f3..afbafc71 100644
--- a/accord-core/src/main/java/accord/local/CommandStore.java
+++ b/accord-core/src/main/java/accord/local/CommandStore.java
@@ -69,6 +69,7 @@ import static accord.primitives.Routables.Slice.Minimal;
 import static accord.primitives.Txn.Kind.ExclusiveSyncPoint;
 import static accord.utils.Invariants.checkState;
 import static accord.utils.Invariants.illegalState;
+import static accord.utils.Invariants.nonNull;
 
 /**
  * Single threaded internal shard of accord transaction metadata
@@ -220,7 +221,7 @@ public abstract class CommandStore implements AgentExecutor
 
     protected void unsafeSetRangesForEpoch(RangesForEpoch newRangesForEpoch)
     {
-        rangesForEpoch = newRangesForEpoch;
+        rangesForEpoch = nonNull(newRangesForEpoch);
     }
 
     public abstract boolean inStore();
diff --git a/accord-core/src/main/java/accord/local/KeyHistory.java b/accord-core/src/main/java/accord/local/KeyHistory.java
index 7f061d6e..40dfe4ef 100644
--- a/accord-core/src/main/java/accord/local/KeyHistory.java
+++ b/accord-core/src/main/java/accord/local/KeyHistory.java
@@ -26,6 +26,7 @@ public enum KeyHistory
 {
     TIMESTAMPS,
     COMMANDS,
+    RECOVERY,
     NONE;
 
     public boolean isNone()
diff --git a/accord-core/src/main/java/accord/local/RedundantBefore.java b/accord-core/src/main/java/accord/local/RedundantBefore.java
index 41b84ae8..ed97fade 100644
--- a/accord-core/src/main/java/accord/local/RedundantBefore.java
+++ b/accord-core/src/main/java/accord/local/RedundantBefore.java
@@ -299,6 +299,17 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
             return safeToRead;
         }
 
+        static TxnId minGcBefore(Entry entry, @Nullable TxnId minGcBefore)
+        {
+            if (entry == null)
+                return minGcBefore;
+
+            if (minGcBefore == null)
+                return entry.gcBefore;
+
+            return TxnId.min(minGcBefore, entry.gcBefore);
+        }
+
         static Ranges expectToExecute(Entry entry, @Nonnull Ranges executeRanges, TxnId txnId, @Nullable Timestamp executeAt)
         {
             if (entry == null || (executeAt == null ? entry.outOfBounds(txnId) : entry.outOfBounds(txnId, executeAt)))
@@ -574,6 +585,11 @@ public class RedundantBefore extends ReducingRangeMap<RedundantBefore.Entry>
         return foldl(ranges, Entry::validateSafeToRead, ranges, forBootstrapAt, null, r -> false);
     }
 
+    public TxnId minGcBefore(Routables<?> participants)
+    {
+        return TxnId.nonNullOrMax(TxnId.NONE, foldl(participants, Entry::minGcBefore, null, ignore -> false));
+    }
+
     /**
      * Subtract any ranges we consider stale or pre-bootstrap
      */
diff --git a/accord-core/src/main/java/accord/messages/BeginRecovery.java b/accord-core/src/main/java/accord/messages/BeginRecovery.java
index a2afd1b3..ad8bb5a0 100644
--- a/accord-core/src/main/java/accord/messages/BeginRecovery.java
+++ b/accord-core/src/main/java/accord/messages/BeginRecovery.java
@@ -215,7 +215,7 @@ public class BeginRecovery extends TxnRequest.WithUnsynced<BeginRecovery.Recover
     @Override
     public KeyHistory keyHistory()
     {
-        return KeyHistory.COMMANDS;
+        return KeyHistory.RECOVERY;
     }
 
     @Override
diff --git a/accord-core/src/main/java/accord/utils/PersistentField.java b/accord-core/src/main/java/accord/utils/PersistentField.java
index 3d872185..da66ffdc 100644
--- a/accord-core/src/main/java/accord/utils/PersistentField.java
+++ b/accord-core/src/main/java/accord/utils/PersistentField.java
@@ -102,11 +102,7 @@ public class PersistentField<Input, Saved>
         Saved newValue = merge.apply(inputValue, startingValue);
         this.latestPending = newValue;
         int id = ++nextId;
-
-        synchronized (this)
-        {
-            pending.add(new Pending<>(id, newValue));
-        }
+        pending.add(new Pending<>(id, newValue));
 
         AsyncResult<?> pendingWrite = persister.persist(inputValue, newValue);
         pendingWrite.addCallback((success, fail) -> {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to