This is an automated email from the ASF dual-hosted git repository.

aleksey pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 2230fb55cd CEP-15: Extend Accord MessageType with a side effect flag
2230fb55cd is described below

commit 2230fb55cd0c0ee9c4d0732ab1a5ad2ffb28e15f
Author: Aleksey Yeschenko <[email protected]>
AuthorDate: Fri Jun 2 11:45:41 2023 +0100

    CEP-15: Extend Accord MessageType with a side effect flag
    
    patch by Aleksey Yeschenko; reviewed by Benedic Elliott Smith for
    CASSANDRA-18561
---
 modules/accord                                           |  2 +-
 src/java/org/apache/cassandra/net/Verb.java              | 14 +++++++-------
 .../apache/cassandra/service/accord/AccordJournal.java   | 10 +++++-----
 .../cassandra/service/accord/AccordMessageSink.java      | 16 ++++++++--------
 .../cassandra/service/accord/AccordMessageSinkTest.java  |  2 +-
 5 files changed, 22 insertions(+), 22 deletions(-)

diff --git a/modules/accord b/modules/accord
index 3d0ff07cd5..8830d97ba5 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 3d0ff07cd5c7db43390b85afa593e6f76471d886
+Subproject commit 8830d97ba517fb2d0f7f22e8e6b886a98839e694
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 781261e655..c9a678a2ca 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -229,8 +229,8 @@ public enum Verb
 
     // accord
     ACCORD_SIMPLE_RSP               (119, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> EnumSerializer.simpleReply,           RESPONSE_HANDLER                   
                                         ),
-    ACCORD_PREACCEPT_RSP            (121, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> PreacceptSerializers.reply,           RESPONSE_HANDLER                   
                                         ),
-    ACCORD_PREACCEPT_REQ            (120, P2, writeTimeout, IMMEDIATE,         
 () -> PreacceptSerializers.request,         () -> 
AccordService.instance().verbHandler(), ACCORD_PREACCEPT_RSP          ),
+    ACCORD_PRE_ACCEPT_RSP           (121, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> PreacceptSerializers.reply,           RESPONSE_HANDLER                   
                                         ),
+    ACCORD_PRE_ACCEPT_REQ           (120, P2, writeTimeout, IMMEDIATE,         
 () -> PreacceptSerializers.request,         () -> 
AccordService.instance().verbHandler(), ACCORD_PRE_ACCEPT_RSP         ),
     ACCORD_ACCEPT_RSP               (124, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> AcceptSerializers.reply,              RESPONSE_HANDLER                   
                                         ),
     ACCORD_ACCEPT_REQ               (122, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.request,            () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
     ACCORD_ACCEPT_INVALIDATE_REQ    (123, P2, writeTimeout, IMMEDIATE,         
 () -> AcceptSerializers.invalidate,         () -> 
AccordService.instance().verbHandler(), ACCORD_ACCEPT_RSP             ),
@@ -240,13 +240,13 @@ public enum Verb
     ACCORD_COMMIT_INVALIDATE_REQ    (126, P2, writeTimeout, IMMEDIATE,         
 () -> CommitSerializers.invalidate,         () -> 
AccordService.instance().verbHandler()                                ),
     ACCORD_APPLY_RSP                (130, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> ApplySerializers.reply,               RESPONSE_HANDLER                   
                                         ),
     ACCORD_APPLY_REQ                (129, P2, writeTimeout, IMMEDIATE,         
 () -> ApplySerializers.request,             () -> 
AccordService.instance().verbHandler(), ACCORD_APPLY_RSP              ),
-    ACCORD_RECOVER_RSP              (132, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> RecoverySerializers.reply,            RESPONSE_HANDLER                   
                                         ),
-    ACCORD_RECOVER_REQ              (131, P2, writeTimeout, IMMEDIATE,         
 () -> RecoverySerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_RECOVER_RSP            ),
+    ACCORD_BEGIN_RECOVER_RSP        (132, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> RecoverySerializers.reply,            RESPONSE_HANDLER                   
                                         ),
+    ACCORD_BEGIN_RECOVER_REQ        (131, P2, writeTimeout, IMMEDIATE,         
 () -> RecoverySerializers.request,          () -> 
AccordService.instance().verbHandler(), ACCORD_BEGIN_RECOVER_RSP      ),
     ACCORD_BEGIN_INVALIDATE_RSP     (134, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> BeginInvalidationSerializers.reply,   RESPONSE_HANDLER                   
                                         ),
     ACCORD_BEGIN_INVALIDATE_REQ     (133, P2, writeTimeout, IMMEDIATE,         
 () -> BeginInvalidationSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_BEGIN_INVALIDATE_RSP   ),
-    ACCORD_WAIT_COMMIT_RSP          (136, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> WaitOnCommitSerializer.reply,         RESPONSE_HANDLER                   
                                         ),
-    ACCORD_WAIT_COMMIT_REQ          (135, P2, writeTimeout, IMMEDIATE,         
 () -> WaitOnCommitSerializer.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_WAIT_COMMIT_RSP        ),
-    ACCORD_INFORM_OF_TXNID_REQ      (137, P2, writeTimeout, IMMEDIATE,         
 () -> InformOfTxnIdSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
+    ACCORD_WAIT_ON_COMMIT_RSP       (136, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> WaitOnCommitSerializer.reply,         RESPONSE_HANDLER                   
                                         ),
+    ACCORD_WAIT_ON_COMMIT_REQ       (135, P2, writeTimeout, IMMEDIATE,         
 () -> WaitOnCommitSerializer.request,       () -> 
AccordService.instance().verbHandler(), ACCORD_WAIT_ON_COMMIT_RSP     ),
+    ACCORD_INFORM_OF_TXN_REQ        (137, P2, writeTimeout, IMMEDIATE,         
 () -> InformOfTxnIdSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
     ACCORD_INFORM_HOME_DURABLE_REQ  (138, P2, writeTimeout, IMMEDIATE,         
 () -> InformHomeDurableSerializers.request, () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
     ACCORD_INFORM_DURABLE_REQ       (139, P2, writeTimeout, IMMEDIATE,         
 () -> InformDurableSerializers.request,     () -> 
AccordService.instance().verbHandler(), ACCORD_SIMPLE_RSP             ),
     ACCORD_CHECK_STATUS_RSP         (141, P2, writeTimeout, REQUEST_RESPONSE,  
 () -> CheckStatusSerializers.reply,         RESPONSE_HANDLER                   
                                         ),
diff --git a/src/java/org/apache/cassandra/service/accord/AccordJournal.java 
b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
index da50dd15da..87fb14e738 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordJournal.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordJournal.java
@@ -382,10 +382,10 @@ public class AccordJournal
      */
     public enum Type implements IVersionedSerializer<TxnRequest<?>>
     {
-        PREACCEPT_REQ (0, MessageType.PREACCEPT_REQ, 
PreacceptSerializers.request),
-        ACCEPT_REQ    (1, MessageType.ACCEPT_REQ,    AcceptSerializers.request 
  ),
-        COMMIT_REQ    (2, MessageType.COMMIT_REQ,    CommitSerializers.request 
  ),
-        APPLY_REQ     (3, MessageType.APPLY_REQ,     ApplySerializers.request  
  );
+        PREACCEPT_REQ (0, MessageType.PRE_ACCEPT_REQ, 
PreacceptSerializers.request),
+        ACCEPT_REQ    (1, MessageType.ACCEPT_REQ,     
AcceptSerializers.request   ),
+        COMMIT_REQ    (2, MessageType.COMMIT_REQ,     
CommitSerializers.request   ),
+        APPLY_REQ     (3, MessageType.APPLY_REQ,      ApplySerializers.request 
   );
 
         final int id;
         final MessageType msgType;
@@ -460,7 +460,7 @@ public class AccordJournal
 
         static boolean mustMakeDurable(TxnRequest<?> message)
         {
-            return msgTypeToTypeMap.containsKey(message.type());
+            return message.type().hasSideEffects;
         }
 
         @Override
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index ff83f70756..7bfcb82de5 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -55,24 +55,24 @@ public class AccordMessageSink implements MessageSink
 
         private VerbMapping()
         {
-            mapping.put(MessageType.PREACCEPT_REQ,          
Verb.ACCORD_PREACCEPT_REQ);
-            mapping.put(MessageType.PREACCEPT_RSP,          
Verb.ACCORD_PREACCEPT_RSP);
+            mapping.put(MessageType.PRE_ACCEPT_REQ,         
Verb.ACCORD_PRE_ACCEPT_REQ);
+            mapping.put(MessageType.PRE_ACCEPT_RSP,         
Verb.ACCORD_PRE_ACCEPT_RSP);
             mapping.put(MessageType.ACCEPT_REQ,             
Verb.ACCORD_ACCEPT_REQ);
             mapping.put(MessageType.ACCEPT_RSP,             
Verb.ACCORD_ACCEPT_RSP);
             mapping.put(MessageType.ACCEPT_INVALIDATE_REQ,  
Verb.ACCORD_ACCEPT_INVALIDATE_REQ);
             mapping.put(MessageType.COMMIT_REQ,             
Verb.ACCORD_COMMIT_REQ);
-            mapping.put(MessageType.COMMIT_INVALIDATE,      
Verb.ACCORD_COMMIT_INVALIDATE_REQ);
+            mapping.put(MessageType.COMMIT_INVALIDATE_REQ,  
Verb.ACCORD_COMMIT_INVALIDATE_REQ);
             mapping.put(MessageType.APPLY_REQ,              
Verb.ACCORD_APPLY_REQ);
             mapping.put(MessageType.APPLY_RSP,              
Verb.ACCORD_APPLY_RSP);
             mapping.put(MessageType.READ_REQ,               
Verb.ACCORD_READ_REQ);
             mapping.put(MessageType.READ_RSP,               
Verb.ACCORD_READ_RSP);
-            mapping.put(MessageType.BEGIN_RECOVER_REQ,      
Verb.ACCORD_RECOVER_REQ);
-            mapping.put(MessageType.BEGIN_RECOVER_RSP,      
Verb.ACCORD_RECOVER_RSP);
+            mapping.put(MessageType.BEGIN_RECOVER_REQ,      
Verb.ACCORD_BEGIN_RECOVER_REQ);
+            mapping.put(MessageType.BEGIN_RECOVER_RSP,      
Verb.ACCORD_BEGIN_RECOVER_RSP);
             mapping.put(MessageType.BEGIN_INVALIDATE_REQ,   
Verb.ACCORD_BEGIN_INVALIDATE_REQ);
             mapping.put(MessageType.BEGIN_INVALIDATE_RSP,   
Verb.ACCORD_BEGIN_INVALIDATE_RSP);
-            mapping.put(MessageType.WAIT_ON_COMMIT_REQ,     
Verb.ACCORD_WAIT_COMMIT_REQ);
-            mapping.put(MessageType.WAIT_ON_COMMIT_RSP,     
Verb.ACCORD_WAIT_COMMIT_RSP);
-            mapping.put(MessageType.INFORM_TXNID_REQ,       
Verb.ACCORD_INFORM_OF_TXNID_REQ);
+            mapping.put(MessageType.WAIT_ON_COMMIT_REQ,     
Verb.ACCORD_WAIT_ON_COMMIT_REQ);
+            mapping.put(MessageType.WAIT_ON_COMMIT_RSP,     
Verb.ACCORD_WAIT_ON_COMMIT_RSP);
+            mapping.put(MessageType.INFORM_OF_TXN_REQ,      
Verb.ACCORD_INFORM_OF_TXN_REQ);
             
mapping.put(MessageType.INFORM_HOME_DURABLE_REQ,Verb.ACCORD_INFORM_HOME_DURABLE_REQ);
             mapping.put(MessageType.INFORM_DURABLE_REQ,     
Verb.ACCORD_INFORM_DURABLE_REQ);
             mapping.put(MessageType.CHECK_STATUS_REQ,       
Verb.ACCORD_CHECK_STATUS_REQ);
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
index a1657bb38f..145bb64278 100644
--- a/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
+++ b/test/unit/org/apache/cassandra/service/accord/AccordMessageSinkTest.java
@@ -45,7 +45,7 @@ public class AccordMessageSinkTest
         // There was an issue where the reply was the wrong verb
         // see CASSANDRA-18375
         InformOfTxnId info = Mockito.mock(InformOfTxnId.class);
-        Message<InformOfTxnId> req = 
Message.builder(Verb.ACCORD_INFORM_OF_TXNID_REQ, info).build();
+        Message<InformOfTxnId> req = 
Message.builder(Verb.ACCORD_INFORM_OF_TXN_REQ, info).build();
         SimpleReply reply = SimpleReply.Ok;
 
         Messaging messaging = Mockito.mock(Messaging.class);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to