This is an automated email from the ASF dual-hosted git repository. bdeggleston pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new 612d7b1b5d CEP-15 (C*) increase message timeouts for range barrier messages 612d7b1b5d is described below commit 612d7b1b5da5bff4cb26fe5749a8d44b416611a4 Author: Blake Eggleston <bl...@ultrablake.com> AuthorDate: Mon Aug 26 15:56:22 2024 -0700 CEP-15 (C*) increase message timeouts for range barrier messages Patch by Blake Eggleston; Reviewed by Ariel Weisberg for CASSANDRA-19926 --- .../service/accord/AccordMessageSink.java | 31 +++++++++++++++++----- 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java index bf6c5ae06f..eda0f75b87 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java +++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java @@ -26,10 +26,13 @@ import java.util.List; import java.util.Map; import java.util.Set; +import accord.messages.*; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.utils.Clock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +40,6 @@ import accord.api.Agent; import accord.api.MessageSink; import accord.local.AgentExecutor; import accord.local.Node; -import accord.messages.Callback; -import accord.messages.MessageType; -import accord.messages.Reply; -import accord.messages.ReplyContext; -import accord.messages.Request; import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.net.Message; @@ -227,12 +225,33 @@ public class AccordMessageSink implements MessageSink messaging.send(message, endpoint); } + private static boolean isRangeBarrier(Request request) + { + if (!(request instanceof TxnRequest)) + return false; + + TxnRequest<?> txnRequest = (TxnRequest<?>) request; + if (!txnRequest.txnId.kind().isSyncPoint()) + return false; + + return txnRequest.txnId.domain().isRange(); + } + @Override public void send(Node.Id to, Request request, AgentExecutor executor, Callback callback) { Verb verb = getVerb(request); Preconditions.checkNotNull(verb, "Verb is null for type %s", request.type()); - Message<Request> message = Message.out(verb, request); + Message<Request> message; + if (isRangeBarrier(request)) + { + long nowNanos = Clock.Global.nanoTime(); + message = Message.out(verb, request, nowNanos + DatabaseDescriptor.getAccordRangeBarrierTimeoutNanos()); + } + else + { + message = Message.out(verb, request); + } InetAddressAndPort endpoint = endpointMapper.mappedEndpoint(to); logger.trace("Sending {} {} to {}", verb, message.payload, endpoint); messaging.sendWithCallback(message, endpoint, new AccordCallback<>(executor, (Callback<Reply>) callback, endpointMapper)); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org