This is an automated email from the ASF dual-hosted git repository.
benedict pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new e855a1c435 Follow-up to CASSANDRA-20222: - Implement missing parts of
protocol optimisations, refine some particulars and remove
MEDIUM_PATH_WAIT_ON_RECOVERY Also fix: - Deps.txnIds -> Deps.txnIdsWithFlags
to make clear unsafety, and validate Command isn't created with flags - Save a
lossy low/high epoch we're waiting on in WaitingState so that we can
reconstruct the same Route on callback - Load any potentially invalidated
commands we had in ProgressLog to ensure they are ma [...]
e855a1c435 is described below
commit e855a1c435c3e2a73b47cc9b4dc1b8717e4761de
Author: Benedict Elliott Smith <[email protected]>
AuthorDate: Thu Jan 30 11:19:33 2025 +0000
Follow-up to CASSANDRA-20222:
- Implement missing parts of protocol optimisations, refine some
particulars and remove MEDIUM_PATH_WAIT_ON_RECOVERY
Also fix:
- Rename Deps.txnIds -> Deps.txnIdsWithFlags to make the unsafety clear, and
validate that a Command isn't created with flags
- Save a lossy low/high epoch we're waiting on in WaitingState so that we
can reconstruct the same Route on callback
- Load any potentially invalidated commands we had in ProgressLog to
ensure they are marked invalidated locally and notify any waiters
- Refine Invariant to accept the case where a truncated dependency should
apply first but both transactions are redundant and only the waiter could not
be cleaned up
- Only update CFK.maxUniqueHlc using commands we execute
- Don't update uniqueHlc when zero to avoid incompatible transaction
invariant check
patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20222
---
modules/accord | 2 +-
src/java/org/apache/cassandra/net/Verb.java | 13 ++-
.../service/accord/AccordCommandStore.java | 2 +-
.../service/accord/AccordMessageSink.java | 4 +-
.../service/accord/AccordObjectSizes.java | 10 +-
.../cassandra/service/accord/AccordService.java | 2 +-
.../cassandra/service/accord/JournalKey.java | 7 --
.../service/accord/interop/AccordInteropApply.java | 4 -
...{AwaitSerializer.java => AwaitSerializers.java} | 49 +++++++--
.../serializers/CalculateDepsSerializers.java | 77 ---------------
.../accord/serializers/RecoverySerializers.java | 15 +--
.../test/accord/AccordIncrementalRepairTest.java | 2 +-
.../service/accord/AccordCommandStoreTest.java | 5 +-
.../cassandra/service/accord/AccordTaskTest.java | 4 +-
.../service/accord/CommandChangeTest.java | 1 -
.../SimpleSimulatedAccordCommandStoreTest.java | 3 +-
.../SimulatedAccordCommandStoreTestBase.java | 4 +-
.../serializers/CommandsForKeySerializerTest.java | 109 ++++++++++++++++++---
.../apache/cassandra/utils/AccordGenerators.java | 8 --
19 files changed, 170 insertions(+), 151 deletions(-)
diff --git a/modules/accord b/modules/accord
index c7a789b1f4..f543f7dae9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit c7a789b1f424771a4befab6bcb91edd4ab5d7198
+Subproject commit f543f7dae959e804a39e465654d702bc34db1bd1
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index 1847231299..9174b7ea8b 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -94,7 +94,6 @@ import
org.apache.cassandra.service.accord.serializers.CheckStatusSerializers;
import org.apache.cassandra.service.accord.serializers.CommitSerializers;
import org.apache.cassandra.service.accord.serializers.EnumSerializer;
import org.apache.cassandra.service.accord.serializers.FetchSerializers;
-import
org.apache.cassandra.service.accord.serializers.CalculateDepsSerializers;
import
org.apache.cassandra.service.accord.serializers.GetEphmrlReadDepsSerializers;
import
org.apache.cassandra.service.accord.serializers.InformDurableSerializers;
import org.apache.cassandra.service.accord.serializers.LatestDepsSerializers;
@@ -103,7 +102,7 @@ import
org.apache.cassandra.service.accord.serializers.QueryDurableBeforeSeriali
import org.apache.cassandra.service.accord.serializers.ReadDataSerializers;
import org.apache.cassandra.service.accord.serializers.RecoverySerializers;
import org.apache.cassandra.service.accord.serializers.SetDurableSerializers;
-import org.apache.cassandra.service.accord.serializers.AwaitSerializer;
+import org.apache.cassandra.service.accord.serializers.AwaitSerializers;
import
org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState;
import
org.apache.cassandra.service.consensus.migration.ConsensusKeyMigrationState.ConsensusKeyMigrationFinished;
import org.apache.cassandra.service.paxos.Commit;
@@ -325,16 +324,16 @@ public enum Verb
ACCORD_BEGIN_RECOVER_REQ (132, P2, writeTimeout, IMMEDIATE,
() -> RecoverySerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_BEGIN_RECOVER_RSP
),
ACCORD_BEGIN_INVALIDATE_RSP (133, P2, writeTimeout, IMMEDIATE,
() -> BeginInvalidationSerializers.reply,
AccordService::responseHandlerOrNoop
),
ACCORD_BEGIN_INVALIDATE_REQ (134, P2, writeTimeout, IMMEDIATE,
() -> BeginInvalidationSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_BEGIN_INVALIDATE_RSP
),
- ACCORD_AWAIT_RSP (136, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializer.syncReply,
AccordService::responseHandlerOrNoop
),
- ACCORD_AWAIT_REQ (135, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializer.request,
AccordService::requestHandlerOrNoop, ACCORD_AWAIT_RSP
),
- ACCORD_AWAIT_ASYNC_RSP_REQ (137, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializer.asyncReply,
AccordService::requestHandlerOrNoop
),
+ ACCORD_AWAIT_RSP (136, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializers.syncReply,
AccordService::responseHandlerOrNoop
),
+ ACCORD_AWAIT_REQ (135, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_AWAIT_RSP
),
+ ACCORD_AWAIT_ASYNC_RSP_REQ (137, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializers.asyncReply,
AccordService::requestHandlerOrNoop
),
ACCORD_WAIT_UNTIL_APPLIED_REQ (138, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.waitUntilApplied,
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP
),
ACCORD_STABLE_THEN_READ_REQ (139, P2, writeTimeout, IMMEDIATE,
() -> ReadDataSerializers.stableThenRead,
AccordService::requestHandlerOrNoop, ACCORD_READ_RSP
),
ACCORD_INFORM_DURABLE_REQ (140, P2, writeTimeout, IMMEDIATE,
() -> InformDurableSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_SIMPLE_RSP
),
ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, IMMEDIATE,
() -> CheckStatusSerializers.reply,
AccordService::responseHandlerOrNoop
),
ACCORD_CHECK_STATUS_REQ (142, P2, writeTimeout, IMMEDIATE,
() -> CheckStatusSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_CHECK_STATUS_RSP
),
- ACCORD_CALCULATE_DEPS_RSP (143, P2, writeTimeout, IMMEDIATE,
() -> CalculateDepsSerializers.reply,
AccordService::responseHandlerOrNoop
),
- ACCORD_CALCULATE_DEPS_REQ (144, P2, longTimeout, IMMEDIATE,
() -> CalculateDepsSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_CALCULATE_DEPS_RSP),
+ ACCORD_RECOVER_AWAIT_RSP (143, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializers.recoverReply,
AccordService::responseHandlerOrNoop
),
+ ACCORD_RECOVER_AWAIT_REQ (144, P2, writeTimeout, IMMEDIATE,
() -> AwaitSerializers.recoverRequest,
AccordService::requestHandlerOrNoop, ACCORD_RECOVER_AWAIT_RSP),
ACCORD_GET_LATEST_DEPS_RSP (167, P2, writeTimeout, IMMEDIATE,
() -> LatestDepsSerializers.reply,
AccordService::responseHandlerOrNoop
),
ACCORD_GET_LATEST_DEPS_REQ (168, P2, writeTimeout, IMMEDIATE,
() -> LatestDepsSerializers.request,
AccordService::requestHandlerOrNoop, ACCORD_GET_LATEST_DEPS_RSP),
ACCORD_GET_EPHMRL_READ_DEPS_RSP (161, P2, writeTimeout, IMMEDIATE,
() -> GetEphmrlReadDepsSerializers.reply,
AccordService::responseHandlerOrNoop
),
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
index 8f1e08ab71..7fc135da58 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java
@@ -501,7 +501,7 @@ public class AccordCommandStore extends CommandStore
if (keys != null)
return PreLoadContext.contextFor(txnId, keys, keyHistory);
- return PreLoadContext.contextFor(txnId);
+ return txnId;
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index 897d6ac3b7..e5d897ed3b 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -133,8 +133,8 @@ public class AccordMessageSink implements MessageSink
builder.put(MessageType.ACCEPT_REQ,
Verb.ACCORD_ACCEPT_REQ);
builder.put(MessageType.ACCEPT_RSP,
Verb.ACCORD_ACCEPT_RSP);
builder.put(MessageType.NOT_ACCEPT_REQ,
Verb.ACCORD_NOT_ACCEPT_REQ);
- builder.put(MessageType.CALCULATE_DEPS_REQ,
Verb.ACCORD_CALCULATE_DEPS_REQ);
- builder.put(MessageType.CALCULATE_DEPS_RSP,
Verb.ACCORD_CALCULATE_DEPS_RSP);
+ builder.put(MessageType.RECOVER_AWAIT_REQ,
Verb.ACCORD_RECOVER_AWAIT_REQ);
+ builder.put(MessageType.RECOVER_AWAIT_RSP,
Verb.ACCORD_RECOVER_AWAIT_RSP);
builder.put(MessageType.GET_LATEST_DEPS_REQ,
Verb.ACCORD_GET_LATEST_DEPS_REQ);
builder.put(MessageType.GET_LATEST_DEPS_RSP,
Verb.ACCORD_GET_LATEST_DEPS_RSP);
builder.put(MessageType.GET_EPHEMERAL_READ_DEPS_REQ,
Verb.ACCORD_GET_EPHMRL_READ_DEPS_REQ);
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
index 346b1015c5..c218ac65d8 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java
@@ -312,7 +312,7 @@ public class AccordObjectSizes
final static long NOT_DEFINED =
measure(Command.NotDefined.notDefined(attrs(false, false, false)));
final static long PREACCEPTED =
measure(Command.PreAccepted.preaccepted(attrs(false, true, false),
SaveStatus.PreAccepted));
- final static long NOTACCEPTED =
measure(Command.NotAcceptedWithoutDefinition.notAccepted(attrs(false, false,
false), SaveStatus.NotAccepted));
+ final static long NOTACCEPTED =
measure(Command.NotAcceptedWithoutDefinition.notAccepted(attrs(false, false,
false), SaveStatus.AcceptedInvalidate));
final static long ACCEPTED =
measure(Command.Accepted.accepted(attrs(true, false, false),
SaveStatus.AcceptedMedium));
final static long COMMITTED =
measure(Command.Committed.committed(attrs(true, true, false),
SaveStatus.Committed));
final static long EXECUTED =
measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied));
@@ -330,16 +330,8 @@ public class AccordObjectSizes
case PreAcceptedWithDeps:
case PreAcceptedWithVote:
return PREACCEPTED;
- case NotAccepted:
- case PreNotAccepted:
case AcceptedInvalidate:
return NOTACCEPTED;
- case NotAcceptedWithDefinition:
- case NotAcceptedWithDefAndVote:
- case NotAcceptedWithDefAndDeps:
- case PreNotAcceptedWithDefinition:
- case PreNotAcceptedWithDefAndVote:
- case PreNotAcceptedWithDefAndDeps:
case AcceptedInvalidateWithDefinition:
case AcceptedMedium:
case AcceptedMediumWithDefinition:
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 005b856005..a84cfa8f07 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -1051,7 +1051,7 @@ public class AccordService implements IAccordService,
Shutdownable
private static AsyncChain<Void>
populate(CommandStoreTxnBlockedGraph.Builder state, CommandStore store, TxnId
txnId)
{
- AsyncChain<AsyncChain<Void>> submit =
store.submit(PreLoadContext.contextFor(txnId), in -> {
+ AsyncChain<AsyncChain<Void>> submit = store.submit(txnId, in -> {
AsyncChain<Void> chain = populate(state, (AccordSafeCommandStore)
in, txnId);
return chain == null ? AsyncChains.success(null) : chain;
});
diff --git a/src/java/org/apache/cassandra/service/accord/JournalKey.java
b/src/java/org/apache/cassandra/service/accord/JournalKey.java
index 8baa85be2e..b1acbd31a2 100644
--- a/src/java/org/apache/cassandra/service/accord/JournalKey.java
+++ b/src/java/org/apache/cassandra/service/accord/JournalKey.java
@@ -30,19 +30,12 @@ import accord.utils.Invariants;
import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.journal.KeySupport;
-import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.BootstrapBeganAtSerializer;
-import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.CommandDiffSerializer;
-import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.DurableBeforeSerializer;
-import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.FlyweightSerializer;
-import
org.apache.cassandra.service.accord.AccordJournalValueSerializers.RedundantBeforeSerializer;
import org.apache.cassandra.utils.ByteArrayUtil;
import static org.apache.cassandra.db.TypeSizes.BYTE_SIZE;
import static org.apache.cassandra.db.TypeSizes.INT_SIZE;
import static org.apache.cassandra.db.TypeSizes.LONG_SIZE;
import static
org.apache.cassandra.service.accord.AccordJournalValueSerializers.*;
-import static
org.apache.cassandra.service.accord.AccordJournalValueSerializers.RangesForEpochSerializer;
-import static
org.apache.cassandra.service.accord.AccordJournalValueSerializers.SafeToReadSerializer;
public final class JournalKey
{
diff --git
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
index 0f7924179b..edc5fb9629 100644
---
a/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
+++
b/src/java/org/apache/cassandra/service/accord/interop/AccordInteropApply.java
@@ -120,8 +120,6 @@ public class AccordInteropApply extends Apply implements
LocalListeners.ComplexL
default: throw new AssertionError();
case NotDefined:
case PreAccepted:
- case PreNotAccepted:
- case NotAccepted:
case AcceptedInvalidate:
case AcceptedMedium:
case AcceptedSlow:
@@ -247,8 +245,6 @@ public class AccordInteropApply extends Apply implements
LocalListeners.ComplexL
default: throw new UnhandledEnum(command.status());
case NotDefined:
case PreAccepted:
- case PreNotAccepted:
- case NotAccepted:
case AcceptedInvalidate:
case AcceptedMedium:
case AcceptedSlow:
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
similarity index 67%
rename from
src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
rename to
src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
index 905aa1d680..4d1af98514 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializer.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/AwaitSerializers.java
@@ -24,6 +24,8 @@ import accord.api.ProgressLog.BlockedUntil;
import accord.messages.Await;
import accord.messages.Await.AsyncAwaitComplete;
import accord.messages.Await.AwaitOk;
+import accord.messages.RecoverAwait;
+import accord.messages.RecoverAwait.RecoverAwaitOk;
import accord.primitives.Participants;
import accord.primitives.Route;
import accord.primitives.SaveStatus;
@@ -35,12 +37,46 @@ import org.apache.cassandra.io.util.DataInputPlus;
import org.apache.cassandra.io.util.DataOutputPlus;
import org.apache.cassandra.utils.vint.VIntCoding;
-public class AwaitSerializer
+public class AwaitSerializers
{
- public static final IVersionedSerializer<Await> request = new
IVersionedSerializer<>()
+ public static final IVersionedSerializer<Await> request = new
RequestSerializer<>()
{
@Override
- public void serialize(Await await, DataOutputPlus out, int version)
throws IOException
+ public Await deserialize(TxnId txnId, Participants<?> scope,
BlockedUntil blockedUntil, boolean notifyProgressLog, long minAwaitEpoch, long
maxAwaitEpoch, int callbackId, DataInputPlus in, int version)
+ {
+ return Await.SerializerSupport.create(txnId, scope, blockedUntil,
notifyProgressLog, minAwaitEpoch, maxAwaitEpoch, callbackId);
+ }
+ };
+
+ public static final IVersionedSerializer<RecoverAwait> recoverRequest =
new RequestSerializer<>()
+ {
+ @Override
+ public RecoverAwait deserialize(TxnId txnId, Participants<?> scope,
BlockedUntil blockedUntil, boolean notifyProgressLog, long minAwaitEpoch, long
maxAwaitEpoch, int callbackId, DataInputPlus in, int version) throws IOException
+ {
+ TxnId recoverId = CommandSerializers.txnId.deserialize(in,
version);
+ return RecoverAwait.SerializerSupport.create(txnId, scope,
blockedUntil, notifyProgressLog, minAwaitEpoch, maxAwaitEpoch, callbackId,
recoverId);
+ }
+
+ @Override
+ public void serialize(RecoverAwait await, DataOutputPlus out, int
version) throws IOException
+ {
+ super.serialize(await, out, version);
+ CommandSerializers.txnId.serialize(await.recoverId, out, version);
+ }
+
+ @Override
+ public long serializedSize(RecoverAwait await, int version)
+ {
+ return super.serializedSize(await, version) +
CommandSerializers.txnId.serializedSize(await.recoverId, version);
+ }
+ };
+
+ static abstract class RequestSerializer<A extends Await> implements
IVersionedSerializer<A>
+ {
+ abstract A deserialize(TxnId txnId, Participants<?> scope,
BlockedUntil blockedUntil, boolean notifyProgressLog, long minAwaitEpoch, long
maxAwaitEpoch, int callbackId, DataInputPlus in, int version) throws
IOException;
+
+ @Override
+ public void serialize(A await, DataOutputPlus out, int version) throws
IOException
{
CommandSerializers.txnId.serialize(await.txnId, out, version);
KeySerializers.participants.serialize(await.scope, out, version);
@@ -52,7 +88,7 @@ public class AwaitSerializer
}
@Override
- public Await deserialize(DataInputPlus in, int version) throws
IOException
+ public A deserialize(DataInputPlus in, int version) throws IOException
{
TxnId txnId = CommandSerializers.txnId.deserialize(in, version);
Participants<?> scope =
KeySerializers.participants.deserialize(in, version);
@@ -63,11 +99,11 @@ public class AwaitSerializer
long minAwaitEpoch = maxAwaitEpoch - in.readUnsignedVInt();
int callbackId = in.readUnsignedVInt32() - 1;
Invariants.require(callbackId >= -1);
- return Await.SerializerSupport.create(txnId, scope, blockedUntil,
notifyProgressLog, minAwaitEpoch, maxAwaitEpoch, callbackId);
+ return deserialize(txnId, scope, blockedUntil, notifyProgressLog,
minAwaitEpoch, maxAwaitEpoch, callbackId, in, version);
}
@Override
- public long serializedSize(Await await, int version)
+ public long serializedSize(A await, int version)
{
return CommandSerializers.txnId.serializedSize(await.txnId,
version)
+ KeySerializers.participants.serializedSize(await.scope,
version)
@@ -79,6 +115,7 @@ public class AwaitSerializer
};
public static final IVersionedSerializer<AwaitOk> syncReply = new
EnumSerializer<>(AwaitOk.class);
+ public static final IVersionedSerializer<RecoverAwaitOk> recoverReply =
new EnumSerializer<>(RecoverAwaitOk.class);
public static final IVersionedSerializer<AsyncAwaitComplete> asyncReply =
new IVersionedSerializer<>()
{
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/CalculateDepsSerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/CalculateDepsSerializers.java
deleted file mode 100644
index ab9f596374..0000000000
---
a/src/java/org/apache/cassandra/service/accord/serializers/CalculateDepsSerializers.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.service.accord.serializers;
-
-import java.io.IOException;
-
-import accord.messages.CalculateDeps;
-import accord.messages.CalculateDeps.CalculateDepsOk;
-import accord.primitives.Route;
-import accord.primitives.Timestamp;
-import accord.primitives.TxnId;
-import org.apache.cassandra.io.IVersionedSerializer;
-import org.apache.cassandra.io.util.DataInputPlus;
-import org.apache.cassandra.io.util.DataOutputPlus;
-import
org.apache.cassandra.service.accord.serializers.CommandSerializers.ExecuteAtSerializer;
-
-public class CalculateDepsSerializers
-{
- public static final IVersionedSerializer<CalculateDeps> request = new
TxnRequestSerializer.WithUnsyncedSerializer<>()
- {
- @Override
- public void serializeBody(CalculateDeps msg, DataOutputPlus out, int
version) throws IOException
- {
- ExecuteAtSerializer.serialize(msg.executeAt, out);
- }
-
- @Override
- public CalculateDeps deserializeBody(DataInputPlus in, int version,
TxnId txnId, Route<?> scope, long waitForEpoch, long minEpoch) throws
IOException
- {
- Timestamp executeAt = ExecuteAtSerializer.deserialize(in);
- return CalculateDeps.SerializationSupport.create(txnId, scope,
waitForEpoch, minEpoch, executeAt);
- }
-
- @Override
- public long serializedBodySize(CalculateDeps msg, int version)
- {
- return ExecuteAtSerializer.serializedSize(msg.executeAt);
- }
- };
-
- public static final IVersionedSerializer<CalculateDepsOk> reply = new
IVersionedSerializer<>()
- {
- @Override
- public void serialize(CalculateDepsOk reply, DataOutputPlus out, int
version) throws IOException
- {
- DepsSerializers.deps.serialize(reply.deps, out, version);
- }
-
- @Override
- public CalculateDepsOk deserialize(DataInputPlus in, int version)
throws IOException
- {
- return new CalculateDepsOk(DepsSerializers.deps.deserialize(in,
version));
- }
-
- @Override
- public long serializedSize(CalculateDepsOk reply, int version)
- {
- return DepsSerializers.deps.serializedSize(reply.deps, version);
- }
- };
-}
diff --git
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
index bc1ec7fcbd..ee0000f1a4 100644
---
a/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
+++
b/src/java/org/apache/cassandra/service/accord/serializers/RecoverySerializers.java
@@ -34,6 +34,7 @@ import accord.primitives.FullRoute;
import accord.primitives.Known.KnownDeps;
import accord.primitives.LatestDeps;
import accord.primitives.PartialTxn;
+import accord.primitives.Participants;
import accord.primitives.Route;
import accord.primitives.Status;
import accord.primitives.Timestamp;
@@ -101,9 +102,9 @@ public class RecoverySerializers
latestDeps.serialize(recoverOk.deps, out, version);
DepsSerializers.deps.serialize(recoverOk.earlierWait, out,
version);
DepsSerializers.deps.serialize(recoverOk.earlierNoWait, out,
version);
- DepsSerializers.deps.serialize(recoverOk.laterWait, out, version);
- DepsSerializers.deps.serialize(recoverOk.laterNoWait, out,
version);
+ DepsSerializers.deps.serialize(recoverOk.laterCoordRejects, out,
version);
out.writeBoolean(recoverOk.selfAcceptsFastPath);
+
KeySerializers.nullableParticipants.serialize(recoverOk.coordinatorAcceptsFastPath,
out, version);
out.writeBoolean(recoverOk.supersedingRejects);
CommandSerializers.nullableWrites.serialize(recoverOk.writes, out,
version);
}
@@ -121,9 +122,9 @@ public class RecoverySerializers
return new RecoverNack(kind, supersededBy);
}
- RecoverOk deserializeOk(TxnId txnId, Status status, Ballot accepted,
Timestamp executeAt, @Nonnull LatestDeps deps, Deps earlierWait, Deps
earlierNoWait, Deps laterWait, Deps laterNoWait, boolean acceptsFastPath,
boolean rejectsFastPath, Writes writes, Result result, DataInputPlus in, int
version)
+ RecoverOk deserializeOk(TxnId txnId, Status status, Ballot accepted,
Timestamp executeAt, @Nonnull LatestDeps deps, Deps earlierWait, Deps
earlierNoWait, Deps laterCoordRejects, boolean acceptsFastPath, @Nullable
Participants<?> coordinatorAcceptsFastPath, boolean rejectsFastPath, Writes
writes, Result result, DataInputPlus in, int version)
{
- return new RecoverOk(txnId, status, accepted, executeAt, deps,
earlierWait, earlierNoWait, laterWait, laterNoWait, acceptsFastPath,
rejectsFastPath, writes, result);
+ return new RecoverOk(txnId, status, accepted, executeAt, deps,
earlierWait, earlierNoWait, laterCoordRejects, acceptsFastPath,
coordinatorAcceptsFastPath, rejectsFastPath, writes, result);
}
@Override
@@ -148,8 +149,8 @@ public class RecoverySerializers
DepsSerializers.deps.deserialize(in, version),
DepsSerializers.deps.deserialize(in, version),
DepsSerializers.deps.deserialize(in, version),
- DepsSerializers.deps.deserialize(in, version),
in.readBoolean(),
+
KeySerializers.nullableParticipants.deserialize(in, version),
in.readBoolean(),
CommandSerializers.nullableWrites.deserialize(in, version),
result,
@@ -171,9 +172,9 @@ public class RecoverySerializers
size += latestDeps.serializedSize(recoverOk.deps, version);
size += DepsSerializers.deps.serializedSize(recoverOk.earlierWait,
version);
size +=
DepsSerializers.deps.serializedSize(recoverOk.earlierNoWait, version);
- size += DepsSerializers.deps.serializedSize(recoverOk.laterWait,
version);
- size += DepsSerializers.deps.serializedSize(recoverOk.laterNoWait,
version);
+ size +=
DepsSerializers.deps.serializedSize(recoverOk.laterCoordRejects, version);
size += TypeSizes.sizeof(recoverOk.selfAcceptsFastPath);
+ size +=
KeySerializers.nullableParticipants.serializedSize(recoverOk.coordinatorAcceptsFastPath,
version);
size += TypeSizes.sizeof(recoverOk.supersedingRejects);
size +=
CommandSerializers.nullableWrites.serializedSize(recoverOk.writes, version);
return size;
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
index 72db50303a..231610b198 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordIncrementalRepairTest.java
@@ -243,7 +243,7 @@ public class AccordIncrementalRepairTest extends
AccordTestBase
long now = Clock.Global.currentTimeMillis();
if (now - start > TimeUnit.MINUTES.toMillis(1))
throw new AssertionError("Timeout");
-
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(PreLoadContext.contextFor(txnId),
key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
+
AsyncChains.awaitUninterruptibly(node.commandStores().ifLocal(txnId,
key.toUnseekable(), 0, Long.MAX_VALUE, safeStore -> {
SafeCommand command = safeStore.get(txnId,
StoreParticipants.empty(txnId));
Assert.assertNotNull(command.current());
if (command.current().status().hasBeen(Status.Applied))
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
index 547ee77568..6cd168ac43 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordCommandStoreTest.java
@@ -61,6 +61,7 @@ import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.accord.api.AccordRoutingKey.TokenKey;
import org.apache.cassandra.service.accord.api.PartitionKey;
+import
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializerTest.TestSafeCommandStore;
import org.apache.cassandra.service.accord.serializers.ResultSerializers;
import org.apache.cassandra.service.consensus.TransactionalMode;
import org.apache.cassandra.utils.Pair;
@@ -162,8 +163,8 @@ public class AccordCommandStoreTest
AccordSafeCommandsForKey cfk = new
AccordSafeCommandsForKey(loaded(key, null));
cfk.initialize();
- cfk.set(cfk.current().update(command1).cfk());
- cfk.set(cfk.current().update(command2).cfk());
+ cfk.set(cfk.current().update(new
TestSafeCommandStore(command1.txnId()), command1).cfk());
+ cfk.set(cfk.current().update(new
TestSafeCommandStore(command1.txnId()), command2).cfk());
AccordKeyspace.getCommandsForKeyUpdater(commandStore.id(),
(TokenKey)cfk.key(), cfk.current(), null,
commandStore.nextSystemTimestampMicros()).run();
logger.info("E: {}", cfk);
diff --git a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
index 6317008c86..1ef9b0227e 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordTaskTest.java
@@ -126,7 +126,7 @@ public class AccordTaskTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
- getUninterruptibly(commandStore.execute(contextFor(txnId), instance ->
{
+ getUninterruptibly(commandStore.execute(txnId, instance -> {
// TODO review: This change to `ifInitialized` was done in a lot
of places and it doesn't preserve this property
// I fixed this reference to point to `ifLoadedAndInitialised` and
but didn't update other places
Assert.assertNull(instance.ifInitialised(txnId));
@@ -140,7 +140,7 @@ public class AccordTaskTest
AccordCommandStore commandStore =
createAccordCommandStore(clock::incrementAndGet, "ks", "tbl");
TxnId txnId = txnId(1, clock.incrementAndGet(), 1);
- getUninterruptibly(commandStore.execute(contextFor(txnId), safe -> {
+ getUninterruptibly(commandStore.execute(txnId, safe -> {
StoreParticipants participants = StoreParticipants.empty(txnId);
SafeCommand command = safe.get(txnId, participants);
Assert.assertNotNull(command);
diff --git
a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
index 40d75816d3..b24a60a670 100644
--- a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java
@@ -47,7 +47,6 @@ import org.assertj.core.api.SoftAssertions;
import static accord.api.Journal.*;
import static accord.impl.CommandChange.*;
-import static accord.impl.CommandChange.getFlags;
import static accord.utils.Property.qt;
import static
org.apache.cassandra.cql3.statements.schema.CreateTableStatement.parse;
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
b/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
index dd8678c1ef..5afc99d906 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimpleSimulatedAccordCommandStoreTest.java
@@ -20,7 +20,6 @@ package org.apache.cassandra.service.accord;
import org.junit.Test;
-import accord.local.PreLoadContext;
import accord.local.StoreParticipants;
import accord.primitives.SaveStatus;
import accord.primitives.TxnId;
@@ -41,7 +40,7 @@ public class SimpleSimulatedAccordCommandStoreTest extends
SimulatedAccordComman
for (int i = 0, examples = 100; i < examples; i++)
{
TxnId id = AccordGens.txnIds().next(rs);
- instance.process(PreLoadContext.contextFor(id), (safe) -> {
+ instance.process(id, (safe) -> {
var safeCommand = safe.get(id,
StoreParticipants.empty(id));
var command = safeCommand.current();
Assertions.assertThat(command.saveStatus()).isEqualTo(SaveStatus.Uninitialised);
diff --git
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
index ad8a995794..6ec60a3778 100644
---
a/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
+++
b/test/unit/org/apache/cassandra/service/accord/SimulatedAccordCommandStoreTestBase.java
@@ -321,7 +321,7 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
{
var range = deps.rangeDeps.range(i);
Assertions.assertThat(rangeConflicts).describedAs("Txn %s
had an unexpected range", txnId).containsKey(range);
- var conflict = deps.rangeDeps.txnIdsForRangeIndex(i);
+ var conflict =
deps.rangeDeps.txnIdsWithFlagsForRangeIndex(i);
List<TxnId> expectedConflict = rangeConflicts.get(range);
Assertions.assertThat(conflict).describedAs("Txn %s
Expected range %s to have different conflicting txns", txnId,
range).isEqualTo(expectedConflict);
}
@@ -344,7 +344,7 @@ public abstract class SimulatedAccordCommandStoreTestBase
extends CQLTester
{
Assertions.assertThat(deps.keyDeps.keys()).describedAs("Txn %s
Keys", txnId).isEqualTo(RoutingKeys.of(keyConflicts.keySet()));
for (var key : keyConflicts.keySet())
-
Assertions.assertThat(deps.keyDeps.txnIds(key)).describedAs("Txn %s for key
%s", txnId, key).isEqualTo(keyConflicts.get(key));
+
Assertions.assertThat(deps.keyDeps.txnIdsWithFlags(key)).describedAs("Txn %s
for key %s", txnId, key).isEqualTo(keyConflicts.get(key));
}
}
diff --git
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
index a6a3531d8a..ddd5146462 100644
---
a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
+++
b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java
@@ -28,7 +28,10 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
+import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.LongUnaryOperator;
@@ -42,36 +45,57 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
+import accord.api.Agent;
+import accord.api.DataStore;
+import accord.api.Journal;
import accord.api.Key;
+import accord.api.ProgressLog;
+import accord.api.Result;
import accord.api.RoutingKey;
+import accord.api.Timeouts;
+import accord.impl.AbstractSafeCommandStore;
+import accord.impl.DefaultLocalListeners;
+import accord.impl.DefaultRemoteListeners;
import accord.local.Command;
+import accord.local.CommandStore;
+import accord.local.DurableBefore;
import accord.local.ICommand;
import accord.local.Node;
+import accord.local.NodeCommandStoreService;
+import accord.local.PreLoadContext;
+import accord.local.SafeCommand;
+import accord.local.SafeCommandStore;
import accord.local.StoreParticipants;
import accord.local.cfk.CommandsForKey;
import accord.local.cfk.CommandsForKey.InternalStatus;
import accord.local.cfk.CommandsForKey.TxnInfo;
import accord.local.cfk.CommandsForKey.Unmanaged;
+import accord.local.cfk.SafeCommandsForKey;
import accord.local.cfk.Serialize;
+import accord.messages.ReplyContext;
import accord.primitives.Ballot;
import accord.primitives.KeyDeps;
import accord.primitives.Known;
import accord.primitives.PartialDeps;
import accord.primitives.PartialTxn;
import accord.primitives.RangeDeps;
+import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.SaveStatus;
import accord.primitives.Status;
import accord.primitives.Timestamp;
import accord.primitives.Txn;
import accord.primitives.TxnId;
+import accord.primitives.Unseekables;
import accord.primitives.Writes;
+import accord.topology.TopologyManager;
import accord.utils.AccordGens;
import accord.utils.Gen;
import accord.utils.Gens;
import accord.utils.RandomSource;
import accord.utils.SortedArrays;
import accord.utils.UnhandledEnum;
+import accord.utils.async.AsyncChain;
import org.agrona.collections.Int2ObjectHashMap;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.dht.Murmur3Partitioner;
@@ -191,8 +215,6 @@ public class CommandsForKeySerializerTest
case PreAcceptedWithDeps:
return Command.PreAccepted.preaccepted(builder(),
saveStatus);
case AcceptedInvalidate:
- case PreNotAccepted:
- case NotAccepted:
return
Command.NotAcceptedWithoutDefinition.notAccepted(builder(), saveStatus);
case AcceptedMedium:
case AcceptedMediumWithDefinition:
@@ -201,12 +223,6 @@ public class CommandsForKeySerializerTest
case AcceptedSlowWithDefinition:
case AcceptedSlowWithDefAndVote:
case AcceptedInvalidateWithDefinition:
- case PreNotAcceptedWithDefinition:
- case PreNotAcceptedWithDefAndVote:
- case PreNotAcceptedWithDefAndDeps:
- case NotAcceptedWithDefinition:
- case NotAcceptedWithDefAndVote:
- case NotAcceptedWithDefAndDeps:
case PreCommittedWithDefinition:
case PreCommittedWithDefAndDeps:
case PreCommittedWithDefAndFixedDeps:
@@ -289,8 +305,6 @@ public class CommandsForKeySerializerTest
case PreApplied:
case Applied:
isDurable = source.nextBoolean();
- case PreNotAccepted:
- case NotAccepted:
case AcceptedInvalidate:
case AcceptedMedium:
case AcceptedSlow:
@@ -503,7 +517,7 @@ public class CommandsForKeySerializerTest
{
int next = source.nextInt(commands.size());
Command command = commands.get(next);
- cfk = cfk.update(command).cfk();
+ cfk = cfk.update(new TestSafeCommandStore(command.txnId()),
command).cfk();
commands.set(next, commands.get(commands.size() - 1));
commands.remove(commands.size() - 1);
}
@@ -615,4 +629,77 @@ public class CommandsForKeySerializerTest
CommandsForKey roundTrip = Serialize.fromBytes(pk, buffer);
Assert.assertEquals(expected, roundTrip);
}
+
+ static class TestCommandStore extends CommandStore implements Agent
+ {
+ static final TestCommandStore INSTANCE = new TestCommandStore();
+ protected TestCommandStore()
+ {
+ super(0,
+ null,
+ null,
+ null,
+ ignore -> new ProgressLog.NoOpProgressLog(),
+ ignore -> new DefaultLocalListeners(new
DefaultRemoteListeners((a, b, c, d, e)->{}),
DefaultLocalListeners.DefaultNotifySink.INSTANCE),
+ new EpochUpdateHolder());
+ }
+
+ @Override public boolean inStore() { return true; }
+ @Override public Journal.Loader loader() { throw new
UnsupportedOperationException(); }
+ @Override public Agent agent() { return this; }
+ @Override public AsyncChain<Void> execute(PreLoadContext context,
Consumer<? super SafeCommandStore> consumer) { return null; }
+ @Override public <T> AsyncChain<T> submit(PreLoadContext context,
Function<? super SafeCommandStore, T> apply) { throw new
UnsupportedOperationException(); }
+ @Override public void shutdown() { }
+ @Override protected void registerTransitive(SafeCommandStore
safeStore, RangeDeps deps){ }
+ @Override public <T> AsyncChain<T> submit(Callable<T> task) { throw
new UnsupportedOperationException(); }
+ @Override public void onRecover(Node node, Result success, Throwable
fail) { throw new UnsupportedOperationException(); }
+ @Override public void onInconsistentTimestamp(Command command,
Timestamp prev, Timestamp next) { throw new UnsupportedOperationException(); }
+ @Override public void onFailedBootstrap(String phase, Ranges ranges,
Runnable retry, Throwable failure) { throw new UnsupportedOperationException();
}
+ @Override public void onStale(Timestamp staleSince, Ranges ranges) {
throw new UnsupportedOperationException(); }
+ @Override public void onUncaughtException(Throwable t) { throw new
UnsupportedOperationException(); }
+ @Override public void onCaughtException(Throwable t, String context) {
throw new UnsupportedOperationException(); }
+ @Override public long preAcceptTimeout() { throw new
UnsupportedOperationException(); }
+ @Override public long cfkHlcPruneDelta() { return 0; }
+ @Override public int cfkPruneInterval() { return 0; }
+ @Override public long maxConflictsHlcPruneDelta() { return 0; }
+ @Override public long maxConflictsPruneInterval() { return 0; }
+ @Override public Txn emptySystemTxn(Txn.Kind kind, Routable.Domain
domain) { throw new UnsupportedOperationException(); }
+ @Override public long attemptCoordinationDelay(Node node,
SafeCommandStore safeStore, TxnId txnId, TimeUnit units, int retryCount) {
return 0; }
+ @Override public long seekProgressDelay(Node node, SafeCommandStore
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil blockedUntil,
TimeUnit units) { return 0; }
+ @Override public long retryAwaitTimeout(Node node, SafeCommandStore
safeStore, TxnId txnId, int retryCount, ProgressLog.BlockedUntil retrying,
TimeUnit units) { return 0; }
+ @Override public long localExpiresAt(TxnId txnId, Status.Phase phase,
TimeUnit unit) { return 0; }
+ @Override public long expiresAt(ReplyContext replyContext, TimeUnit
unit) { return 0; }
+ }
+
+ public static class TestSafeCommandStore extends AbstractSafeCommandStore
+ {
+ public TestSafeCommandStore(PreLoadContext context)
+ {
+ super(context, TestCommandStore.INSTANCE);
+ }
+
+ @Override protected CommandStoreCaches tryGetCaches() { return null; }
+ @Override protected SafeCommand add(SafeCommand safeCommand,
CommandStoreCaches caches) { return null; }
+ @Override protected SafeCommandsForKey add(SafeCommandsForKey safeCfk,
CommandStoreCaches caches) { return null; }
+ @Override protected SafeCommand getInternal(TxnId txnId) { return
null; }
+ @Override protected SafeCommandsForKey getInternal(RoutingKey key) {
return null; }
+ @Override public DataStore dataStore() { return null; }
+ @Override public Agent agent() { return null; }
+ @Override public ProgressLog progressLog() { return null; }
+ @Override public NodeCommandStoreService node() { return new
NodeCommandStoreService()
+ {
+ @Override public long epoch() { return 0;}
+ @Override public Node.Id id() { return Node.Id.NONE; }
+ @Override public Timeouts timeouts() { return null; }
+ @Override public DurableBefore durableBefore() { return null;}
+ @Override public Timestamp uniqueNow() { return null; }
+ @Override public Timestamp uniqueNow(Timestamp atLeast) { return
null; }
+ @Override public TopologyManager topology() { return null; }
+ @Override public long now() { return 0; }
+ @Override public long elapsed(TimeUnit unit) { return 0; }
+ }; }
+ @Override public boolean visit(Unseekables<?> keysOrRanges, TxnId
testTxnId, Txn.Kind.Kinds testKind, TestStartedAt testStartedAt, Timestamp
testStartAtTimestamp, ComputeIsDep computeIsDep, AllCommandVisitor visit) {
return false; }
+ @Override public <P1, P2> void visit(Unseekables<?> keysOrRanges,
Timestamp startedBefore, Txn.Kind.Kinds testKind, ActiveCommandVisitor<P1, P2>
visit, P1 p1, P2 p2) { }
+ }
+
}
diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
index fef3f9a83f..05c731f7db 100644
--- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java
@@ -254,14 +254,6 @@ public class AccordGenerators
case PreAcceptedWithVote:
case PreAcceptedWithDeps:
return
Command.PreAccepted.preaccepted(attributes(saveStatus), saveStatus);
- case PreNotAccepted:
- case PreNotAcceptedWithDefinition:
- case PreNotAcceptedWithDefAndDeps:
- case PreNotAcceptedWithDefAndVote:
- case NotAccepted:
- case NotAcceptedWithDefinition:
- case NotAcceptedWithDefAndDeps:
- case NotAcceptedWithDefAndVote:
case AcceptedInvalidate:
return
Command.NotAcceptedWithoutDefinition.acceptedInvalidate(attributes(saveStatus));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]