This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new 612d7b1b5d CEP-15 (C*) increase message timeouts for range barrier 
messages
612d7b1b5d is described below

commit 612d7b1b5da5bff4cb26fe5749a8d44b416611a4
Author: Blake Eggleston <bl...@ultrablake.com>
AuthorDate: Mon Aug 26 15:56:22 2024 -0700

    CEP-15 (C*) increase message timeouts for range barrier messages
    
    Patch by Blake Eggleston; Reviewed by Ariel Weisberg for CASSANDRA-19926
---
 .../service/accord/AccordMessageSink.java          | 31 +++++++++++++++++-----
 1 file changed, 25 insertions(+), 6 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java 
b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
index bf6c5ae06f..eda0f75b87 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
@@ -26,10 +26,13 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import accord.messages.*;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.utils.Clock;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,11 +40,6 @@ import accord.api.Agent;
 import accord.api.MessageSink;
 import accord.local.AgentExecutor;
 import accord.local.Node;
-import accord.messages.Callback;
-import accord.messages.MessageType;
-import accord.messages.Reply;
-import accord.messages.ReplyContext;
-import accord.messages.Request;
 import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.Message;
@@ -227,12 +225,33 @@ public class AccordMessageSink implements MessageSink
         messaging.send(message, endpoint);
     }
 
+    private static boolean isRangeBarrier(Request request)
+    {
+        if (!(request instanceof TxnRequest))
+            return false;
+
+        TxnRequest<?> txnRequest = (TxnRequest<?>) request;
+        if (!txnRequest.txnId.kind().isSyncPoint())
+            return false;
+
+        return txnRequest.txnId.domain().isRange();
+    }
+
     @Override
     public void send(Node.Id to, Request request, AgentExecutor executor, 
Callback callback)
     {
         Verb verb = getVerb(request);
         Preconditions.checkNotNull(verb, "Verb is null for type %s", 
request.type());
-        Message<Request> message = Message.out(verb, request);
+        Message<Request> message;
+        if (isRangeBarrier(request))
+        {
+            long nowNanos = Clock.Global.nanoTime();
+            message = Message.out(verb, request, nowNanos + 
DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos());
+        }
+        else
+        {
+            message = Message.out(verb, request);
+        }
         InetAddressAndPort endpoint = endpointMapper.mappedEndpoint(to);
         logger.trace("Sending {} {} to {}", verb, message.payload, endpoint);
         messaging.sendWithCallback(message, endpoint, new 
AccordCallback<>(executor, (Callback<Reply>) callback, endpointMapper));


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to