This is an automated email from the ASF dual-hosted git repository. benedict pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra-accord.git
commit 7242cb209bb723fc6ba0e7225974b8ec8f86630b Author: Benedict Elliott Smith <bened...@apache.org> AuthorDate: Mon May 12 14:37:01 2025 +0100 Follow-up to CASSANDRA-20641; propagate forceUpdate further patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20641 --- .../src/main/java/accord/api/ProgressLog.java | 4 ++-- .../java/accord/impl/InMemoryCommandStore.java | 10 +++++----- .../impl/progresslog/DefaultProgressLog.java | 8 ++++---- .../src/main/java/accord/local/Commands.java | 2 +- .../src/main/java/accord/local/SafeCommand.java | 16 +++++++++++++--- .../main/java/accord/local/SafeCommandStore.java | 22 +++++++++++----------- .../java/accord/local/cfk/CommandsForKeyTest.java | 2 +- 7 files changed, 37 insertions(+), 27 deletions(-) diff --git a/accord-core/src/main/java/accord/api/ProgressLog.java b/accord-core/src/main/java/accord/api/ProgressLog.java index a7a141ad..5404da9d 100644 --- a/accord-core/src/main/java/accord/api/ProgressLog.java +++ b/accord-core/src/main/java/accord/api/ProgressLog.java @@ -179,7 +179,7 @@ public interface ProgressLog /** * Record an updated local status for the transaction, to clear any waiting state it satisfies. */ - void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after); + void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force); /** * Process a remote asynchronous callback. @@ -234,7 +234,7 @@ public interface ProgressLog class NoOpProgressLog implements ProgressLog { - @Override public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after) {} + @Override public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force) {} @Override public void remoteCallback(SafeCommandStore safeStore, SafeCommand safeCommand, SaveStatus remoteStatus, int callbackId, Node.Id from) {} @Override public void waiting(BlockedUntil blockedUntil, SafeCommandStore safeStore, SafeCommand blockedBy, Route<?> blockedOnRoute, Participants<?> blockedOnParticipants, StoreParticipants participants) {} @Override public void invalidIfUncommitted(TxnId txnId) {} diff --git a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java index 64103a3d..885fd7ae 100644 --- a/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java +++ b/accord-core/src/main/java/accord/impl/InMemoryCommandStore.java @@ -652,9 +652,9 @@ public abstract class InMemoryCommandStore extends CommandStore } @Override - protected void update(Command prev, Command updated) + protected void update(Command prev, Command updated, boolean force) { - super.update(prev, updated); + super.update(prev, updated, force); TxnId txnId = updated.txnId(); if (txnId.domain() != Domain.Range) @@ -782,10 +782,10 @@ public abstract class InMemoryCommandStore extends CommandStore } @Override - public void updateExclusiveSyncPoint(Command prev, Command updated) + public void updateExclusiveSyncPoint(Command prev, Command updated, boolean force) { - super.updateExclusiveSyncPoint(prev, updated); - if (updated.txnId().kind() != Txn.Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range || !updated.hasBeen(Applied) || prev.hasBeen(Applied) || updated.hasBeen(Truncated)) return; + super.updateExclusiveSyncPoint(prev, updated, force); + if (updated.txnId().kind() != Txn.Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range || !updated.hasBeen(Applied) || (prev.hasBeen(Applied) && !force) || updated.hasBeen(Truncated)) return; Participants<?> covering = updated.participants().touches(); for (Map.Entry<TxnId, GlobalCommand> entry : commandStore().commands.headMap(updated.txnId(), false).entrySet()) diff --git a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java index d95f4af5..72e2b6de 100644 --- a/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java +++ b/accord-core/src/main/java/accord/impl/progresslog/DefaultProgressLog.java @@ -187,7 +187,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor } @Override - public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after) + public void update(SafeCommandStore safeStore, TxnId txnId, Command before, Command after, boolean force) { if (!txnId.isVisible()) return; @@ -195,7 +195,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor TxnState state = null; Route<?> beforeRoute = before.route(); Route<?> afterRoute = after.route(); - if (beforeRoute == null && afterRoute != null) + if (afterRoute != null && (beforeRoute == null || force)) { RoutingKey homeKey = afterRoute.homeKey(); Ranges coordinateRanges = safeStore.coordinateRanges(txnId); @@ -222,7 +222,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor state.setHomeDone(this); } } - else if (after.durability().isDurableOrInvalidated() && !before.durability().isDurableOrInvalidated()) + else if (after.durability().isDurableOrInvalidated() && (force || !before.durability().isDurableOrInvalidated())) { state = get(txnId); if (state != null) @@ -233,7 +233,7 @@ public class DefaultProgressLog implements ProgressLog, Consumer<SafeCommandStor SaveStatus beforeSaveStatus = before.saveStatus(); SaveStatus afterSaveStatus = after.saveStatus(); - if (beforeSaveStatus == afterSaveStatus) + if (beforeSaveStatus == afterSaveStatus && !force) return; if (state == null) diff --git a/accord-core/src/main/java/accord/local/Commands.java b/accord-core/src/main/java/accord/local/Commands.java index cf48d228..58c088c8 100644 --- a/accord-core/src/main/java/accord/local/Commands.java +++ b/accord-core/src/main/java/accord/local/Commands.java @@ -565,7 +565,7 @@ public class Commands if (command.hasBeen(Applied) && !forceApply) return; - safeCommand.applied(safeStore); + safeCommand.applied(safeStore, forceApply); safeStore.notifyListeners(safeCommand, command); if (t0 >= 0) safeStore.agent().eventListener().onApplied(command, t0); } diff --git a/accord-core/src/main/java/accord/local/SafeCommand.java b/accord-core/src/main/java/accord/local/SafeCommand.java index 7f10b37d..1b77676c 100644 --- a/accord-core/src/main/java/accord/local/SafeCommand.java +++ b/accord-core/src/main/java/accord/local/SafeCommand.java @@ -58,14 +58,19 @@ public abstract class SafeCommand } public <C extends Command> C update(SafeCommandStore safeStore, C update) + { + return update(safeStore, update, false); + } + + private <C extends Command> C update(SafeCommandStore safeStore, C update, boolean force) { Command prev = current(); if (prev == update) return update; set(update); - safeStore.progressLog().update(safeStore, txnId, prev, update); - safeStore.update(prev, update); + safeStore.progressLog().update(safeStore, txnId, prev, update, force); + safeStore.update(prev, update, force); return update; } @@ -90,7 +95,7 @@ public abstract class SafeCommand return prev; Command update = incidentalUpdate(prev.updateParticipants(participants)); - safeStore.progressLog().update(safeStore, txnId, prev, update); + safeStore.progressLog().update(safeStore, txnId, prev, update, false); return update; } @@ -163,6 +168,11 @@ public abstract class SafeCommand return update(safeStore, Command.applied(current().asExecuted())); } + public Command.Executed applied(SafeCommandStore safeStore, boolean forceUpdate) + { + return update(safeStore, Command.applied(current().asExecuted()), forceUpdate); + } + public Command.NotDefined uninitialised() { Invariants.requireArgument(current() == null); diff --git a/accord-core/src/main/java/accord/local/SafeCommandStore.java b/accord-core/src/main/java/accord/local/SafeCommandStore.java index 47012d48..86fc58cd 100644 --- a/accord-core/src/main/java/accord/local/SafeCommandStore.java +++ b/accord-core/src/main/java/accord/local/SafeCommandStore.java @@ -238,14 +238,14 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund */ public abstract PreLoadContext context(); - protected void update(Command prev, Command updated) + protected void update(Command prev, Command updated, boolean force) { - updateMaxConflicts(prev, updated); - updateCommandsForKey(prev, updated); - updateExclusiveSyncPoint(prev, updated); + updateMaxConflicts(prev, updated, force); + updateCommandsForKey(prev, updated, force); + updateExclusiveSyncPoint(prev, updated, force); } - public void updateExclusiveSyncPoint(Command prev, Command updated) + public void updateExclusiveSyncPoint(Command prev, Command updated, boolean force) { if (updated.txnId().kind() != Kind.ExclusiveSyncPoint || updated.txnId().domain() != Range) return; if (updated.route() == null) return; @@ -253,13 +253,13 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund SaveStatus oldSaveStatus = prev == null ? SaveStatus.Uninitialised : prev.saveStatus(); SaveStatus newSaveStatus = updated.saveStatus(); - if (newSaveStatus.known.isDefinitionKnown() && !oldSaveStatus.known.isDefinitionKnown()) + if (newSaveStatus.known.isDefinitionKnown() && (force || !oldSaveStatus.known.isDefinitionKnown())) { Ranges ranges = updated.participants().touches().toRanges(); commandStore().markExclusiveSyncPoint(this, updated.txnId(), ranges); } - if (newSaveStatus == Applied && oldSaveStatus != Applied) + if (newSaveStatus == Applied && (force || oldSaveStatus != Applied)) { Ranges ranges = updated.participants().touches().toRanges(); TxnId txnIdWithFlags = (TxnId)updated.executeAt(); @@ -289,11 +289,11 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund } } - public void updateMaxConflicts(Command prev, Command updated) + public void updateMaxConflicts(Command prev, Command updated, boolean force) { SaveStatus oldSaveStatus = prev == null ? SaveStatus.Uninitialised : prev.saveStatus(); SaveStatus newSaveStatus = updated.saveStatus(); - if (newSaveStatus.status.equals(oldSaveStatus.status) && oldSaveStatus.known.definition().isKnown()) + if (newSaveStatus.status.equals(oldSaveStatus.status) && oldSaveStatus.known.definition().isKnown() && !force) return; TxnId txnId = updated.txnId(); @@ -334,9 +334,9 @@ public abstract class SafeCommandStore implements RangesForEpochSupplier, Redund commandStore().unsafeSetRangesForEpoch(rangesForEpoch); } - public void updateCommandsForKey(Command prev, Command next) + public void updateCommandsForKey(Command prev, Command next, boolean force) { - if (!CommandsForKey.needsUpdate(this, prev, next)) + if (!CommandsForKey.needsUpdate(this, prev, next) && !force) return; TxnId txnId = next.txnId(); diff --git a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java index 8a47120d..6d3b6707 100644 --- a/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java +++ b/accord-core/src/test/java/accord/local/cfk/CommandsForKeyTest.java @@ -813,7 +813,7 @@ public class CommandsForKeyTest } @Override - protected void update(Command prev, Command updated) + protected void update(Command prev, Command updated, boolean force) { } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org