This is an automated email from the ASF dual-hosted git repository.

benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new e855a1c435 Follow-up to CASSANDRA-20222:  - Implement missing parts of 
protocol optimisations, refine some particulars and remove 
MEDIUM_PATH_WAIT_ON_RECOVERY Also fix:  - Deps.txnIds -> Deps.txnIdsWithFlags 
to make clear unsafety, and validate Command isn't created with flags  - Save a 
lossy low/high epoch we're waiting on in WaitingState so that we can 
reconstruct the same Route on callback  - Load any potentially invalidated 
commands we had in ProgressLog to ensure they are ma [...]
e855a1c435 is described below

commit e855a1c435c3e2a73b47cc9b4dc1b8717e4761de
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Jan 30 11:19:33 2025 +0000

    Follow-up to CASSANDRA-20222:
     - Implement missing parts of protocol optimisations, refine some 
particulars and remove MEDIUM_PATH_WAIT_ON_RECOVERY
    Also fix:
     - Deps.txnIds -> Deps.txnIdsWithFlags to make clear unsafety, and validate 
Command isn't created with flags
     - Save a lossy low/high epoch we're waiting on in WaitingState so that we 
can reconstruct the same Route on callback
     - Load any potentially invalidated commands we had in ProgressLog to 
ensure they are marked invalidated locally and notify any waiters
     - Refine Invariant to accept case where a truncated dependency should 
apply first but both transactions are redundant and only the waiter could not 
be cleaned up
     - only update CFK.maxUniqueHlc using commands we execute
     - Don't update uniqueHlc when zero to avoid incompatible transaction 
invariant check
    
    patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20222
---
 modules/accord                                     |   2 +-
 src/java/org/apache/cassandra/net/Verb.java        |  13 ++-
 .../service/accord/AccordCommandStore.java         |   2 +-
 .../service/accord/AccordMessageSink.java          |   4 +-
 .../service/accord/AccordObjectSizes.java          |  10 +-
 .../cassandra/service/accord/AccordService.java    |   2 +-
 .../cassandra/service/accord/JournalKey.java       |   7 --
 .../service/accord/interop/AccordInteropApply.java |   4 -
 ...{AwaitSerializer.java => AwaitSerializers.java} |  49 +++++++--
 .../serializers/CalculateDepsSerializers.java      |  77 ---------------
 .../accord/serializers/RecoverySerializers.java    |  15 +--
 .../test/accord/AccordIncrementalRepairTest.java   |   2 +-
 .../service/accord/AccordCommandStoreTest.java     |   5 +-
 .../cassandra/service/accord/AccordTaskTest.java   |   4 +-
 .../service/accord/CommandChangeTest.java          |   1 -
 .../SimpleSimulatedAccordCommandStoreTest.java     |   3 +-
 .../SimulatedAccordCommandStoreTestBase.java       |   4 +-
 .../serializers/CommandsForKeySerializerTest.java  | 109 ++++++++++++++++++---
 .../apache/cassandra/utils/AccordGenerators.java   |   8 --
 19 files changed, 170 insertions(+), 151 deletions(-)

diff --git a/modules/accord b/modules/accord
index c7a789b1f4..f543f7dae9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit c7a789b1f424771a4befab6bcb91edd4ab5d7198
+Subproject commit f543f7dae959e804a39e465654d702bc34db1bd1
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 1847231299..9174b7ea8b 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -94,7 +94,6 @@ import 
org.apache.cassandra.service.accord.serializers.CheckStatusSerializers;
 import org.apache.cassandra.service.accord.serializers.CommitSerializers;
 import org.apache.cassandra.service.accord.serializers.EnumSerializer;
 import org.apache.cassandra.service.accord.serializers.FetchSerializers;
-import 
org.apache.cassandra.service.accord.serializers.CalculateDepsSerializers;
 import 
org.apache.cassandra.service.accord.serializers.GetEphmrlReadDepsSerializers;
 import 
org.apache.cassandra.service.accord.serializers.InformDurableSerializers;
 import org.apache.cassandra.service.accord.serializers.LatestDepsSerializers;
@@ -103,7 +102,7 @@ import 
org.apache.cassandra.service.accord.serializers.QueryDurableBeforeSeriali
 import org.apache.cassandra.service.accord.serializers.ReadDataSerializers;
 import org.apache.cassandra.service.accord.serializers.RecoverySerializers;
 import org.apache.cassandra.service.accord.serializers.SetDurableSerializers;
-import org.apache.cassandra.service.accord.serializers.AwaitSerializer;
+import org.apache.cassandra.service.accord.serializers.AwaitSerializers;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState.ConsensusKeyMigrationFinished;
 import org.apache.cassandra.service.paxos.Commit;
@@ -325,16 +324,16 @@ public enum Verb
     ACCORD_BEGIN_RECOVER_REQ        (132, P2, writeTimeout, IMMEDIATE,         
 () -> RecoverySerializers.request,          
AccordService::requestHandlerOrNoop, ACCORD_BEGIN_RECOVER_RSP                  
),
     ACCORD_BEGIN_INVALIDATE_RSP     (133, P2, writeTimeout, IMMEDIATE,         
 () -> BeginInvalidationSerializers.reply,   
AccordService::responseHandlerOrNoop                                           
),
     ACCORD_BEGIN_INVALIDATE_REQ     (134, P2, writeTimeout, IMMEDIATE,         
 () -> BeginInvalidationSerializers.request, 
AccordService::requestHandlerOrNoop, ACCORD_BEGIN_INVALIDATE_RSP               
),
-    ACCORD_AWAIT_RSP                (136, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializer.syncReply,            
AccordService::responseHandlerOrNoop                                           
),
-    ACCORD_AWAIT_REQ                (135, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializer.request,              
AccordService::requestHandlerOrNoop, ACCORD_AWAIT_RSP                          
),
-    ACCORD_AWAIT_ASYNC_RSP_REQ      (137, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializer.asyncReply,           
AccordService::requestHandlerOrNoop                                            
),
+    ACCORD_AWAIT_RSP                (136, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializers.syncReply,           
AccordService::responseHandlerOrNoop                                           
),
+    ACCORD_AWAIT_REQ                (135, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializers.request,             
AccordService::requestHandlerOrNoop, ACCORD_AWAIT_RSP                          
),
+    ACCORD_AWAIT_ASYNC_RSP_REQ      (137, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializers.asyncReply,          
AccordService::requestHandlerOrNoop                                            
),
     ACCORD_WAIT_UNTIL_APPLIED_REQ   (138, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.waitUntilApplied, 
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP                           
),
     ACCORD_STABLE_THEN_READ_REQ     (139, P2, writeTimeout, IMMEDIATE,         
 () -> ReadDataSerializers.stableThenRead,   
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP                           
),
     ACCORD_INFORM_DURABLE_REQ       (140, P2, writeTimeout, IMMEDIATE,         
 () -> InformDurableSerializers.request,     
AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP                         
),
     ACCORD_CHECK_STATUS_RSP         (141, P2, writeTimeout, IMMEDIATE,         
 () -> CheckStatusSerializers.reply,         
AccordService::responseHandlerOrNoop                                           
),
     ACCORD_CHECK_STATUS_REQ         (142, P2, writeTimeout, IMMEDIATE,         
 () -> CheckStatusSerializers.request,       
AccordService::requestHandlerOrNoop, ACCORD_CHECK_STATUS_RSP                   
),
-    ACCORD_CALCULATE_DEPS_RSP       (143, P2, writeTimeout, IMMEDIATE,         
 () -> CalculateDepsSerializers.reply,       
AccordService::responseHandlerOrNoop                                           
),
-    ACCORD_CALCULATE_DEPS_REQ       (144, P2, longTimeout,  IMMEDIATE,         
 () -> CalculateDepsSerializers.request,     
AccordService::requestHandlerOrNoop, ACCORD_CALCULATE_DEPS_RSP),
+    ACCORD_RECOVER_AWAIT_RSP        (143, P2, writeTimeout, IMMEDIATE,         
 () -> AwaitSerializers.recoverReply,        
AccordService::responseHandlerOrNoop                                           
),
+    ACCORD_RECOVER_AWAIT_REQ        (144, P2, writeTimeout,  IMMEDIATE,        
 () -> AwaitSerializers.recoverRequest,      
AccordService::requestHandlerOrNoop, ACCORD_RECOVER_AWAIT_RSP),
     ACCORD_GET_LATEST_DEPS_RSP      (167, P2, writeTimeout, IMMEDIATE,         
 () -> LatestDepsSerializers.reply,          
AccordService::responseHandlerOrNoop                                           
),
     ACCORD_GET_LATEST_DEPS_REQ      (168, P2, writeTimeout, IMMEDIATE,         
 () -> LatestDepsSerializers.request,        
AccordService::requestHandlerOrNoop, ACCORD_GET_LATEST_DEPS_RSP),
     ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, IMMEDIATE,         
 () -> GetEphmrlReadDepsSerializers.reply,   
AccordService::responseHandlerOrNoop                                           
),
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java 
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 8f1e08ab71..7fc135da58 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -501,7 +501,7 @@ public class AccordCommandStore extends CommandStore
             if (keys != null)
                 return PreLoadContext.contextFor(txnId, keys, keyHistory);
 
-            return PreLoadContext.contextFor(txnId);
+            return txnId;
         }
 
         @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index 897d6ac3b7..e5d897ed3b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -133,8 +133,8 @@ public class AccordMessageSink implements MessageSink
             builder.put(MessageType.ACCEPT_REQ,                               
Verb.ACCORD_ACCEPT_REQ);
             builder.put(MessageType.ACCEPT_RSP,                               
Verb.ACCORD_ACCEPT_RSP);
             builder.put(MessageType.NOT_ACCEPT_REQ,                           
Verb.ACCORD_NOT_ACCEPT_REQ);
-            builder.put(MessageType.CALCULATE_DEPS_REQ,                       
Verb.ACCORD_CALCULATE_DEPS_REQ);
-            builder.put(MessageType.CALCULATE_DEPS_RSP,                       
Verb.ACCORD_CALCULATE_DEPS_RSP);
+            builder.put(MessageType.RECOVER_AWAIT_REQ,                        
Verb.ACCORD_RECOVER_AWAIT_REQ);
+            builder.put(MessageType.RECOVER_AWAIT_RSP,                        
Verb.ACCORD_RECOVER_AWAIT_RSP);
             builder.put(MessageType.GET_LATEST_DEPS_REQ,                      
Verb.ACCORD_GET_LATEST_DEPS_REQ);
             builder.put(MessageType.GET_LATEST_DEPS_RSP,                      
Verb.ACCORD_GET_LATEST_DEPS_RSP);
             builder.put(MessageType.GET_EPHEMERAL_READ_DEPS_REQ,              
Verb.ACCORD_GET_EPHMRL_READ_DEPS_REQ);
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java 
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 346b1015c5..c218ac65d8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -312,7 +312,7 @@ public class AccordObjectSizes
 
         final static long NOT_DEFINED = 
measure(Command.NotDefined.notDefined(attrs(false, false, false)));
         final static long PREACCEPTED = 
measure(Command.PreAccepted.preaccepted(attrs(false, true, false), 
SaveStatus.PreAccepted));
-        final static long NOTACCEPTED = 
measure(Command.NotAcceptedWithoutDefinition.notAccepted(attrs(false, false, 
false), SaveStatus.NotAccepted));
+        final static long NOTACCEPTED = 
measure(Command.NotAcceptedWithoutDefinition.notAccepted(attrs(false, false, 
false), SaveStatus.AcceptedInvalidate));
         final static long ACCEPTED = 
measure(Command.Accepted.accepted(attrs(true, false, false), 
SaveStatus.AcceptedMedium));
         final static long COMMITTED = 
measure(Command.Committed.committed(attrs(true, true, false), 
SaveStatus.Committed));
         final static long EXECUTED = 
measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied));
@@ -330,16 +330,8 @@ public class AccordObjectSizes
                 case PreAcceptedWithDeps:
                 case PreAcceptedWithVote:
                     return PREACCEPTED;
-                case NotAccepted:
-                case PreNotAccepted:
                 case AcceptedInvalidate:
                     return NOTACCEPTED;
-                case NotAcceptedWithDefinition:
-                case NotAcceptedWithDefAndVote:
-                case NotAcceptedWithDefAndDeps:
-                case PreNotAcceptedWithDefinition:
-                case PreNotAcceptedWithDefAndVote:
-                case PreNotAcceptedWithDefAndDeps:
                 case AcceptedInvalidateWithDefinition:
                 case AcceptedMedium:
                 case AcceptedMediumWithDefinition:
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 005b856005..a84cfa8f07 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -1051,7 +1051,7 @@ public class AccordService implements IAccordService, 
Shutdownable
 
     private static AsyncChain<Void> 
populate(CommandStoreTxnBlockedGraph.Builder state, CommandStore store, TxnId 
txnId)
     {
-        AsyncChain<AsyncChain<Void>> submit = 
store.submit(PreLoadContext.contextFor(txnId), in -> {
+        AsyncChain<AsyncChain<Void>> submit = store.submit(txnId, in -> {
             AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore) 
in, txnId);
             return chain == null ? AsyncChains.success(null) : chain;
         });
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java 
b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index 8baa85be2e..b1acbd31a2 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -30,19 +30,12 @@ import accord.utils.Invariants;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.journal.KeySupport;
-import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.BootstrapBeganAtSerializer;
-import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.CommandDiffSerializer;
-import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeSerializer;
-import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
-import 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.RedundantBeforeSerializer;
 import org.apache.cassandra.utils.ByteArrayUtil;
 
 import static org.apache.cassandra.db.TypeSizes.BYTE_SIZE;
 import static org.apache.cassandra.db.TypeSizes.INT_SIZE;
 import static org.apache.cassandra.db.TypeSizes.LONG_SIZE;
 import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.*;
-import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.RangesForEpochSerializer;
-import static 
org.apache.cassandra.service.accord.AccordJournalValueSerializers.SafeToReadSerializer;
 
 public final class JournalKey
 {
diff --git 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 0f7924179b..edc5fb9629 100644
--- 
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++ 
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -120,8 +120,6 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
             default: throw new AssertionError();
             case NotDefined:
             case PreAccepted:
-            case PreNotAccepted:
-            case NotAccepted:
             case AcceptedInvalidate:
             case AcceptedMedium:
             case AcceptedSlow:
@@ -247,8 +245,6 @@ public class AccordInteropApply extends Apply implements 
LocalListeners.ComplexL
             default: throw new UnhandledEnum(command.status());
             case NotDefined:
             case PreAccepted:
-            case PreNotAccepted:
-            case NotAccepted:
             case AcceptedInvalidate:
             case AcceptedMedium:
             case AcceptedSlow:
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java 
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
similarity index 67%
rename from 
src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
rename to 
src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
index 905aa1d680..4d1af98514 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
@@ -24,6 +24,8 @@ import accord.api.ProgressLog.BlockedUntil;
 import accord.messages.Await;
 import accord.messages.Await.AsyncAwaitComplete;
 import accord.messages.Await.AwaitOk;
+import accord.messages.RecoverAwait;
+import accord.messages.RecoverAwait.RecoverAwaitOk;
 import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.SaveStatus;
@@ -35,12 +37,46 @@ import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.utils.vint.VIntCoding;
 
-public class AwaitSerializer
+public class AwaitSerializers
 {
-    public static final IVersionedSerializer<Await> request = new 
IVersionedSerializer<>()
+    public static final IVersionedSerializer<Await> request = new 
RequestSerializer<>()
     {
         @Override
-        public void serialize(Await await, DataOutputPlus out, int version) 
throws IOException
+        public Await deserialize(TxnId txnId, Participants<?> scope, 
BlockedUntil blockedUntil, boolean notifyProgressLog, long minAwaitEpoch, long 
maxAwaitEpoch, int callbackId, DataInputPlus in, int version)
+        {
+            return Await.SerializerSupport.create(txnId, scope, blockedUntil, 
notifyProgressLog, minAwaitEpoch, maxAwaitEpoch, callbackId);
+        }
+    };
+
+    public static final IVersionedSerializer<RecoverAwait> recoverRequest = 
new RequestSerializer<>()
+    {
+        @Override
+        public RecoverAwait deserialize(TxnId txnId, Participants<?> scope, 
BlockedUntil blockedUntil, boolean notifyProgressLog, long minAwaitEpoch, long 
maxAwaitEpoch, int callbackId, DataInputPlus in, int version) throws IOException
+        {
+            TxnId recoverId = CommandSerializers.txnId.deserialize(in, 
version);
+            return RecoverAwait.SerializerSupport.create(txnId, scope, 
blockedUntil, notifyProgressLog, minAwaitEpoch, maxAwaitEpoch, callbackId, 
recoverId);
+        }
+
+        @Override
+        public void serialize(RecoverAwait await, DataOutputPlus out, int 
version) throws IOException
+        {
+            super.serialize(await, out, version);
+            CommandSerializers.txnId.serialize(await.recoverId, out, version);
+        }
+
+        @Override
+        public long serializedSize(RecoverAwait await, int version)
+        {
+            return super.serializedSize(await, version) + 
CommandSerializers.txnId.serializedSize(await.recoverId, version);
+        }
+    };
+
+    static abstract class RequestSerializer<A extends Await> implements 
IVersionedSerializer<A>
+    {
+        abstract A deserialize(TxnId txnId, Participants<?> scope, 
BlockedUntil blockedUntil, boolean notifyProgressLog, long minAwaitEpoch, long 
maxAwaitEpoch, int callbackId, DataInputPlus in, int version) throws 
IOException;
+
+        @Override
+        public void serialize(A await, DataOutputPlus out, int version) throws 
IOException
         {
             CommandSerializers.txnId.serialize(await.txnId, out, version);
             KeySerializers.participants.serialize(await.scope, out, version);
@@ -52,7 +88,7 @@ public class AwaitSerializer
         }
 
         @Override
-        public Await deserialize(DataInputPlus in, int version) throws 
IOException
+        public A deserialize(DataInputPlus in, int version) throws IOException
         {
             TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
             Participants<?> scope = 
KeySerializers.participants.deserialize(in, version);
@@ -63,11 +99,11 @@ public class AwaitSerializer
             long minAwaitEpoch = maxAwaitEpoch - in.readUnsignedVInt();
             int callbackId = in.readUnsignedVInt32() - 1;
             Invariants.require(callbackId >= -1);
-            return Await.SerializerSupport.create(txnId, scope, blockedUntil, 
notifyProgressLog, minAwaitEpoch, maxAwaitEpoch, callbackId);
+            return deserialize(txnId, scope, blockedUntil, notifyProgressLog, 
minAwaitEpoch, maxAwaitEpoch, callbackId, in, version);
         }
 
         @Override
-        public long serializedSize(Await await, int version)
+        public long serializedSize(A await, int version)
         {
             return CommandSerializers.txnId.serializedSize(await.txnId, 
version)
                    + KeySerializers.participants.serializedSize(await.scope, 
version)
@@ -79,6 +115,7 @@ public class AwaitSerializer
     };
 
     public static final IVersionedSerializer<AwaitOk> syncReply = new 
EnumSerializer<>(AwaitOk.class);
+    public static final IVersionedSerializer<RecoverAwaitOk> recoverReply = 
new EnumSerializer<>(RecoverAwaitOk.class);
 
     public static final IVersionedSerializer<AsyncAwaitComplete> asyncReply = 
new IVersionedSerializer<>()
     {
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/CalculateDepsSerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/CalculateDepsSerializers.java
deleted file mode 100644
index ab9f596374..0000000000
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/CalculateDepsSerializers.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord.serializers;
-
-import java.io.IOException;
-
-import accord.messages.CalculateDeps;
-import accord.messages.CalculateDeps.CalculateDepsOk;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import 
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
-
-public class CalculateDepsSerializers
-{
-    public static final IVersionedSerializer<CalculateDeps> request = new 
TxnRequestSerializer.WithUnsyncedSerializer<>()
-    {
-        @Override
-        public void serializeBody(CalculateDeps msg, DataOutputPlus out, int 
version) throws IOException
-        {
-            ExecuteAtSerializer.serialize(msg.executeAt, out);
-        }
-
-        @Override
-        public CalculateDeps deserializeBody(DataInputPlus in, int version, 
TxnId txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws 
IOException
-        {
-            Timestamp executeAt = ExecuteAtSerializer.deserialize(in);
-            return CalculateDeps.SerializationSupport.create(txnId, scope, 
waitForEpoch, minEpoch, executeAt);
-        }
-
-        @Override
-        public long serializedBodySize(CalculateDeps msg, int version)
-        {
-            return ExecuteAtSerializer.serializedSize(msg.executeAt);
-        }
-    };
-
-    public static final IVersionedSerializer<CalculateDepsOk> reply = new 
IVersionedSerializer<>()
-    {
-        @Override
-        public void serialize(CalculateDepsOk reply, DataOutputPlus out, int 
version) throws IOException
-        {
-            DepsSerializers.deps.serialize(reply.deps, out, version);
-        }
-
-        @Override
-        public CalculateDepsOk deserialize(DataInputPlus in, int version) 
throws IOException
-        {
-            return new CalculateDepsOk(DepsSerializers.deps.deserialize(in, 
version));
-        }
-
-        @Override
-        public long serializedSize(CalculateDepsOk reply, int version)
-        {
-            return DepsSerializers.deps.serializedSize(reply.deps, version);
-        }
-    };
-}
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index bc1ec7fcbd..ee0000f1a4 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -34,6 +34,7 @@ import accord.primitives.FullRoute;
 import accord.primitives.Known.KnownDeps;
 import accord.primitives.LatestDeps;
 import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
 import accord.primitives.Route;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
@@ -101,9 +102,9 @@ public class RecoverySerializers
             latestDeps.serialize(recoverOk.deps, out, version);
             DepsSerializers.deps.serialize(recoverOk.earlierWait, out, 
version);
             DepsSerializers.deps.serialize(recoverOk.earlierNoWait, out, 
version);
-            DepsSerializers.deps.serialize(recoverOk.laterWait, out, version);
-            DepsSerializers.deps.serialize(recoverOk.laterNoWait, out, 
version);
+            DepsSerializers.deps.serialize(recoverOk.laterCoordRejects, out, 
version);
             out.writeBoolean(recoverOk.selfAcceptsFastPath);
+            
KeySerializers.nullableParticipants.serialize(recoverOk.coordinatorAcceptsFastPath,
 out, version);
             out.writeBoolean(recoverOk.supersedingRejects);
             CommandSerializers.nullableWrites.serialize(recoverOk.writes, out, 
version);
         }
@@ -121,9 +122,9 @@ public class RecoverySerializers
             return new RecoverNack(kind, supersededBy);
         }
 
-        RecoverOk deserializeOk(TxnId txnId, Status status, Ballot accepted, 
Timestamp executeAt, @Nonnull LatestDeps deps, Deps earlierWait, Deps 
earlierNoWait, Deps laterWait, Deps laterNoWait, boolean acceptsFastPath, 
boolean rejectsFastPath, Writes writes, Result result, DataInputPlus in, int 
version)
+        RecoverOk deserializeOk(TxnId txnId, Status status, Ballot accepted, 
Timestamp executeAt, @Nonnull LatestDeps deps, Deps earlierWait, Deps 
earlierNoWait, Deps laterCoordRejects, boolean acceptsFastPath, @Nullable 
Participants<?> coordinatorAcceptsFastPath, boolean rejectsFastPath, Writes 
writes, Result result, DataInputPlus in, int version)
         {
-            return new RecoverOk(txnId, status, accepted, executeAt, deps, 
earlierWait, earlierNoWait, laterWait, laterNoWait, acceptsFastPath, 
rejectsFastPath, writes, result);
+            return new RecoverOk(txnId, status, accepted, executeAt, deps, 
earlierWait, earlierNoWait, laterCoordRejects, acceptsFastPath, 
coordinatorAcceptsFastPath, rejectsFastPath, writes, result);
         }
 
         @Override
@@ -148,8 +149,8 @@ public class RecoverySerializers
                                  DepsSerializers.deps.deserialize(in, version),
                                  DepsSerializers.deps.deserialize(in, version),
                                  DepsSerializers.deps.deserialize(in, version),
-                                 DepsSerializers.deps.deserialize(in, version),
                                  in.readBoolean(),
+                                 
KeySerializers.nullableParticipants.deserialize(in, version),
                                  in.readBoolean(),
                                  
CommandSerializers.nullableWrites.deserialize(in, version),
                                  result,
@@ -171,9 +172,9 @@ public class RecoverySerializers
             size += latestDeps.serializedSize(recoverOk.deps, version);
             size += DepsSerializers.deps.serializedSize(recoverOk.earlierWait, 
version);
             size += 
DepsSerializers.deps.serializedSize(recoverOk.earlierNoWait, version);
-            size += DepsSerializers.deps.serializedSize(recoverOk.laterWait, 
version);
-            size += DepsSerializers.deps.serializedSize(recoverOk.laterNoWait, 
version);
+            size += 
DepsSerializers.deps.serializedSize(recoverOk.laterCoordRejects, version);
             size += TypeSizes.sizeof(recoverOk.selfAcceptsFastPath);
+            size += 
KeySerializers.nullableParticipants.serializedSize(recoverOk.coordinatorAcceptsFastPath,
 version);
             size += TypeSizes.sizeof(recoverOk.supersedingRejects);
             size += 
CommandSerializers.nullableWrites.serializedSize(recoverOk.writes, version);
             return size;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index 72db50303a..231610b198 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -243,7 +243,7 @@ public class AccordIncrementalRepairTest extends 
AccordTestBase
             long now = Clock.Global.currentTimeMillis();
             if (now - start > TimeUnit.MINUTES.toMillis(1))
                 throw new AssertionError("Timeout");
-            
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId),
 key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+            
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(txnId, 
key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
                 SafeCommand command = safeStore.get(txnId, 
StoreParticipants.empty(txnId));
                 Assert.assertNotNull(command.current());
                 if (command.current().status().hasBeen(Status.Applied))
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 547ee77568..6cd168ac43 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.schema.TableMetadata;
 import org.apache.cassandra.service.StorageService;
 import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
 import org.apache.cassandra.service.accord.api.PartitionKey;
+import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializerTest.TestSafeCommandStore;
 import org.apache.cassandra.service.accord.serializers.ResultSerializers;
 import org.apache.cassandra.service.consensus.TransactionalMode;
 import org.apache.cassandra.utils.Pair;
@@ -162,8 +163,8 @@ public class AccordCommandStoreTest
         AccordSafeCommandsForKey cfk = new 
AccordSafeCommandsForKey(loaded(key, null));
         cfk.initialize();
 
-        cfk.set(cfk.current().update(command1).cfk());
-        cfk.set(cfk.current().update(command2).cfk());
+        cfk.set(cfk.current().update(new 
TestSafeCommandStore(command1.txnId()), command1).cfk());
+        cfk.set(cfk.current().update(new 
TestSafeCommandStore(command1.txnId()), command2).cfk());
 
         AccordKeyspace.getCommandsForKeyUpdater(commandStore.id(), 
(TokenKey)cfk.key(), cfk.current(), null, 
commandStore.nextSystemTimestampMicros()).run();
         logger.info("E: {}", cfk);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
index 6317008c86..1ef9b0227e 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
@@ -126,7 +126,7 @@ public class AccordTaskTest
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
 
-        getUninterruptibly(commandStore.execute(contextFor(txnId), instance -> 
{
+        getUninterruptibly(commandStore.execute(txnId, instance -> {
             // TODO review: This change to `ifInitialized` was done in a lot 
of places and it doesn't preserve this property
             // I fixed this reference to point to `ifLoadedAndInitialised` and 
but didn't update other places
             Assert.assertNull(instance.ifInitialised(txnId));
@@ -140,7 +140,7 @@ public class AccordTaskTest
         AccordCommandStore commandStore = 
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
         TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
 
-        getUninterruptibly(commandStore.execute(contextFor(txnId), safe -> {
+        getUninterruptibly(commandStore.execute(txnId, safe -> {
             StoreParticipants participants = StoreParticipants.empty(txnId);
             SafeCommand command = safe.get(txnId, participants);
             Assert.assertNotNull(command);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java 
b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index 40d75816d3..b24a60a670 100644
--- a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -47,7 +47,6 @@ import org.assertj.core.api.SoftAssertions;
 
 import static accord.api.Journal.*;
 import static accord.impl.CommandChange.*;
-import static accord.impl.CommandChange.getFlags;
 import static accord.utils.Property.qt;
 import static 
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
 
b/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
index dd8678c1ef..5afc99d906 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord;
 
 import org.junit.Test;
 
-import accord.local.PreLoadContext;
 import accord.local.StoreParticipants;
 import accord.primitives.SaveStatus;
 import accord.primitives.TxnId;
@@ -41,7 +40,7 @@ public class SimpleSimulatedAccordCommandStoreTest extends 
SimulatedAccordComman
                 for (int i = 0, examples = 100; i < examples; i++)
                 {
                     TxnId id = AccordGens.txnIds().next(rs);
-                    instance.process(PreLoadContext.contextFor(id), (safe) -> {
+                    instance.process(id, (safe) -> {
                         var safeCommand = safe.get(id, 
StoreParticipants.empty(id));
                         var command = safeCommand.current();
                         
Assertions.assertThat(command.saveStatus()).isEqualTo(SaveStatus.Uninitialised);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index ad8a995794..6ec60a3778 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -321,7 +321,7 @@ public abstract class SimulatedAccordCommandStoreTestBase 
extends CQLTester
                 {
                     var range = deps.rangeDeps.range(i);
                     Assertions.assertThat(rangeConflicts).describedAs("Txn %s 
had an unexpected range", txnId).containsKey(range);
-                    var conflict = deps.rangeDeps.txnIdsForRangeIndex(i);
+                    var conflict = 
deps.rangeDeps.txnIdsWithFlagsForRangeIndex(i);
                     List<TxnId> expectedConflict = rangeConflicts.get(range);
                     Assertions.assertThat(conflict).describedAs("Txn %s 
Expected range %s to have different conflicting txns", txnId, 
range).isEqualTo(expectedConflict);
                 }
@@ -344,7 +344,7 @@ public abstract class SimulatedAccordCommandStoreTestBase 
extends CQLTester
         {
             Assertions.assertThat(deps.keyDeps.keys()).describedAs("Txn %s 
Keys", txnId).isEqualTo(RoutingKeys.of(keyConflicts.keySet()));
             for (var key : keyConflicts.keySet())
-                
Assertions.assertThat(deps.keyDeps.txnIds(key)).describedAs("Txn %s for key 
%s", txnId, key).isEqualTo(keyConflicts.get(key));
+                
Assertions.assertThat(deps.keyDeps.txnIdsWithFlags(key)).describedAs("Txn %s 
for key %s", txnId, key).isEqualTo(keyConflicts.get(key));
         }
     }
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index a6a3531d8a..ddd5146462 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -28,7 +28,10 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
 import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongUnaryOperator;
@@ -42,36 +45,57 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Journal;
 import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.api.Result;
 import accord.api.RoutingKey;
+import accord.api.Timeouts;
+import accord.impl.AbstractSafeCommandStore;
+import accord.impl.DefaultLocalListeners;
+import accord.impl.DefaultRemoteListeners;
 import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.DurableBefore;
 import accord.local.ICommand;
 import accord.local.Node;
+import accord.local.NodeCommandStoreService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
 import accord.local.StoreParticipants;
 import accord.local.cfk.CommandsForKey;
 import accord.local.cfk.CommandsForKey.InternalStatus;
 import accord.local.cfk.CommandsForKey.TxnInfo;
 import accord.local.cfk.CommandsForKey.Unmanaged;
+import accord.local.cfk.SafeCommandsForKey;
 import accord.local.cfk.Serialize;
+import accord.messages.ReplyContext;
 import accord.primitives.Ballot;
 import accord.primitives.KeyDeps;
 import accord.primitives.Known;
 import accord.primitives.PartialDeps;
 import accord.primitives.PartialTxn;
 import accord.primitives.RangeDeps;
+import accord.primitives.Ranges;
 import accord.primitives.Routable;
 import accord.primitives.SaveStatus;
 import accord.primitives.Status;
 import accord.primitives.Timestamp;
 import accord.primitives.Txn;
 import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
 import accord.primitives.Writes;
+import accord.topology.TopologyManager;
 import accord.utils.AccordGens;
 import accord.utils.Gen;
 import accord.utils.Gens;
 import accord.utils.RandomSource;
 import accord.utils.SortedArrays;
 import accord.utils.UnhandledEnum;
+import accord.utils.async.AsyncChain;
 import org.agrona.collections.Int2ObjectHashMap;
 import org.apache.cassandra.SchemaLoader;
 import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -191,8 +215,6 @@ public class CommandsForKeySerializerTest
                 case PreAcceptedWithDeps:
                     return Command.PreAccepted.preaccepted(builder(), 
saveStatus);
                 case AcceptedInvalidate:
-                case PreNotAccepted:
-                case NotAccepted:
                     return 
Command.NotAcceptedWithoutDefinition.notAccepted(builder(), saveStatus);
                 case AcceptedMedium:
                 case AcceptedMediumWithDefinition:
@@ -201,12 +223,6 @@ public class CommandsForKeySerializerTest
                 case AcceptedSlowWithDefinition:
                 case AcceptedSlowWithDefAndVote:
                 case AcceptedInvalidateWithDefinition:
-                case PreNotAcceptedWithDefinition:
-                case PreNotAcceptedWithDefAndVote:
-                case PreNotAcceptedWithDefAndDeps:
-                case NotAcceptedWithDefinition:
-                case NotAcceptedWithDefAndVote:
-                case NotAcceptedWithDefAndDeps:
                 case PreCommittedWithDefinition:
                 case PreCommittedWithDefAndDeps:
                 case PreCommittedWithDefAndFixedDeps:
@@ -289,8 +305,6 @@ public class CommandsForKeySerializerTest
                 case PreApplied:
                 case Applied:
                     isDurable = source.nextBoolean();
-                case PreNotAccepted:
-                case NotAccepted:
                 case AcceptedInvalidate:
                 case AcceptedMedium:
                 case AcceptedSlow:
@@ -503,7 +517,7 @@ public class CommandsForKeySerializerTest
             {
                 int next = source.nextInt(commands.size());
                 Command command = commands.get(next);
-                cfk = cfk.update(command).cfk();
+                cfk = cfk.update(new TestSafeCommandStore(command.txnId()), 
command).cfk();
                 commands.set(next, commands.get(commands.size() - 1));
                 commands.remove(commands.size() - 1);
             }
@@ -615,4 +629,77 @@ public class CommandsForKeySerializerTest
         CommandsForKey roundTrip = Serialize.fromBytes(pk, buffer);
         Assert.assertEquals(expected, roundTrip);
     }
+
+    static class TestCommandStore extends CommandStore implements Agent
+    {
+        static final TestCommandStore INSTANCE = new TestCommandStore();
+        protected TestCommandStore()
+        {
+            super(0,
+                  null,
+                  null,
+                  null,
+                  ignore -> new ProgressLog.NoOpProgressLog(),
+                  ignore -> new DefaultLocalListeners(new 
DefaultRemoteListeners((a, b, c, d, e)->{}), 
DefaultLocalListeners.DefaultNotifySink.INSTANCE),
+                  new EpochUpdateHolder());
+        }
+
+        @Override public boolean inStore() { return true; }
+        @Override public Journal.Loader loader() { throw new 
UnsupportedOperationException(); }
+        @Override public Agent agent() { return this; }
+        @Override public AsyncChain<Void> execute(PreLoadContext context, 
Consumer<? super SafeCommandStore> consumer) { return null; }
+        @Override public <T> AsyncChain<T> submit(PreLoadContext context, 
Function<? super SafeCommandStore, T> apply) { throw new 
UnsupportedOperationException(); }
+        @Override public void shutdown() { }
+        @Override protected void registerTransitive(SafeCommandStore 
safeStore, RangeDeps deps){ }
+        @Override public <T> AsyncChain<T> submit(Callable<T> task) { throw 
new UnsupportedOperationException(); }
+        @Override public void onRecover(Node node, Result success, Throwable 
fail) { throw new UnsupportedOperationException(); }
+        @Override public void onInconsistentTimestamp(Command command, 
Timestamp prev, Timestamp next) { throw new UnsupportedOperationException(); }
+        @Override public void onFailedBootstrap(String phase, Ranges ranges, 
Runnable retry, Throwable failure) { throw new UnsupportedOperationException(); 
}
+        @Override public void onStale(Timestamp staleSince, Ranges ranges) { 
throw new UnsupportedOperationException(); }
+        @Override public void onUncaughtException(Throwable t) { throw new 
UnsupportedOperationException(); }
+        @Override public void onCaughtException(Throwable t, String context) { 
throw new UnsupportedOperationException(); }
+        @Override public long preAcceptTimeout() { throw new 
UnsupportedOperationException(); }
+        @Override public long cfkHlcPruneDelta() { return 0; }
+        @Override public int cfkPruneInterval() { return 0; }
+        @Override public long maxConflictsHlcPruneDelta() { return 0; }
+        @Override public long maxConflictsPruneInterval() { return 0; }
+        @Override public Txn emptySystemTxn(Txn.Kind kind, Routable.Domain 
domain) { throw new UnsupportedOperationException(); }
+        @Override public long attemptCoordinationDelay(Node node, 
SafeCommandStore safeStore, TxnId txnId, TimeUnit units, int retryCount) { 
return 0; }
+        @Override public long seekProgressDelay(Node node, SafeCommandStore 
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil, 
TimeUnit units) { return 0; }
+        @Override public long retryAwaitTimeout(Node node, SafeCommandStore 
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil retrying, 
TimeUnit units) { return 0; }
+        @Override public long localExpiresAt(TxnId txnId, Status.Phase phase, 
TimeUnit unit) { return 0; }
+        @Override public long expiresAt(ReplyContext replyContext, TimeUnit 
unit) { return 0; }
+    }
+
+    public static class TestSafeCommandStore extends AbstractSafeCommandStore
+    {
+        public TestSafeCommandStore(PreLoadContext context)
+        {
+            super(context, TestCommandStore.INSTANCE);
+        }
+
+        @Override protected CommandStoreCaches tryGetCaches() { return null; }
+        @Override protected SafeCommand add(SafeCommand safeCommand, 
CommandStoreCaches caches) { return null; }
+        @Override protected SafeCommandsForKey add(SafeCommandsForKey safeCfk, 
CommandStoreCaches caches) { return null; }
+        @Override protected SafeCommand getInternal(TxnId txnId) { return 
null; }
+        @Override protected SafeCommandsForKey getInternal(RoutingKey key) { 
return null; }
+        @Override public DataStore dataStore() { return null; }
+        @Override public Agent agent() { return null; }
+        @Override public ProgressLog progressLog() { return null; }
+        @Override public NodeCommandStoreService node() { return new 
NodeCommandStoreService()
+        {
+            @Override public long epoch() { return 0;}
+            @Override public Node.Id id() { return Node.Id.NONE; }
+            @Override public Timeouts timeouts() { return null; }
+            @Override public DurableBefore durableBefore() { return null;}
+            @Override public Timestamp uniqueNow() { return null; }
+            @Override public Timestamp uniqueNow(Timestamp atLeast) { return 
null; }
+            @Override public TopologyManager topology() { return null; }
+            @Override public long now() { return 0; }
+            @Override public long elapsed(TimeUnit unit) { return 0; }
+        }; }
+        @Override public boolean visit(Unseekables<?> keysOrRanges, TxnId 
testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp 
testStartAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit) { 
return false; }
+        @Override public <P1, P2> void visit(Unseekables<?> keysOrRanges, 
Timestamp startedBefore, Txn.Kind.Kinds testKind, ActiveCommandVisitor<P1, P2> 
visit, P1 p1, P2 p2) { }
+    }
+
 }
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java 
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index fef3f9a83f..05c731f7db 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -254,14 +254,6 @@ public class AccordGenerators
                 case PreAcceptedWithVote:
                 case PreAcceptedWithDeps:
                     return 
Command.PreAccepted.preaccepted(attributes(saveStatus), saveStatus);
-                case PreNotAccepted:
-                case PreNotAcceptedWithDefinition:
-                case PreNotAcceptedWithDefAndDeps:
-                case PreNotAcceptedWithDefAndVote:
-                case NotAccepted:
-                case NotAcceptedWithDefinition:
-                case NotAcceptedWithDefAndDeps:
-                case NotAcceptedWithDefAndVote:
                 case AcceptedInvalidate:
                     return 
Command.NotAcceptedWithoutDefinition.acceptedInvalidate(attributes(saveStatus));
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to