This is an automated email from the ASF dual-hosted git repository.
aleksey pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push:
new 2230fb55cd CEP-15: Extend Accord MessageType with a side effect flag
2230fb55cd is described below
commit 2230fb55cd0c0ee9c4d0732ab1a5ad2ffb28e15f
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Fri Jun 2 11:45:41 2023 +0100
CEP-15: Extend Accord MessageType with a side effect flag
patch by Aleksey Yeschenko; reviewed by Benedict Elliott Smith for
CASSANDRA-18561
---
modules/accord | 2 +-
src/java/org/apache/cassandra/net/Verb.java | 14 +++++++-------
.../apache/cassandra/service/accord/AccordJournal.java | 10 +++++-----
.../cassandra/service/accord/AccordMessageSink.java | 16 ++++++++--------
.../cassandra/service/accord/AccordMessageSinkTest.java | 2 +-
5 files changed, 22 insertions(+), 22 deletions(-)
diff --git a/modules/accord b/modules/accord
index 3d0ff07cd5..8830d97ba5 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 3d0ff07cd5c7db43390b85afa593e6f76471d886
+Subproject commit 8830d97ba517fb2d0f7f22e8e6b886a98839e694
diff --git a/src/java/org/apache/cassandra/net/Verb.java
b/src/java/org/apache/cassandra/net/Verb.java
index 781261e655..c9a678a2ca 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -229,8 +229,8 @@ public enum Verb
// accord
ACCORD_SIMPLE_RSP (119, P2, writeTimeout, REQUEST_RESPONSE,
() -> EnumSerializer.simpleReply, RESPONSE_HANDLER
),
- ACCORD_PREACCEPT_RSP (121, P2, writeTimeout, REQUEST_RESPONSE,
() -> PreacceptSerializers.reply, RESPONSE_HANDLER
),
- ACCORD_PREACCEPT_REQ (120, P2, writeTimeout, IMMEDIATE,
() -> PreacceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP ),
+ ACCORD_PRE_ACCEPT_RSP (121, P2, writeTimeout, REQUEST_RESPONSE,
() -> PreacceptSerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_PRE_ACCEPT_REQ (120, P2, writeTimeout, IMMEDIATE,
() -> PreacceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_PRE_ACCEPT_RSP ),
ACCORD_ACCEPT_RSP (124, P2, writeTimeout, REQUEST_RESPONSE,
() -> AcceptSerializers.reply, RESPONSE_HANDLER
),
ACCORD_ACCEPT_REQ (122, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
ACCORD_ACCEPT_INVALIDATE_REQ (123, P2, writeTimeout, IMMEDIATE,
() -> AcceptSerializers.invalidate, () ->
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP ),
@@ -240,13 +240,13 @@ public enum Verb
ACCORD_COMMIT_INVALIDATE_REQ (126, P2, writeTimeout, IMMEDIATE,
() -> CommitSerializers.invalidate, () ->
AccordService.instance().verbHandler() ),
ACCORD_APPLY_RSP (130, P2, writeTimeout, REQUEST_RESPONSE,
() -> ApplySerializers.reply, RESPONSE_HANDLER
),
ACCORD_APPLY_REQ (129, P2, writeTimeout, IMMEDIATE,
() -> ApplySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP ),
- ACCORD_RECOVER_RSP (132, P2, writeTimeout, REQUEST_RESPONSE,
() -> RecoverySerializers.reply, RESPONSE_HANDLER
),
- ACCORD_RECOVER_REQ (131, P2, writeTimeout, IMMEDIATE,
() -> RecoverySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP ),
+ ACCORD_BEGIN_RECOVER_RSP (132, P2, writeTimeout, REQUEST_RESPONSE,
() -> RecoverySerializers.reply, RESPONSE_HANDLER
),
+ ACCORD_BEGIN_RECOVER_REQ (131, P2, writeTimeout, IMMEDIATE,
() -> RecoverySerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_BEGIN_RECOVER_RSP ),
ACCORD_BEGIN_INVALIDATE_RSP (134, P2, writeTimeout, REQUEST_RESPONSE,
() -> BeginInvalidationSerializers.reply, RESPONSE_HANDLER
),
ACCORD_BEGIN_INVALIDATE_REQ (133, P2, writeTimeout, IMMEDIATE,
() -> BeginInvalidationSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP ),
- ACCORD_WAIT_COMMIT_RSP (136, P2, writeTimeout, REQUEST_RESPONSE,
() -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER
),
- ACCORD_WAIT_COMMIT_REQ (135, P2, writeTimeout, IMMEDIATE,
() -> WaitOnCommitSerializer.request, () ->
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP ),
- ACCORD_INFORM_OF_TXNID_REQ (137, P2, writeTimeout, IMMEDIATE,
() -> InformOfTxnIdSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
+ ACCORD_WAIT_ON_COMMIT_RSP (136, P2, writeTimeout, REQUEST_RESPONSE,
() -> WaitOnCommitSerializer.reply, RESPONSE_HANDLER
),
+ ACCORD_WAIT_ON_COMMIT_REQ (135, P2, writeTimeout, IMMEDIATE,
() -> WaitOnCommitSerializer.request, () ->
AccordService.instance().verbHandler(), ACCORD_WAIT_ON_COMMIT_RSP ),
+ ACCORD_INFORM_OF_TXN_REQ (137, P2, writeTimeout, IMMEDIATE,
() -> InformOfTxnIdSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_INFORM_HOME_DURABLE_REQ (138, P2, writeTimeout, IMMEDIATE,
() -> InformHomeDurableSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_INFORM_DURABLE_REQ (139, P2, writeTimeout, IMMEDIATE,
() -> InformDurableSerializers.request, () ->
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP ),
ACCORD_CHECK_STATUS_RSP (141, P2, writeTimeout, REQUEST_RESPONSE,
() -> CheckStatusSerializers.reply, RESPONSE_HANDLER
),
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index da50dd15da..87fb14e738 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -382,10 +382,10 @@ public class AccordJournal
*/
public enum Type implements IVersionedSerializer<TxnRequest<?>>
{
- PREACCEPT_REQ (0, MessageType.PREACCEPT_REQ,
PreacceptSerializers.request),
- ACCEPT_REQ (1, MessageType.ACCEPT_REQ, AcceptSerializers.request
),
- COMMIT_REQ (2, MessageType.COMMIT_REQ, CommitSerializers.request
),
- APPLY_REQ (3, MessageType.APPLY_REQ, ApplySerializers.request
);
+ PREACCEPT_REQ (0, MessageType.PRE_ACCEPT_REQ,
PreacceptSerializers.request),
+ ACCEPT_REQ (1, MessageType.ACCEPT_REQ,
AcceptSerializers.request ),
+ COMMIT_REQ (2, MessageType.COMMIT_REQ,
CommitSerializers.request ),
+ APPLY_REQ (3, MessageType.APPLY_REQ, ApplySerializers.request
);
final int id;
final MessageType msgType;
@@ -460,7 +460,7 @@ public class AccordJournal
static boolean mustMakeDurable(TxnRequest<?> message)
{
- return msgTypeToTypeMap.containsKey(message.type());
+ return message.type().hasSideEffects;
}
@Override
diff --git
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index ff83f70756..7bfcb82de5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -55,24 +55,24 @@ public class AccordMessageSink implements MessageSink
private VerbMapping()
{
- mapping.put(MessageType.PREACCEPT_REQ,
Verb.ACCORD_PREACCEPT_REQ);
- mapping.put(MessageType.PREACCEPT_RSP,
Verb.ACCORD_PREACCEPT_RSP);
+ mapping.put(MessageType.PRE_ACCEPT_REQ,
Verb.ACCORD_PRE_ACCEPT_REQ);
+ mapping.put(MessageType.PRE_ACCEPT_RSP,
Verb.ACCORD_PRE_ACCEPT_RSP);
mapping.put(MessageType.ACCEPT_REQ,
Verb.ACCORD_ACCEPT_REQ);
mapping.put(MessageType.ACCEPT_RSP,
Verb.ACCORD_ACCEPT_RSP);
mapping.put(MessageType.ACCEPT_INVALIDATE_REQ,
Verb.ACCORD_ACCEPT_INVALIDATE_REQ);
mapping.put(MessageType.COMMIT_REQ,
Verb.ACCORD_COMMIT_REQ);
- mapping.put(MessageType.COMMIT_INVALIDATE,
Verb.ACCORD_COMMIT_INVALIDATE_REQ);
+ mapping.put(MessageType.COMMIT_INVALIDATE_REQ,
Verb.ACCORD_COMMIT_INVALIDATE_REQ);
mapping.put(MessageType.APPLY_REQ,
Verb.ACCORD_APPLY_REQ);
mapping.put(MessageType.APPLY_RSP,
Verb.ACCORD_APPLY_RSP);
mapping.put(MessageType.READ_REQ,
Verb.ACCORD_READ_REQ);
mapping.put(MessageType.READ_RSP,
Verb.ACCORD_READ_RSP);
- mapping.put(MessageType.BEGIN_RECOVER_REQ,
Verb.ACCORD_RECOVER_REQ);
- mapping.put(MessageType.BEGIN_RECOVER_RSP,
Verb.ACCORD_RECOVER_RSP);
+ mapping.put(MessageType.BEGIN_RECOVER_REQ,
Verb.ACCORD_BEGIN_RECOVER_REQ);
+ mapping.put(MessageType.BEGIN_RECOVER_RSP,
Verb.ACCORD_BEGIN_RECOVER_RSP);
mapping.put(MessageType.BEGIN_INVALIDATE_REQ,
Verb.ACCORD_BEGIN_INVALIDATE_REQ);
mapping.put(MessageType.BEGIN_INVALIDATE_RSP,
Verb.ACCORD_BEGIN_INVALIDATE_RSP);
- mapping.put(MessageType.WAIT_ON_COMMIT_REQ,
Verb.ACCORD_WAIT_COMMIT_REQ);
- mapping.put(MessageType.WAIT_ON_COMMIT_RSP,
Verb.ACCORD_WAIT_COMMIT_RSP);
- mapping.put(MessageType.INFORM_TXNID_REQ,
Verb.ACCORD_INFORM_OF_TXNID_REQ);
+ mapping.put(MessageType.WAIT_ON_COMMIT_REQ,
Verb.ACCORD_WAIT_ON_COMMIT_REQ);
+ mapping.put(MessageType.WAIT_ON_COMMIT_RSP,
Verb.ACCORD_WAIT_ON_COMMIT_RSP);
+ mapping.put(MessageType.INFORM_OF_TXN_REQ,
Verb.ACCORD_INFORM_OF_TXN_REQ);
mapping.put(MessageType.INFORM_HOME_DURABLE_REQ,Verb.ACCORD_INFORM_HOME_DURABLE_REQ);
mapping.put(MessageType.INFORM_DURABLE_REQ,
Verb.ACCORD_INFORM_DURABLE_REQ);
mapping.put(MessageType.CHECK_STATUS_REQ,
Verb.ACCORD_CHECK_STATUS_REQ);
diff --git
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index a1657bb38f..145bb64278 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -45,7 +45,7 @@ public class AccordMessageSinkTest
// There was an issue where the reply was the wrong verb
// see CASSANDRA-18375
InformOfTxnId info = Mockito.mock(InformOfTxnId.class);
- Message<InformOfTxnId> req =
Message.builder(Verb.ACCORD_INFORM_OF_TXNID_REQ, info).build();
+ Message<InformOfTxnId> req =
Message.builder(Verb.ACCORD_INFORM_OF_TXN_REQ, info).build();
SimpleReply reply = SimpleReply.Ok;
Messaging messaging = Mockito.mock(Messaging.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]