This is an automated email from the ASF dual-hosted git repository.

bdeggleston pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 96d850aa9d90787aa8fc6668d9a826beb5d94e2d
Author: Abe Ratnofsky <a...@aber.io>
AuthorDate: Tue Mar 25 12:41:57 2025 -0400

    CEP-45: Query forwarding
    
    Patch by Abe Ratnofsky; Reviewed by Blake Eggleston for CASSANDRA-20309
---
 CHANGES.txt                                        |   1 +
 .../apache/cassandra/db/MutationVerbHandler.java   |  20 +-
 src/java/org/apache/cassandra/net/Message.java     |   7 +
 .../org/apache/cassandra/net/MessagingService.java |  10 +
 src/java/org/apache/cassandra/net/ParamType.java   |   5 +-
 .../org/apache/cassandra/net/RequestCallbacks.java |   6 +-
 src/java/org/apache/cassandra/net/Verb.java        |   2 +
 .../cassandra/replication/CoordinatorLog.java      |   3 +
 .../cassandra/replication/CoordinatorLogId.java    |  19 +
 .../cassandra/replication/ForwardedWrite.java      | 479 +++++++++++++++++++++
 .../cassandra/replication/LocalMutationStates.java |   1 +
 .../apache/cassandra/replication/MutationId.java   |   2 +-
 .../cassandra/replication/MutationSummary.java     |   1 +
 .../replication/MutationTrackingService.java       |   6 +
 .../org/apache/cassandra/replication/Shard.java    |   4 +-
 .../cassandra/replication/TrackedWriteRequest.java |  86 ++--
 .../service/TrackedWriteResponseHandler.java       |   3 +-
 .../apache/cassandra/tcm/membership/NodeId.java    |  22 +
 .../MutationTrackingWriteForwardingTest.java       | 117 +++++
 19 files changed, 756 insertions(+), 38 deletions(-)
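
In this patch, when the node a client is talking to does not replicate the mutation's token, it forwards the whole
write to a "leader" replica via the new FORWARDING_WRITE verb; the leader assigns the MutationId, applies the mutation
locally, and relays MUTATION_REQ to the remaining replicas with a COORDINATOR_ACK_INFO param, so every replica
acknowledges both the leader and the original coordinator. The toy program below sketches only that acknowledgement
pattern; its class and method names are invented for illustration and are not Cassandra APIs.

    // Toy model of the dual-acknowledgement flow; invented names, no networking or failure handling.
    import java.util.ArrayList;
    import java.util.List;

    class ForwardingFlowSketch
    {
        record Ack(String from, String to) {}

        static final List<Ack> acks = new ArrayList<>();

        // Mirrors the idea in MutationVerbHandler.respond(): ack the sender, and additionally
        // ack the coordinator named by the COORDINATOR_ACK_INFO param when it is present.
        static void applyOnReplica(String replica, String sender, String coordinatorAckInfo)
        {
            acks.add(new Ack(replica, sender));
            if (coordinatorAckInfo != null && !coordinatorAckInfo.equals(sender))
                acks.add(new Ack(replica, coordinatorAckInfo));
        }

        // Mirrors the idea in ForwardedWrite.MutationRequest.applyLocallyAndForwardToReplicas():
        // the leader assigns the id, applies locally (acking the coordinator directly), then
        // relays the write to the other replicas, carrying the coordinator's address along.
        static void applyOnLeader(String leader, String coordinator, List<String> replicas)
        {
            long mutationId = System.nanoTime(); // stand-in for MutationTrackingService.nextMutationId()
            acks.add(new Ack(leader, coordinator));
            for (String replica : replicas)
                if (!replica.equals(leader))
                    applyOnReplica(replica, leader, coordinator);
            System.out.println("leader " + leader + " assigned id " + mutationId);
        }

        public static void main(String[] args)
        {
            String coordinator = "node1";                      // not a replica for this token
            List<String> replicas = List.of("node2", "node3");
            String leader = replicas.get(0);                   // closest replica, per proximity sorting

            // The coordinator forwards the whole write to the leader and expects one ack per replica.
            applyOnLeader(leader, coordinator, replicas);

            long acked = acks.stream().filter(a -> a.to().equals(coordinator)).count();
            System.out.println("acks received by coordinator: " + acked + " of " + replicas.size());
        }
    }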

diff --git a/CHANGES.txt b/CHANGES.txt
index fb9f8fbeed..0bc461129a 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 cep-45-mutation-tracking
+ * CEP-45: Query forwarding (CASSANDRA-20309)
  * Fix mutation tracking startup (CASSANDRA-20540)
  * Mutation tracking journal integration, read, and write path (CASSANDRA-20304, CASSANDRA-20305, CASSANDRA-20308)
  * Introduce MutationJournal for coordinator logs (CASSANDRA-20353)
diff --git a/src/java/org/apache/cassandra/db/MutationVerbHandler.java 
b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
index c30fae63b4..96e3fa7e7e 100644
--- a/src/java/org/apache/cassandra/db/MutationVerbHandler.java
+++ b/src/java/org/apache/cassandra/db/MutationVerbHandler.java
@@ -17,9 +17,13 @@
  */
 package org.apache.cassandra.db;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.cassandra.exceptions.WriteTimeoutException;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.*;
+import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.tracing.Tracing;
 
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
@@ -28,12 +32,24 @@ import static org.apache.cassandra.utils.MonotonicClock.Global.approxTime;
 
 public class MutationVerbHandler extends AbstractMutationVerbHandler<Mutation>
 {
+    private static final Logger logger = LoggerFactory.getLogger(MutationVerbHandler.class);
+
     public static final MutationVerbHandler instance = new MutationVerbHandler();
 
-    private void respond(Message<?> respondTo, InetAddressAndPort respondToAddress)
+    private void respond(Message<?> incoming, InetAddressAndPort respondToAddress)
     {
+        // Local tracked writes respond in TrackedWriteResponseHandler
+        Message<NoPayload> response = incoming.emptyResponse();
         Tracing.trace("Enqueuing response to {}", respondToAddress);
-        MessagingService.instance().send(respondTo.emptyResponse(), respondToAddress);
+        logger.trace("Enqueuing response to {}", respondToAddress);
+        MessagingService.instance().send(response, respondToAddress);
+
+        ForwardedWrite.CoordinatorAckInfo ackTo = (ForwardedWrite.CoordinatorAckInfo) incoming.header.params().get(ParamType.COORDINATOR_ACK_INFO);
+        if (ackTo != null)
+        {
+            logger.trace("Enqueuing response for direct acknowledgement of forwarded tracked mutation to coordinator {}", ackTo.coordinator);
+            MessagingService.instance().send(response, ackTo.coordinator);
+        }
     }
 
     private void failed()
diff --git a/src/java/org/apache/cassandra/net/Message.java 
b/src/java/org/apache/cassandra/net/Message.java
index b6a5a849e5..fe041b816f 100644
--- a/src/java/org/apache/cassandra/net/Message.java
+++ b/src/java/org/apache/cassandra/net/Message.java
@@ -731,6 +731,13 @@ public class Message<T>
             return this;
         }
 
+        public Builder<T> withRequestTime(Dispatcher.RequestTime requestTime)
+        {
+            this.createdAtNanos = requestTime.startedAtNanos();
+            this.expiresAtNanos = 
requestTime.computeDeadline(verb.expiresAfterNanos());
+            return this;
+        }
+
         public Message<T> build()
         {
             if (verb == null)
diff --git a/src/java/org/apache/cassandra/net/MessagingService.java 
b/src/java/org/apache/cassandra/net/MessagingService.java
index 97a019642a..7ddada9e0a 100644
--- a/src/java/org/apache/cassandra/net/MessagingService.java
+++ b/src/java/org/apache/cassandra/net/MessagingService.java
@@ -30,6 +30,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +44,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.MessagingMetrics;
+import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.utils.ExecutorUtils;
 import org.apache.cassandra.utils.FBUtilities;
@@ -449,6 +451,14 @@ public class MessagingService extends MessagingServiceMBeanImpl implements Messa
         send(message, to.endpoint(), null);
     }
 
+    public void sendForwardedWriteWithCallback(Message message, Replica to, ForwardedWrite.LeaderCallback handler)
+    {
+        Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ);
+        Preconditions.checkArgument(message.callBackOnFailure());
+        callbacks.addWithExpiration(handler, message, to.endpoint());
+        send(message, to.endpoint(), null);
+    }
+
     /**
      * Send a message to a given endpoint. This method adheres to the fire and 
forget
      * style messaging.
diff --git a/src/java/org/apache/cassandra/net/ParamType.java 
b/src/java/org/apache/cassandra/net/ParamType.java
index 2367b1a390..e01a3ac869 100644
--- a/src/java/org/apache/cassandra/net/ParamType.java
+++ b/src/java/org/apache/cassandra/net/ParamType.java
@@ -20,6 +20,7 @@ package org.apache.cassandra.net;
 import javax.annotation.Nullable;
 
 import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.utils.Int32Serializer;
 import org.apache.cassandra.utils.Int64Serializer;
@@ -54,7 +55,9 @@ public enum ParamType
     ROW_INDEX_READ_SIZE_WARN         (13, Int64Serializer.serializer),
     CUSTOM_MAP                       (14, CustomParamsSerializer.serializer),
     TOO_MANY_REFERENCED_INDEXES_WARN (16, Int32Serializer.serializer),
-    TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer);
+    TOO_MANY_REFERENCED_INDEXES_FAIL (17, Int32Serializer.serializer),
+    // Different from RESPOND_TO because it's an additional recipient of the acknowledgement
+    COORDINATOR_ACK_INFO             (18, ForwardedWrite.CoordinatorAckInfo.serializer);
 
     final int id;
     final IVersionedSerializer serializer;
diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java 
b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index ee63c5a3e6..4a46b73b5a 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -25,6 +25,7 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.slf4j.Logger;
@@ -35,6 +36,7 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.InternodeOutboundMetrics;
+import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.service.AbstractWriteResponseHandler;
 
 import static java.lang.String.format;
@@ -96,14 +98,14 @@ public class RequestCallbacks implements OutboundMessageCallbacks
     public void addWithExpiration(RequestCallback<?> cb, Message<?> message, InetAddressAndPort to)
     {
         // mutations need to call the overload
-        assert message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ;
+        Preconditions.checkArgument((message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ) || (cb instanceof ForwardedWrite.LeaderCallback));
        CallbackInfo previous = callbacks.put(key(message.id(), to), new CallbackInfo(message, to, cb));
         assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to, previous);
     }
 
     public void addWithExpiration(AbstractWriteResponseHandler<?> cb, Message<?> message, Replica to)
     {
-        assert message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ;
+        Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ || message.verb() == Verb.FORWARDING_WRITE);
         CallbackInfo previous = callbacks.put(key(message.id(), to.endpoint()), new CallbackInfo(message, to.endpoint(), cb));
         assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to.endpoint(), previous);
     }
diff --git a/src/java/org/apache/cassandra/net/Verb.java 
b/src/java/org/apache/cassandra/net/Verb.java
index 7dcfc5b473..032270b456 100644
--- a/src/java/org/apache/cassandra/net/Verb.java
+++ b/src/java/org/apache/cassandra/net/Verb.java
@@ -72,6 +72,7 @@ import org.apache.cassandra.repair.messages.SyncResponse;
 import org.apache.cassandra.repair.messages.SyncRequest;
 import org.apache.cassandra.repair.messages.ValidationResponse;
 import org.apache.cassandra.repair.messages.ValidationRequest;
+import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.schema.SchemaMutationsSerializer;
 import org.apache.cassandra.schema.SchemaPullVerbHandler;
 import org.apache.cassandra.schema.SchemaPushVerbHandler;
@@ -247,6 +248,7 @@ public enum Verb
     READ_RECONCILE_SEND        (901, P0, rpcTimeout,   READ,             () -> ReadReconcileSend.serializer,          () -> ReadReconcileSend.verbHandler                           ),
     READ_RECONCILE_RCV         (902, P0, rpcTimeout,   MUTATION,         () -> ReadReconcileReceive.serializer,       () -> ReadReconcileReceive.verbHandler                        ),
     READ_RECONCILE_NOTIFY      (903, P0, rpcTimeout,   REQUEST_RESPONSE, () -> ReadReconcileNotify.serializer,        () -> ReadReconcileNotify.verbHandler                         ),
+    FORWARDING_WRITE           (904, P3, writeTimeout, MUTATION,         () -> ForwardedWrite.Request.serializer,     () -> ForwardedWrite.verbHandler),
 
     TRACKED_PARTITION_READ_RSP (906, P2, readTimeout,  REQUEST_RESPONSE, () -> TrackedDataResponse.serializer,        () -> ResponseVerbHandler.instance                            ),
     TRACKED_PARTITION_READ_REQ (907, P3, readTimeout,  READ,             () -> TrackedRead.DataRequest.serializer,    () -> TrackedRead.verbHandler,           TRACKED_PARTITION_READ_RSP),
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLog.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
index 0abd0db93e..74712fd968 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLog.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLog.java
@@ -21,6 +21,8 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import com.google.common.base.Preconditions;
+
 import org.apache.cassandra.db.Mutation;
 import org.apache.cassandra.db.PartitionPosition;
 import org.apache.cassandra.dht.AbstractBounds;
@@ -72,6 +74,7 @@ public abstract class CoordinatorLog
 
     void witnessedRemoteMutation(MutationId mutationId, int onHostId)
     {
+        Preconditions.checkArgument(!mutationId.isNone());
         logger.trace("witnessed remote mutation {} from {}", mutationId, 
onHostId);
         lock.writeLock().lock();
         try
diff --git a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java 
b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
index 55c38485b0..37d4f25851 100644
--- a/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
+++ b/src/java/org/apache/cassandra/replication/CoordinatorLogId.java
@@ -28,6 +28,8 @@ import java.util.Comparator;
 
 public class CoordinatorLogId implements Serializable
 {
+    private static final CoordinatorLogId NONE = new 
CoordinatorLogId(Integer.MIN_VALUE, Integer.MIN_VALUE);
+
     /** TCM host ID */
     protected final int hostId;
 
@@ -79,6 +81,21 @@ public class CoordinatorLogId implements Serializable
         return (int) coordinatorLogId;
     }
 
+    public static CoordinatorLogId none()
+    {
+        return NONE;
+    }
+
+    static boolean isNone(int hostId, int hostLogId)
+    {
+        return hostId == NONE.hostId && hostLogId == NONE.hostLogId;
+    }
+
+    public boolean isNone()
+    {
+        return this == NONE || isNone(hostId, hostLogId);
+    }
+
     @Override
     public String toString()
     {
@@ -115,6 +132,8 @@ public class CoordinatorLogId implements Serializable
         {
             int hostId = in.readInt();
             int hostLogId = in.readInt();
+            if (isNone(hostId, hostLogId))
+                return none();
             return new CoordinatorLogId(hostId, hostLogId);
         }
 
diff --git a/src/java/org/apache/cassandra/replication/ForwardedWrite.java 
b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
new file mode 100644
index 0000000000..51a072a423
--- /dev/null
+++ b/src/java/org/apache/cassandra/replication/ForwardedWrite.java
@@ -0,0 +1,479 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
+import org.apache.cassandra.db.*;
+import org.apache.cassandra.locator.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.exceptions.RequestFailureReason;
+import org.apache.cassandra.io.IVersionedSerializer;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.net.IVerbHandler;
+import org.apache.cassandra.net.Message;
+import org.apache.cassandra.net.MessageFlag;
+import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.net.RequestCallback;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.transport.Dispatcher;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static org.apache.cassandra.net.Verb.MUTATION_REQ;
+
+/**
+ * For a forwarded write, two nodes are involved in coordination: a coordinator and a leader. The coordinator is the
+ * node that the client is communicating with, and the leader is the mutation replica that handles mutation tracking
+ * for that write.
+ */
+public class ForwardedWrite
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(ForwardedWrite.class);
+
+    public interface Request
+    {
+        enum Kind
+        {
+            MUTATION(0);
+
+            private final byte id;
+
+            Kind(int id)
+            {
+                this.id = (byte) id;
+            }
+
+            IVersionedSerializer<Request> serializer()
+            {
+                switch (this)
+                {
+                    case MUTATION:
+                        return MutationRequest.serializer;
+                    default:
+                        throw new IllegalStateException("Unhandled kind " + 
this);
+                }
+            }
+
+            static final IVersionedSerializer<Kind> serializer = new 
IVersionedSerializer<Request.Kind>()
+            {
+                @Override
+                public void serialize(Kind kind, DataOutputPlus out, int 
version) throws IOException
+                {
+                    out.writeByte(kind.id);
+
+                }
+
+                @Override
+                public Kind deserialize(DataInputPlus in, int version) throws 
IOException
+                {
+                    byte id = in.readByte();
+                    switch (id)
+                    {
+                        case 0:
+                            return MUTATION;
+                        default:
+                            throw new IllegalStateException("Unknown kind: " + 
id);
+                    }
+                }
+
+                @Override
+                public long serializedSize(Kind kind, int version)
+                {
+                    return TypeSizes.BYTE_SIZE;
+                }
+            };
+        }
+
+        Kind kind();
+        DecoratedKey key();
+        void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo);
+
+        IVersionedSerializer<Request> serializer = new IVersionedSerializer<>()
+        {
+            @Override
+            public void serialize(Request request, DataOutputPlus out, int 
version) throws IOException
+            {
+                Kind.serializer.serialize(request.kind(), out, version);
+                request.kind().serializer().serialize(request, out, version);
+            }
+
+            @Override
+            public Request deserialize(DataInputPlus in, int version) throws 
IOException
+            {
+                Kind kind = Kind.serializer.deserialize(in, version);
+                return kind.serializer().deserialize(in, version);
+            }
+
+            @Override
+            public long serializedSize(Request request, int version)
+            {
+                long size = Kind.serializer.serializedSize(request.kind(), 
version);
+                size += request.kind().serializer().serializedSize(request, 
version);
+                return size;
+            }
+        };
+    }
+
+    public static class MutationRequest implements Request
+    {
+        private final Mutation mutation;
+        private final Set<NodeId> recipients;
+
+        private static Set<NodeId> nodeIds(ReplicaPlan.ForWrite plan)
+        {
+            ClusterMetadata cm = ClusterMetadata.current();
+            Set<NodeId> recipients = new HashSet<>(plan.liveAndDown().size());
+            for (Replica replica : plan.liveAndDown())
+                recipients.add(cm.directory.peerId(replica.endpoint()));
+            return recipients;
+        }
+
+        MutationRequest(Mutation mutation, ReplicaPlan.ForWrite plan)
+        {
+            this(mutation, nodeIds(plan));
+        }
+
+        public MutationRequest(Mutation mutation, Set<NodeId> recipients)
+        {
+            Preconditions.checkArgument(mutation.id().isNone());
+            this.mutation = mutation;
+            this.recipients = recipients;
+        }
+
+        @Override
+        public Kind kind()
+        {
+            return Kind.MUTATION;
+        }
+
+        @Override
+        public DecoratedKey key()
+        {
+            return mutation.key();
+        }
+
+        @Override
+        public void applyLocallyAndForwardToReplicas(CoordinatorAckInfo ackTo)
+        {
+            Preconditions.checkState(ackTo != null);
+            Preconditions.checkArgument(mutation.id().isNone());
+            String keyspaceName = mutation.getKeyspaceName();
+            Token token = mutation.key().getToken();
+
+            MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+            // Do not wait for handler completion, since the coordinator is already waiting and we don't want to block the stage
+            LeaderCallback handler = new LeaderCallback(keyspaceName, mutation.key().getToken(), id, ackTo);
+            applyLocallyAndForwardToReplicas(mutation.withMutationId(id), recipients, handler, ackTo);
+        }
+
+        // TODO: refactor to share common logic with applyLocallyAndSendToReplicas
+        private static void applyLocallyAndForwardToReplicas(Mutation mutation, Set<NodeId> recipients, LeaderCallback handler, CoordinatorAckInfo ackTo)
+        {
+            Preconditions.checkState(ackTo != null);
+            ClusterMetadata cm = ClusterMetadata.current();
+            String localDataCenter = cm.locator.local().datacenter;
+
+            boolean applyLocally = false;
+
+            // replicas in the local DC
+            List<Replica> localDCReplicas = null;
+
+            // replicas in remote DCs, grouped by DC
+            Map<String, List<Replica>> remoteDCReplicas = null;
+
+            // only need to create a Message for non-local writes
+            Message<Mutation> message = null;
+
+            // Expensive, but easier to work with Replica than 
InetAddressAndPort for now
+            Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+            EndpointsForToken endpoints = 
cm.placements.get(keyspace.getMetadata().params.replication).writes.forToken(mutation.key().getToken()).get();
+            Map<NodeId, Replica> replicas = new HashMap<>(recipients.size());
+            for (Replica replica : endpoints)
+                replicas.put(cm.directory.peerId(replica.endpoint()), replica);
+
+            // For performance, Mutation caches serialized buffers that are computed lazily in serializedBuffer(). That
+            // computation is not synchronized, however, and we would potentially call that method concurrently for each
+            // dispatched message (not that concurrent calls to serializedBuffer() are "unsafe" per se, just that they
+            // may result in multiple computations, making the caching optimization moot). So we force the serialization
+            // here to make sure it's already cached/computed by the time it's used concurrently later.
+            // Side note: we keep one cached buffer for each used EncodingVersion, and this only pre-computes the one for
+            // the current version, but it's just an optimization and we're ok not optimizing for mixed-version clusters.
+            Mutation.serializer.prepareSerializedBuffer(mutation, MessagingService.current_version);
+
+            for (NodeId recipient : recipients)
+            {
+                if (cm.myNodeId().equals(recipient))
+                {
+                    applyLocally = true;
+                    continue;
+                }
+
+                if (message == null)
+                    message = Message.builder(MUTATION_REQ, mutation)
+                                     .withRequestTime(handler.getRequestTime())
+                                     
.withFlag(MessageFlag.CALL_BACK_ON_FAILURE)
+                                     
.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo)
+                                     .withId(ackTo.id)
+                                     .build();
+
+                Replica replica = replicas.get(recipient);
+                String dc = cm.locator.location(replica.endpoint()).datacenter;
+
+                if (localDataCenter.equals(dc))
+                {
+                    if (localDCReplicas == null)
+                        localDCReplicas = new ArrayList<>();
+                    localDCReplicas.add(replica);
+                }
+                else
+                {
+                    if (remoteDCReplicas == null)
+                        remoteDCReplicas = new HashMap<>();
+
+                    List<Replica> messages = remoteDCReplicas.get(dc);
+                    if (messages == null)
+                        messages = remoteDCReplicas.computeIfAbsent(dc, ignore 
-> new ArrayList<>(3)); // most DCs will have <= 3 replicas
+                    messages.add(replica);
+                }
+            }
+
+            Preconditions.checkState(applyLocally); // the leader is always a 
replica
+            TrackedWriteRequest.applyMutationLocally(mutation, handler);
+
+            if (localDCReplicas != null)
+                for (Replica replica : localDCReplicas)
+                    MessagingService.instance().sendWithCallback(message, 
replica.endpoint(), handler);
+
+            if (remoteDCReplicas != null)
+            {
+                // for each datacenter, send the message to one node to relay 
the write to other replicas
+                for (List<Replica> dcReplicas : remoteDCReplicas.values())
+                    TrackedWriteRequest.sendMessagesToRemoteDC(message, 
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, 
ackTo);
+            }
+        }
+
+        public static final IVersionedSerializer<Request> serializer = new 
IVersionedSerializer<>()
+        {
+            @Override
+            public void serialize(Request r, DataOutputPlus out, int version) 
throws IOException
+            {
+                MutationRequest request = (MutationRequest) r;
+                Mutation.serializer.serialize(request.mutation, out, version);
+                out.writeInt(request.recipients.size());
+                for (NodeId recipient : request.recipients)
+                    NodeId.messagingSerializer.serialize(recipient, out, 
version);
+            }
+
+            @Override
+            public Request deserialize(DataInputPlus in, int version) throws 
IOException
+            {
+                Mutation mutation = Mutation.serializer.deserialize(in, 
version);
+                int numRecipients = in.readInt();
+                Set<NodeId> recipients = 
Sets.newHashSetWithExpectedSize(numRecipients);
+                for (int i = 0; i < numRecipients; i++)
+                    recipients.add(NodeId.messagingSerializer.deserialize(in, 
version));
+                return new MutationRequest(mutation, recipients);
+            }
+
+            @Override
+            public long serializedSize(Request r, int version)
+            {
+                MutationRequest request = (MutationRequest) r;
+                long size = 
Mutation.serializer.serializedSize(request.mutation, version);
+                size += TypeSizes.INT_SIZE;
+                for (NodeId recipient : request.recipients)
+                    size += 
NodeId.messagingSerializer.serializedSize(recipient, version);
+                return size;
+            }
+        };
+    }
+
+    public static AbstractWriteResponseHandler<Object> 
forwardMutation(Mutation mutation, ReplicaPlan.ForWrite plan, 
AbstractReplicationStrategy strategy, Dispatcher.RequestTime requestTime)
+    {
+        // find leader
+        NodeProximity proximity = DatabaseDescriptor.getNodeProximity();
+        ClusterMetadata cm = ClusterMetadata.current();
+        Token token = mutation.key().getToken();
+        Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
+        EndpointsForRange endpoints = 
cm.placements.get(keyspace.getMetadata().params.replication).writes.forRange(token).get();
+        if (logger.isTraceEnabled())
+            logger.trace("Finding best leader from replicas {}", endpoints);
+
+        // TODO: Should match ReplicaPlans.findCounterLeaderReplica, including 
DC-local priority, current health, severity, etc.
+        Replica leader = 
proximity.sortedByProximity(FBUtilities.getBroadcastAddressAndPort(), 
endpoints).get(0);
+
+        // create callback and forward to leader
+        if (logger.isTraceEnabled())
+            logger.trace("Selected {} as leader for mutation with key {}", 
leader.endpoint(), mutation.key());
+
+        AbstractWriteResponseHandler<Object> handler = 
strategy.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, 
requestTime);
+
+        // Add callbacks for replicas to respond directly to coordinator
+        Message<Request> toLeader = Message.out(Verb.FORWARDING_WRITE, new 
MutationRequest(mutation, plan));
+        for (Replica endpoint : endpoints)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Adding forwarding callback for response from {} 
id {}", endpoint, toLeader.id());
+            MessagingService.instance().callbacks.addWithExpiration(handler, 
toLeader, endpoint);
+        }
+
+        MessagingService.instance().send(toLeader, leader.endpoint());
+
+        return handler;
+    }
+
+    public static final IVerbHandler<Request> verbHandler = new 
IVerbHandler<>()
+    {
+        @Override
+        public void doVerb(Message<Request> incoming)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Received incoming ForwardedWriteRequest {} id 
{}", incoming, incoming.id());
+            CoordinatorAckInfo ackTo = 
CoordinatorAckInfo.toCoordinator(incoming.from(), incoming.id());
+            Request request = incoming.payload;
+
+            // Once we support epoch changes, check epoch from coordinator 
here, after potential queueing on the Stage
+            try
+            {
+                request.applyLocallyAndForwardToReplicas(ackTo);
+            }
+            catch (Exception e)
+            {
+                logger.error("Exception while executing forwarded write with 
key {} on leader", request.key(), e);
+                
MessagingService.instance().respondWithFailure(RequestFailureReason.UNKNOWN, 
incoming);
+            }
+        }
+    };
+
+    // The leader only needs to acknowledge propagation for its own log, not for the client consistency level.
+    // See org.apache.cassandra.service.TrackedWriteResponseHandler.onResponse; this class should probably be merged with that one.
+    public static class LeaderCallback implements RequestCallback<NoPayload>
+    {
+        private final String keyspace;
+        private final Token token;
+        private final MutationId id;
+        private final CoordinatorAckInfo ackTo;
+        private final Dispatcher.RequestTime requestTime = 
Dispatcher.RequestTime.forImmediateExecution();
+
+        public LeaderCallback(String keyspace, Token token, MutationId id, 
CoordinatorAckInfo ackTo)
+        {
+            this.keyspace = keyspace;
+            this.token = token;
+            this.id = id;
+            this.ackTo = ackTo;
+        }
+
+        @Override
+        public void onResponse(Message<NoPayload> msg)
+        {
+            // Local mutations are witnessed from Keyspace.applyInternalTracked
+            if (msg != null)
+                
MutationTrackingService.instance.witnessedRemoteMutation(keyspace, token, id, 
msg.from());
+
+            // Local write needs to be ack'd to coordinator
+            if (msg == null && ackTo != null)
+            {
+                Message<NoPayload> message = 
Message.builder(Verb.MUTATION_RSP, NoPayload.noPayload)
+                                                    
.from(FBUtilities.getBroadcastAddressAndPort())
+                                                    .withId(ackTo.id)
+                                                    .build();
+                MessagingService.instance().send(message, ackTo.coordinator);
+            }
+        }
+
+        @Override
+        public void onFailure(InetAddressAndPort from, RequestFailureReason 
failureReason)
+        {
+            logger.error("Got failure from {} reason {}", from, failureReason);
+        }
+
+        @Override
+        public boolean invokeOnFailure()
+        {
+            return true;
+        }
+
+        public Dispatcher.RequestTime getRequestTime()
+        {
+            return requestTime;
+        }
+    }
+
+    public static class CoordinatorAckInfo
+    {
+        public static IVersionedSerializer<CoordinatorAckInfo> serializer = 
new IVersionedSerializer<>()
+        {
+            @Override
+            public void serialize(CoordinatorAckInfo ackTo, DataOutputPlus 
out, int version) throws IOException
+            {
+                
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(ackTo.coordinator,
 out, version);
+                out.writeLong(ackTo.id);
+            }
+
+            @Override
+            public CoordinatorAckInfo deserialize(DataInputPlus in, int 
version) throws IOException
+            {
+                InetAddressAndPort coordinator = 
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.deserialize(in, 
version);
+                long id = in.readLong();
+                return new CoordinatorAckInfo(coordinator, id);
+            }
+
+            @Override
+            public long serializedSize(CoordinatorAckInfo ackTo, int version)
+            {
+                long size = 0;
+                size += 
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(ackTo.coordinator,
 version);
+                size += TypeSizes.LONG_SIZE;
+                return size;
+            }
+        };
+
+        public final InetAddressAndPort coordinator;
+        public final long id;
+
+        private CoordinatorAckInfo(InetAddressAndPort coordinator, long id)
+        {
+            this.coordinator = coordinator;
+            this.id = id;
+        }
+
+        private static CoordinatorAckInfo toCoordinator(InetAddressAndPort 
coordinator, long messageId)
+        {
+            return new CoordinatorAckInfo(coordinator, messageId);
+        }
+    }
+}
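
The Request serializer above writes a one-byte Kind tag and then delegates to the kind-specific serializer, so new
request kinds can be added later without changing the envelope. Below is a self-contained sketch of that dispatch
pattern; it uses plain java.io streams and invented types rather than Cassandra's IVersionedSerializer, which also
threads a messaging version through every call.

    import java.io.*;

    class KindDispatchSketch
    {
        interface Request
        {
            byte kind();
            void writeBody(DataOutput out) throws IOException;
        }

        // Kind 0, loosely analogous to ForwardedWrite.MutationRequest
        record Mutation(String payload) implements Request
        {
            public byte kind() { return 0; }
            public void writeBody(DataOutput out) throws IOException { out.writeUTF(payload); }
            static Mutation readBody(DataInput in) throws IOException { return new Mutation(in.readUTF()); }
        }

        static void serialize(Request r, DataOutput out) throws IOException
        {
            out.writeByte(r.kind()); // tag byte first...
            r.writeBody(out);        // ...then the kind-specific body
        }

        static Request deserialize(DataInput in) throws IOException
        {
            byte kind = in.readByte();
            switch (kind)
            {
                case 0: return Mutation.readBody(in);
                default: throw new IllegalStateException("Unknown kind: " + kind);
            }
        }

        public static void main(String[] args) throws IOException
        {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            serialize(new Mutation("INSERT ..."), new DataOutputStream(bytes));
            Request back = deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(back); // Mutation[payload=INSERT ...]
        }
    }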
diff --git a/src/java/org/apache/cassandra/replication/LocalMutationStates.java 
b/src/java/org/apache/cassandra/replication/LocalMutationStates.java
index 9bb1b4d834..ce1f13dd98 100644
--- a/src/java/org/apache/cassandra/replication/LocalMutationStates.java
+++ b/src/java/org/apache/cassandra/replication/LocalMutationStates.java
@@ -124,6 +124,7 @@ class LocalMutationStates
 
     void finishWriting(Mutation mutation)
     {
+        Preconditions.checkArgument(!mutation.id().isNone());
         Entry entry = statesMap.get(mutation.id().offset());
         Preconditions.checkNotNull(entry);
         entry.visibility = Visibility.VISIBLE;
diff --git a/src/java/org/apache/cassandra/replication/MutationId.java 
b/src/java/org/apache/cassandra/replication/MutationId.java
index ea33708833..5faf4db7d5 100644
--- a/src/java/org/apache/cassandra/replication/MutationId.java
+++ b/src/java/org/apache/cassandra/replication/MutationId.java
@@ -33,7 +33,7 @@ import org.apache.cassandra.io.util.DataOutputPlus;
  */
 public class MutationId extends ShortMutationId
 {
-    private static final long NONE_LOG_ID = Long.MIN_VALUE;
+    private static final long NONE_LOG_ID = CoordinatorLogId.none().asLong();
     private static final long NONE_SEQUENCE_ID = Long.MIN_VALUE;
     private static final int NONE_OFFSET = offset(NONE_SEQUENCE_ID);
     private static final int NONE_TIMESTAMP = timestamp(NONE_SEQUENCE_ID);
diff --git a/src/java/org/apache/cassandra/replication/MutationSummary.java 
b/src/java/org/apache/cassandra/replication/MutationSummary.java
index 3483f710ec..53a18d1046 100644
--- a/src/java/org/apache/cassandra/replication/MutationSummary.java
+++ b/src/java/org/apache/cassandra/replication/MutationSummary.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.*;
 
 import com.google.common.base.Preconditions;
+
 import org.agrona.collections.Long2ObjectHashMap;
 import org.apache.cassandra.db.Digest;
 import org.apache.cassandra.db.TypeSizes;
diff --git 
a/src/java/org/apache/cassandra/replication/MutationTrackingService.java 
b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
index 0c48c3e498..3e48840531 100644
--- a/src/java/org/apache/cassandra/replication/MutationTrackingService.java
+++ b/src/java/org/apache/cassandra/replication/MutationTrackingService.java
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 import java.util.function.IntSupplier;
 
+import com.google.common.base.Preconditions;
+
 import org.agrona.collections.IntArrayList;
 import org.apache.cassandra.db.DecoratedKey;
 import org.apache.cassandra.db.Mutation;
@@ -92,16 +94,19 @@ public class MutationTrackingService
 
     public void witnessedRemoteMutation(String keyspace, Token token, 
MutationId mutationId, InetAddressAndPort onHost)
     {
+        Preconditions.checkArgument(!mutationId.isNone());
         getOrCreate(keyspace).witnessedRemoteMutation(token, mutationId, 
onHost);
     }
 
     public void startWriting(Mutation mutation)
     {
+        Preconditions.checkArgument(!mutation.id().isNone());
         getOrCreate(mutation.getKeyspaceName()).startWriting(mutation);
     }
 
     public void finishWriting(Mutation mutation)
     {
+        Preconditions.checkArgument(!mutation.id().isNone());
         getOrCreate(mutation.getKeyspaceName()).finishWriting(mutation);
     }
 
@@ -153,6 +158,7 @@ public class MutationTrackingService
 
         static KeyspaceShards make(KeyspaceMetadata keyspace, ClusterMetadata 
cluster, IntSupplier logIdProvider)
         {
+            
Preconditions.checkArgument(keyspace.params.replicationType.isTracked());
             Map<Range<Token>, Shard> shards = new HashMap<>();
             
cluster.placements.get(keyspace.params.replication).writes.forEach((tokenRange, 
forRange) -> {
                IntArrayList participants = new IntArrayList(forRange.size(), 
IntArrayList.DEFAULT_NULL_VALUE);
diff --git a/src/java/org/apache/cassandra/replication/Shard.java 
b/src/java/org/apache/cassandra/replication/Shard.java
index 538e9a57c0..72ff1119ca 100644
--- a/src/java/org/apache/cassandra/replication/Shard.java
+++ b/src/java/org/apache/cassandra/replication/Shard.java
@@ -52,7 +52,9 @@ public class Shard
         this.sinceEpoch = sinceEpoch;
         this.logs = new NonBlockingHashMapLong<>();
         this.currentLocalLog = startNewLog(localHostId, 
logIdProvider.getAsInt(), participants);
-        logs.put(currentLocalLog.logId.asLong(), currentLocalLog);
+        CoordinatorLogId logId = currentLocalLog.logId;
+        Preconditions.checkArgument(!logId.isNone());
+        logs.put(logId.asLong(), currentLocalLog);
     }
 
     MutationId nextId()
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java 
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index 5ff3acb34e..7ae7b35e77 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -49,14 +49,17 @@ import org.apache.cassandra.net.ForwardingInfo;
 import org.apache.cassandra.net.Message;
 import org.apache.cassandra.net.MessageFlag;
 import org.apache.cassandra.net.MessagingService;
+import org.apache.cassandra.net.NoPayload;
+import org.apache.cassandra.net.ParamType;
+import org.apache.cassandra.net.RequestCallback;
 import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.service.AbstractWriteResponseHandler;
 import org.apache.cassandra.service.TrackedWriteResponseHandler;
 import org.apache.cassandra.tracing.Tracing;
 import org.apache.cassandra.transport.Dispatcher;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.MonotonicClock;
 
-import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 import static 
org.apache.cassandra.metrics.ClientRequestsMetricsHolder.writeMetrics;
 import static org.apache.cassandra.net.Verb.MUTATION_REQ;
@@ -72,34 +75,37 @@ public class TrackedWriteRequest
      * @param consistencyLevel the consistency level for the write operation
      * @param requestTime object holding times when request got enqueued and started execution
      */
-    public static TrackedWriteResponseHandler perform(
+    public static AbstractWriteResponseHandler<?> perform(
         Mutation mutation, ConsistencyLevel consistencyLevel, Dispatcher.RequestTime requestTime)
     {
         Tracing.trace("Determining replicas for mutation");
 
+        Preconditions.checkArgument(mutation.id().isNone());
         String keyspaceName = mutation.getKeyspaceName();
         Keyspace keyspace = Keyspace.open(keyspaceName);
         Token token = mutation.key().getToken();
 
-        MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
-        mutation = mutation.withMutationId(id);
-
         ReplicaPlan.ForWrite plan = ReplicaPlans.forWrite(keyspace, consistencyLevel, token, ReplicaPlans.writeNormal);
+        AbstractReplicationStrategy rs = plan.replicationStrategy();
 
-        if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) != null)
-            writeMetrics.localRequests.mark();
-        else
+        if (plan.lookup(FBUtilities.getBroadcastAddressAndPort()) == null)
+        {
+            if (logger.isTraceEnabled())
+                logger.trace("Remote tracked request {} {}", mutation, plan);
             writeMetrics.remoteRequests.mark();
+            return ForwardedWrite.forwardMutation(mutation, plan, rs, requestTime);
+        }
 
-        AbstractReplicationStrategy rs = plan.replicationStrategy();
-
-        TrackedWriteResponseHandler handler =
-            TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime),
-                                             keyspaceName,
-                                             mutation.key().getToken(),
-                                             id);
+        if (logger.isTraceEnabled())
+            logger.trace("Local tracked request {} {}", mutation, plan);
+        writeMetrics.localRequests.mark();
+        MutationId id = MutationTrackingService.instance.nextMutationId(keyspaceName, token);
+        mutation = mutation.withMutationId(id);
+        TrackedWriteResponseHandler handler = TrackedWriteResponseHandler.wrap(rs.getWriteResponseHandler(plan, null, WriteType.SIMPLE, null, requestTime),
+                                         keyspaceName,
+                                         mutation.key().getToken(),
+                                         id);
         applyLocallyAndSendToReplicas(mutation, plan, handler);
-
         return handler;
     }
 
@@ -142,7 +148,12 @@ public class TrackedWriteRequest
             }
 
             if (message == null)
-                message = Message.outWithFlags(MUTATION_REQ, mutation, handler.getRequestTime(), singletonList(MessageFlag.CALL_BACK_ON_FAILURE));
+            {
+                Message.Builder<Mutation> builder = Message.builder(MUTATION_REQ, mutation)
+                                 .withRequestTime(handler.getRequestTime())
+                                 .withFlag(MessageFlag.CALL_BACK_ON_FAILURE);
+                message = builder.build();
+            }
 
             String dc = 
DatabaseDescriptor.getLocator().location(destination.endpoint()).datacenter;
 
@@ -175,31 +186,42 @@ public class TrackedWriteRequest
         {
             // for each datacenter, send the message to one node to relay the 
write to other replicas
             for (List<Replica> dcReplicas : remoteDCReplicas.values())
-                sendMessagesToRemoteDC(message, 
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler);
+                sendMessagesToRemoteDC(message, 
EndpointsForToken.copyOf(mutation.key().getToken(), dcReplicas), handler, null);
         }
     }
 
-    private static void applyMutationLocally(Mutation mutation, 
TrackedWriteResponseHandler handler)
+    static void applyMutationLocally(Mutation mutation, 
RequestCallback<NoPayload> handler)
     {
+        Preconditions.checkArgument(handler instanceof 
TrackedWriteResponseHandler || handler instanceof 
ForwardedWrite.LeaderCallback);
         Stage.MUTATION.maybeExecuteImmediately(new 
LocalMutationRunnable(mutation, handler));
     }
 
     private static class LocalMutationRunnable implements 
DebuggableTask.RunnableDebuggableTask
     {
         private final Mutation mutation;
-        private final TrackedWriteResponseHandler handler;
+        private final RequestCallback<NoPayload> handler;
 
-        LocalMutationRunnable(Mutation mutation, TrackedWriteResponseHandler 
handler)
+        LocalMutationRunnable(Mutation mutation, RequestCallback<NoPayload> 
handler)
         {
+            Preconditions.checkArgument(handler instanceof 
TrackedWriteResponseHandler || handler instanceof 
ForwardedWrite.LeaderCallback);
             this.mutation = mutation;
             this.handler = handler;
         }
 
+        private Dispatcher.RequestTime getRequestTime()
+        {
+            if (handler instanceof TrackedWriteResponseHandler)
+                return ((TrackedWriteResponseHandler) 
handler).getRequestTime();
+            if (handler instanceof ForwardedWrite.LeaderCallback)
+                return ((ForwardedWrite.LeaderCallback) 
handler).getRequestTime();
+            throw new IllegalStateException();
+        }
+
         @Override
         public final void run()
         {
             long now = MonotonicClock.Global.approxTime.now();
-            long deadline = 
handler.getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
+            long deadline = 
getRequestTime().computeDeadline(MUTATION_REQ.expiresAfterNanos());
 
             if (now > deadline)
             {
@@ -224,13 +246,13 @@ public class TrackedWriteRequest
         @Override
         public long creationTimeNanos()
         {
-            return handler.getRequestTime().enqueuedAtNanos();
+            return getRequestTime().enqueuedAtNanos();
         }
 
         @Override
         public long startTimeNanos()
         {
-            return handler.getRequestTime().startedAtNanos();
+            return getRequestTime().startedAtNanos();
         }
 
         @Override
@@ -245,9 +267,10 @@ public class TrackedWriteRequest
     /*
      * Send the message to the first replica of targets, and have it forward 
the message to others in its DC
      */
-    private static void sendMessagesToRemoteDC(Message<? extends IMutation> 
message,
-                                               EndpointsForToken targets,
-                                               TrackedWriteResponseHandler 
handler)
+    static void sendMessagesToRemoteDC(Message<? extends IMutation> message,
+                                        EndpointsForToken targets,
+                                        RequestCallback<NoPayload> handler,
+                                        ForwardedWrite.CoordinatorAckInfo 
ackTo)
     {
         final Replica target;
 
@@ -258,7 +281,7 @@ public class TrackedWriteRequest
 
             for (Replica replica : forwardToReplicas)
             {
-                
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica);
+                
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica.endpoint());
                 logger.trace("Adding FWD message to {}@{}", message.id(), 
replica);
             }
 
@@ -272,9 +295,14 @@ public class TrackedWriteRequest
         {
             target = targets.get(0);
         }
+        if (ackTo != null)
+            message = message.withParam(ParamType.COORDINATOR_ACK_INFO, ackTo);
 
         Tracing.trace("Sending mutation to remote replica {}", target);
-        MessagingService.instance().sendWriteWithCallback(message, target, 
handler);
+        if (handler instanceof ForwardedWrite.LeaderCallback)
+            
MessagingService.instance().sendForwardedWriteWithCallback(message, target, 
(ForwardedWrite.LeaderCallback) handler);
+        else
+            MessagingService.instance().sendWriteWithCallback(message, target, 
(AbstractWriteResponseHandler<?>) handler);
         logger.trace("Sending message to {}@{}", message.id(), target);
     }
 
diff --git 
a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java 
b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
index b224cbf1c0..56528f89c8 100644
--- a/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
+++ b/src/java/org/apache/cassandra/service/TrackedWriteResponseHandler.java
@@ -54,10 +54,9 @@ public class TrackedWriteResponseHandler extends 
AbstractWriteResponseHandler<No
     @Override
     public void onResponse(Message<NoPayload> msg)
     {
-        /* local mutations are witnessed from Keyspace.applyInternalTracked */
+        // Local mutations are witnessed from Keyspace.applyInternalTracked
         if (msg != null)
             MutationTrackingService.instance.witnessedRemoteMutation(keyspace, 
token, mutationId, msg.from());
-
         wrapped.onResponse(msg);
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeId.java 
b/src/java/org/apache/cassandra/tcm/membership/NodeId.java
index f011314815..e9dead773d 100644
--- a/src/java/org/apache/cassandra/tcm/membership/NodeId.java
+++ b/src/java/org/apache/cassandra/tcm/membership/NodeId.java
@@ -25,6 +25,7 @@ import java.util.UUID;
 import com.google.common.primitives.Ints;
 
 import org.apache.cassandra.db.TypeSizes;
+import org.apache.cassandra.io.IVersionedSerializer;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
 import org.apache.cassandra.tcm.MultiStepOperation;
@@ -123,4 +124,25 @@ public class NodeId implements Comparable<NodeId>, 
MultiStepOperation.SequenceKe
             return TypeSizes.sizeofUnsignedVInt(t.id);
         }
     }
+
+    public static final IVersionedSerializer<NodeId> messagingSerializer = new IVersionedSerializer<NodeId>()
+    {
+        @Override
+        public void serialize(NodeId n, DataOutputPlus out, int version) throws IOException
+        {
+            out.writeUnsignedVInt32(n.id);
+        }
+
+        @Override
+        public NodeId deserialize(DataInputPlus in, int version) throws IOException
+        {
+            return new NodeId(in.readUnsignedVInt32());
+        }
+        {
+            return new NodeId(in.readUnsignedVInt32());
+        }
+
+        @Override
+        public long serializedSize(NodeId n, int version)
+        {
+            return TypeSizes.sizeofUnsignedVInt(n.id);
+        }
+    };
 }
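
NodeId.messagingSerializer above writes the id as an unsigned vint, and its serialize, deserialize, and serializedSize
methods must stay in lock-step. The sketch below shows that round-trip property with a generic LEB128 varint, used
purely for illustration; Cassandra's actual unsigned-vint wire format is different.

    import java.io.*;

    class VarIntIdSketch
    {
        // Generic LEB128 unsigned varint, for illustration only.
        static void writeUnsignedVarInt(DataOutput out, int v) throws IOException
        {
            while ((v & ~0x7f) != 0)
            {
                out.writeByte((v & 0x7f) | 0x80);
                v >>>= 7;
            }
            out.writeByte(v);
        }

        static int readUnsignedVarInt(DataInput in) throws IOException
        {
            int value = 0, shift = 0, b;
            do
            {
                b = in.readByte() & 0xff;
                value |= (b & 0x7f) << shift;
                shift += 7;
            }
            while ((b & 0x80) != 0);
            return value;
        }

        static int serializedSize(int v)
        {
            int size = 1;
            while ((v & ~0x7f) != 0) { size++; v >>>= 7; }
            return size;
        }

        public static void main(String[] args) throws IOException
        {
            int nodeId = 12345; // stand-in for NodeId.id
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            writeUnsignedVarInt(new DataOutputStream(bytes), nodeId);
            if (bytes.size() != serializedSize(nodeId))
                throw new IllegalStateException("serializedSize disagrees with what serialize wrote");
            int back = readUnsignedVarInt(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
            System.out.println(nodeId + " -> " + bytes.size() + " bytes -> " + back);
        }
    }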
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
new file mode 100644
index 0000000000..108451ff81
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.dht.Range;
+import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.api.IInstance;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import org.apache.cassandra.replication.MutationSummary;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.Schema;
+import org.apache.cassandra.schema.TableId;
+import org.assertj.core.api.Assertions;
+
+import static java.lang.String.format;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.dcAndRack;
+import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTopology;
+
+public class MutationTrackingWriteForwardingTest extends TestBaseImpl
+{
+    private static final int NODES = 3;
+    private static final int RF = 1;
+
+    private static int inst(int i)
+    {
+        return (i % NODES) + 1;
+    }
+
+    @Test
+    public void testBasicWriteForwarding() throws Throwable
+    {
+        // 2 DCs, 1 replica in each, to test forwarding to instances in remote DCs and local DCs
+        Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(3, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", "rack2"));
+
+        // TODO: disable background reconciliation so we can test that writes are reconciling immediately
+        try (Cluster cluster = Cluster.build(NODES)
+                                      .withConfig(cfg -> cfg.with(Feature.NETWORK)
+                                                            .with(Feature.GOSSIP)
+                                                            .set("mutation_tracking_enabled", "true")
+                                                            .set("write_request_timeout", "1000ms"))
+                                      .withNodeIdTopology(topology)
+                                      .start())
+        {
+            String keyspaceName = "basic_write_forwarding_test";
+            String tableName = "tbl";
+            cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication = 
" +
+                                        "{'class': 'NetworkTopologyStrategy', 
'replication_factor': " + RF + "} " +
+                                        "AND replication_type='tracked';", 
keyspaceName));
+            cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v 
int, primary key (k, c));", keyspaceName, tableName));
+
+            Map<IInstance, Integer> instanceUnreconciled = new HashMap<>();
+            int ROWS = 100;
+            for (int inserted = 0; inserted < ROWS; inserted++)
+            {
+                // Writes should complete for the client, regardless of whether they are forwarded or not
+                cluster.coordinator(inst(inserted)).execute(format("INSERT INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), ConsistencyLevel.ALL, inserted, inserted, inserted);
+
+                // Writes should be ack'd in the journal too, but these could lag behind client acks, so we could be
+                // permissive here. Each write should be reconciled on the leader, unreconciled on the replica (until
+                // background reconciliation broadcast is implemented), and ignored on others.
+                IInstance replica = null;
+                for (IInvokableInstance instance : cluster)
+                {
+                    int unreconciled = instance.callOnInstance(() -> {
+                        Token token = 
DatabaseDescriptor.getPartitioner().getMinimumToken();
+                        Range<Token> fullRange = new Range<>(token, token);
+                        TableId tableId = 
Schema.instance.getTableMetadata(keyspaceName, tableName).id;
+                        MutationSummary summary = 
MutationTrackingService.instance.createSummaryForRange(fullRange, tableId, 
true);
+                        return summary.unreconciledIds();
+                    });
+                    int lastUnreconciled = 
instanceUnreconciled.getOrDefault(instance, 0);
+                    int newUnreconciled = unreconciled - lastUnreconciled;
+                    if (newUnreconciled == 1)
+                    {
+                        Assertions.assertThat(replica).isNull();
+                        replica = instance;
+                    }
+                    instanceUnreconciled.put(instance, unreconciled);
+                }
+                Assertions.assertThat(replica).isNotNull();
+            }
+            Assertions.assertThat(instanceUnreconciled).matches(map -> {
+                int sum = 0;
+                for (Integer value : map.values())
+                    sum += value;
+                return sum == ROWS;
+            });
+        }
+    }
+}
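
The per-write assertion in the test above reduces to delta-tracking an unreconciled counter per instance and requiring
that exactly one instance's count grows by one for each insert. A self-contained sketch of just that bookkeeping, run
over invented counts rather than the in-jvm dtest API:

    import java.util.*;

    class UnreconciledDeltaSketch
    {
        public static void main(String[] args)
        {
            // Simulated per-instance unreconciled counts observed after each of three writes;
            // each write bumps exactly one instance.
            List<Map<String, Integer>> snapshots = List.of(
                Map.of("node1", 0, "node2", 1, "node3", 0),
                Map.of("node1", 1, "node2", 1, "node3", 0),
                Map.of("node1", 1, "node2", 1, "node3", 1));

            Map<String, Integer> last = new HashMap<>();
            for (Map<String, Integer> snapshot : snapshots)
            {
                String bumped = null;
                for (Map.Entry<String, Integer> e : snapshot.entrySet())
                {
                    int delta = e.getValue() - last.getOrDefault(e.getKey(), 0);
                    if (delta == 1)
                    {
                        if (bumped != null)
                            throw new AssertionError("more than one instance gained an unreconciled id");
                        bumped = e.getKey();
                    }
                    last.put(e.getKey(), e.getValue());
                }
                if (bumped == null)
                    throw new AssertionError("no instance gained an unreconciled id");
                System.out.println("write acknowledged; unreconciled count bumped on " + bumped);
            }
            int total = last.values().stream().mapToInt(Integer::intValue).sum();
            System.out.println("total unreconciled ids: " + total + ", writes: " + snapshots.size());
        }
    }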

