This is an automated email from the ASF dual-hosted git repository.

aber pushed a commit to branch cep-45-mutation-tracking
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push:
     new 70809160af Fixup inter-DC forwarding of writes from a coordinator-replica (fixup)
70809160af is described below

commit 70809160afb8dfd4d610865ff6337af3d9bf6f63
Author: Abe Ratnofsky <a...@aber.io>
AuthorDate: Wed May 7 11:28:09 2025 -0400

    Fixup inter-DC forwarding of writes from a coordinator-replica (fixup)
    
    Patch by Abe Ratnofsky; Reviewed by Aleksey Yeshchenko for CASSANDRA-20336
---
 .../org/apache/cassandra/net/RequestCallbacks.java |  8 +---
 .../cassandra/replication/TrackedWriteRequest.java |  7 +++-
 .../MutationTrackingWriteForwardingTest.java       | 44 ++++++++++++++++------
 3 files changed, 39 insertions(+), 20 deletions(-)

diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java
index 4a46b73b5a..5ec59d29e7 100644
--- a/src/java/org/apache/cassandra/net/RequestCallbacks.java
+++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java
@@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 import org.apache.cassandra.concurrent.ScheduledExecutorPlus;
 import org.slf4j.Logger;
@@ -36,7 +35,6 @@ import org.apache.cassandra.exceptions.RequestFailureReason;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.Replica;
 import org.apache.cassandra.metrics.InternodeOutboundMetrics;
-import org.apache.cassandra.replication.ForwardedWrite;
 import org.apache.cassandra.service.AbstractWriteResponseHandler;
 
 import static java.lang.String.format;
@@ -97,17 +95,13 @@ public class RequestCallbacks implements 
OutboundMessageCallbacks
      */
     public void addWithExpiration(RequestCallback<?> cb, Message<?> message, 
InetAddressAndPort to)
     {
-        // mutations need to call the overload
-        Preconditions.checkArgument((message.verb() != Verb.MUTATION_REQ && 
message.verb() != Verb.COUNTER_MUTATION_REQ) || (cb instanceof 
ForwardedWrite.LeaderCallback));
         CallbackInfo previous = callbacks.put(key(message.id(), to), new 
CallbackInfo(message, to, cb));
         assert previous == null : format("Callback already exists for id 
%d/%s! (%s)", message.id(), to, previous);
     }
 
     public void addWithExpiration(AbstractWriteResponseHandler<?> cb, 
Message<?> message, Replica to)
     {
-        Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ || 
message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == 
Verb.PAXOS_COMMIT_REQ || message.verb() == Verb.FORWARDING_WRITE);
-        CallbackInfo previous = callbacks.put(key(message.id(), 
to.endpoint()), new CallbackInfo(message, to.endpoint(), cb));
-        assert previous == null : format("Callback already exists for id 
%d/%s! (%s)", message.id(), to.endpoint(), previous);
+        addWithExpiration(cb, message, to.endpoint());
     }
 
     @VisibleForTesting
diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java 
b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
index ba58f4f1f7..9cc7b3241c 100644
--- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
+++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java
@@ -281,7 +281,12 @@ public class TrackedWriteRequest
 
             for (Replica replica : forwardToReplicas)
             {
-                
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica.endpoint());
+                if (handler instanceof TrackedWriteResponseHandler)
+                    
MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler)
 handler, message, replica);
+                else if (handler instanceof ForwardedWrite.LeaderCallback)
+                    
MessagingService.instance().callbacks.addWithExpiration(handler, message, 
replica.endpoint());
+                else
+                    throw new IllegalStateException();
                 logger.trace("Adding FWD message to {}@{}", message.id(), 
replica);
             }
 
diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
index 108451ff81..9eb19a5fc9 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java
@@ -19,7 +19,9 @@
 package org.apache.cassandra.distributed.test.tracking;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 
 import org.junit.Test;
 
@@ -45,8 +47,13 @@ import static 
org.apache.cassandra.distributed.shared.NetworkTopology.networkTop
 
 public class MutationTrackingWriteForwardingTest extends TestBaseImpl
 {
-    private static final int NODES = 3;
-    private static final int RF = 1;
+    // Need a mix of local and remote replicas, multiple remote replicas to 
ensure inter-DC forwarding hits both
+    // paths for chosen replica in a remote DC, and all other replicas in a 
remote DC.
+    // Include an extra node to ensure non-replicas behave correctly.
+    private static final int NODES = 5;
+    private static final int RF_PER_DC = 2;
+    private static final int NUM_DCS = 2;
+    private static final int TOTAL_RF = RF_PER_DC * NUM_DCS;
 
     private static int inst(int i)
     {
@@ -57,7 +64,7 @@ public class MutationTrackingWriteForwardingTest extends 
TestBaseImpl
     public void testBasicWriteForwarding() throws Throwable
     {
         // 2 DCs, 1 replica in each, to test forwarding to instances in remote 
DCs and local DCs
-        Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(3, 
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", 
"rack2"));
+        Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(5, 
(nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", 
"rack2"));
 
         // TODO: disable background reconciliation so we can test that writes 
are reconciling immediately
         try (Cluster cluster = Cluster.build(NODES)
@@ -71,7 +78,7 @@ public class MutationTrackingWriteForwardingTest extends 
TestBaseImpl
             String keyspaceName = "basic_write_forwarding_test";
             String tableName = "tbl";
             cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication = 
" +
-                                        "{'class': 'NetworkTopologyStrategy', 
'replication_factor': " + RF + "} " +
+                                        "{'class': 'NetworkTopologyStrategy', 
'replication_factor': " + RF_PER_DC + "} " +
                                         "AND replication_type='tracked';", 
keyspaceName));
             cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v 
int, primary key (k, c));", keyspaceName, tableName));
 
@@ -83,9 +90,10 @@ public class MutationTrackingWriteForwardingTest extends 
TestBaseImpl
                 cluster.coordinator(inst(inserted)).execute(format("INSERT 
INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), 
ConsistencyLevel.ALL, inserted, inserted, inserted);
 
                 // Writes should be ack'd in the journal too, but these could 
lag behind client acks, so could be
-                // permissive here. Each write should be reconciled on the 
leader, unreconciled on the replica (until
-                // background reconciliation broadcast is implemented), and 
ignored on others.
-                IInstance replica = null;
+                // permissive here. Each write should be reconciled on 1 
leader, unreconciled on TOTAL_RF-1
+                // replicas (until background reconciliation broadcast is 
implemented), and ignored on others.
+                Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2);
+                Set<IInvokableInstance> replicas = new HashSet<>();
                 for (IInvokableInstance instance : cluster)
                 {
                     int unreconciled = instance.callOnInstance(() -> {
@@ -97,20 +105,32 @@ public class MutationTrackingWriteForwardingTest extends 
TestBaseImpl
                     });
                     int lastUnreconciled = 
instanceUnreconciled.getOrDefault(instance, 0);
                     int newUnreconciled = unreconciled - lastUnreconciled;
-                    if (newUnreconciled == 1)
+
+                    if (newUnreconciled == 0)
+                    {
+                        // instance already reconciled (as leader) or did not 
receive new mutation ID (non-replica)
+                        leaderOrNonReplica.add(instance);
+                    }
+                    else if (newUnreconciled == 1)
+                    {
+                        // instance has not reconciled, so it's a replica 
(until reconciliation broadcast is implemented)
+                        replicas.add(instance);
+                    }
+                    else
                     {
-                        Assertions.assertThat(replica).isNull();
-                        replica = instance;
+                        Assertions.fail("Should not have more than one new 
unreconciled mutation");
                     }
                     instanceUnreconciled.put(instance, unreconciled);
                 }
-                Assertions.assertThat(replica).isNotNull();
+                Assertions.assertThat(leaderOrNonReplica).hasSize(2);
+                Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1);
             }
             Assertions.assertThat(instanceUnreconciled).matches(map -> {
                 int sum = 0;
                 for (Integer value : map.values())
                     sum += value;
-                return sum == ROWS;
+                // Each write is reconciled on the leader, unreconciled on all 
other replicas
+                return sum == (ROWS * (TOTAL_RF - 1));
             });
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to