This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git

commit 7242cb209bb723fc6ba0e7225974b8ec8f86630b
Author: Benedict Elliott Smith <bened...@apache.org>
AuthorDate: Mon May 12 14:37:01 2025 +0100

    Follow-up to CASSANDRA-20641; propagate forceUpdate further
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20641
---
 .../src/main/java/accord/api/ProgressLog.java      |  4 ++--
 .../java/accord/impl/InMemoryCommandStore.java     | 10 +++++-----
 .../impl/progresslog/DefaultProgressLog.java       |  8 ++++----
 .../src/main/java/accord/local/Commands.java       |  2 +-
 .../src/main/java/accord/local/SafeCommand.java    | 16 +++++++++++++---
 .../main/java/accord/local/SafeCommandStore.java   | 22 +++++++++++-----------
 .../java/accord/local/cfk/CommandsForKeyTest.java  |  2 +-
 7 files changed, 37 insertions(+), 27 deletions(-)

diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java b/accord-core/src/main/java/accord/api/ProgressLog.java
index a7a141ad..5404da9d 100644
--- a/accord-core/src/main/java/accord/api/ProgressLog.java
+++ b/accord-core/src/main/java/accord/api/ProgressLog.java
@@ -179,7 +179,7 @@ public interface ProgressLog
     /**
      * Record an updated local status for the transaction, to clear any waiting state it satisfies.
      */
-    void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after);
+    void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force);
 
     /**
      * Process a remote asynchronous callback.
@@ -234,7 +234,7 @@ public interface ProgressLog
 
     class NoOpProgressLog implements ProgressLog
     {
-        @Override public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after) {}
+        @Override public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force) {}
         @Override public void remoteCallback(SafeCommandStore safeStore, SafeCommand safeCommand, SaveStatus remoteStatus, int callbackId, Node.Id from) {}
         @Override public void waiting(BlockedUntil blockedUntil, SafeCommandStore safeStore, SafeCommand blockedBy, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants, StoreParticipants participants) {}
         @Override public void invalidIfUncommitted(TxnId txnId) {}
diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
index 64103a3d..885fd7ae 100644
--- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
+++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java
@@ -652,9 +652,9 @@ public abstract class InMemoryCommandStore extends CommandStore
         }
 
         @Override
-        protected void update(Command prev, Command updated)
+        protected void update(Command prev, Command updated, boolean force)
         {
-            super.update(prev, updated);
+            super.update(prev, updated, force);
 
             TxnId txnId = updated.txnId();
             if (txnId.domain() != Domain.Range)
@@ -782,10 +782,10 @@ public abstract class InMemoryCommandStore extends CommandStore
         }
 
         @Override
-        public void updateExclusiveSyncPoint(Command prev, Command updated)
+        public void updateExclusiveSyncPoint(Command prev, Command updated, boolean force)
         {
-            super.updateExclusiveSyncPoint(prev, updated);
-            if (updated.txnId().kind() != Txn.Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range || !updated.hasBeen(Applied) || prev.hasBeen(Applied) || updated.hasBeen(Truncated)) return;
+            super.updateExclusiveSyncPoint(prev, updated, force);
+            if (updated.txnId().kind() != Txn.Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range || !updated.hasBeen(Applied) || (prev.hasBeen(Applied) && !force) || updated.hasBeen(Truncated)) return;
 
             Participants<?> covering = updated.participants().touches();
             for (Map.Entry<TxnId, GlobalCommand> entry : commandStore().commands.headMap(updated.txnId(), false).entrySet())
diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
index d95f4af5..72e2b6de 100644
--- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
+++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java
@@ -187,7 +187,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
     }
 
     @Override
-    public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after)
+    public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force)
     {
         if (!txnId.isVisible())
             return;
@@ -195,7 +195,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
         TxnState state = null;
         Route<?> beforeRoute = before.route();
         Route<?> afterRoute = after.route();
-        if (beforeRoute == null && afterRoute != null)
+        if (afterRoute != null && (beforeRoute == null || force))
         {
             RoutingKey homeKey = afterRoute.homeKey();
             Ranges coordinateRanges = safeStore.coordinateRanges(txnId);
@@ -222,7 +222,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
                 state.setHomeDone(this);
             }
         }
-        else if (after.durability().isDurableOrInvalidated() && !before.durability().isDurableOrInvalidated())
+        else if (after.durability().isDurableOrInvalidated() && (force || !before.durability().isDurableOrInvalidated()))
         {
             state = get(txnId);
             if (state != null)
@@ -233,7 +233,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor
 
         SaveStatus beforeSaveStatus = before.saveStatus();
         SaveStatus afterSaveStatus = after.saveStatus();
-        if (beforeSaveStatus == afterSaveStatus)
+        if (beforeSaveStatus == afterSaveStatus && !force)
             return;
 
         if (state == null)
diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java
index cf48d228..58c088c8 100644
--- a/accord-core/src/main/java/accord/local/Commands.java
+++ b/accord-core/src/main/java/accord/local/Commands.java
@@ -565,7 +565,7 @@ public class Commands
         if (command.hasBeen(Applied) && !forceApply)
             return;
 
-        safeCommand.applied(safeStore);
+        safeCommand.applied(safeStore, forceApply);
         safeStore.notifyListeners(safeCommand, command);
         if (t0 >= 0) safeStore.agent().eventListener().onApplied(command, t0);
     }
diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java b/accord-core/src/main/java/accord/local/SafeCommand.java
index 7f10b37d..1b77676c 100644
--- a/accord-core/src/main/java/accord/local/SafeCommand.java
+++ b/accord-core/src/main/java/accord/local/SafeCommand.java
@@ -58,14 +58,19 @@ public abstract class SafeCommand
     }
 
     public <C extends Command> C update(SafeCommandStore safeStore, C update)
+    {
+        return update(safeStore, update, false);
+    }
+
+    private <C extends Command> C update(SafeCommandStore safeStore, C update, boolean force)
     {
         Command prev = current();
         if (prev == update)
             return update;
 
         set(update);
-        safeStore.progressLog().update(safeStore, txnId, prev, update);
-        safeStore.update(prev, update);
+        safeStore.progressLog().update(safeStore, txnId, prev, update, force);
+        safeStore.update(prev, update, force);
         return update;
     }
 
@@ -90,7 +95,7 @@ public abstract class SafeCommand
             return prev;
 
         Command update = incidentalUpdate(prev.updateParticipants(participants));
-        safeStore.progressLog().update(safeStore, txnId, prev, update);
+        safeStore.progressLog().update(safeStore, txnId, prev, update, false);
         return update;
     }
 
@@ -163,6 +168,11 @@ public abstract class SafeCommand
         return update(safeStore, Command.applied(current().asExecuted()));
     }
 
+    public Command.Executed applied(SafeCommandStore safeStore, boolean forceUpdate)
+    {
+        return update(safeStore, Command.applied(current().asExecuted()), forceUpdate);
+    }
+
     public Command.NotDefined uninitialised()
     {
         Invariants.requireArgument(current() == null);
diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java
index 47012d48..86fc58cd 100644
--- a/accord-core/src/main/java/accord/local/SafeCommandStore.java
+++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java
@@ -238,14 +238,14 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
      */
     public abstract PreLoadContext context();
 
-    protected void update(Command prev, Command updated)
+    protected void update(Command prev, Command updated, boolean force)
     {
-        updateMaxConflicts(prev, updated);
-        updateCommandsForKey(prev, updated);
-        updateExclusiveSyncPoint(prev, updated);
+        updateMaxConflicts(prev, updated, force);
+        updateCommandsForKey(prev, updated, force);
+        updateExclusiveSyncPoint(prev, updated, force);
     }
 
-    public void updateExclusiveSyncPoint(Command prev, Command updated)
+    public void updateExclusiveSyncPoint(Command prev, Command updated, boolean force)
     {
         if (updated.txnId().kind() != Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range) return;
         if (updated.route() == null) return;
@@ -253,13 +253,13 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
         SaveStatus oldSaveStatus = prev == null ? SaveStatus.Uninitialised : prev.saveStatus();
         SaveStatus newSaveStatus = updated.saveStatus();
 
-        if (newSaveStatus.known.isDefinitionKnown() && !oldSaveStatus.known.isDefinitionKnown())
+        if (newSaveStatus.known.isDefinitionKnown() && (force || !oldSaveStatus.known.isDefinitionKnown()))
         {
             Ranges ranges = updated.participants().touches().toRanges();
             commandStore().markExclusiveSyncPoint(this, updated.txnId(), ranges);
         }
 
-        if (newSaveStatus == Applied && oldSaveStatus != Applied)
+        if (newSaveStatus == Applied && (force || oldSaveStatus != Applied))
         {
             Ranges ranges = updated.participants().touches().toRanges();
             TxnId txnIdWithFlags = (TxnId)updated.executeAt();
@@ -289,11 +289,11 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
         }
     }
 
-    public void updateMaxConflicts(Command prev, Command updated)
+    public void updateMaxConflicts(Command prev, Command updated, boolean force)
     {
         SaveStatus oldSaveStatus = prev == null ? SaveStatus.Uninitialised : prev.saveStatus();
         SaveStatus newSaveStatus = updated.saveStatus();
-        if (newSaveStatus.status.equals(oldSaveStatus.status) && oldSaveStatus.known.definition().isKnown())
+        if (newSaveStatus.status.equals(oldSaveStatus.status) && oldSaveStatus.known.definition().isKnown() && !force)
             return;
 
         TxnId txnId = updated.txnId();
@@ -334,9 +334,9 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund
         commandStore().unsafeSetRangesForEpoch(rangesForEpoch);
     }
 
-    public void updateCommandsForKey(Command prev, Command next)
+    public void updateCommandsForKey(Command prev, Command next, boolean force)
     {
-        if (!CommandsForKey.needsUpdate(this, prev, next))
+        if (!CommandsForKey.needsUpdate(this, prev, next) && !force)
             return;
 
         TxnId txnId = next.txnId();
diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
index 8a47120d..6d3b6707 100644
--- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
+++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java
@@ -813,7 +813,7 @@ public class CommandsForKeyTest
         }
 
         @Override
-        protected void update(Command prev, Command updated)
+        protected void update(Command prev, Command updated, boolean force)
         {
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to