This is an automated email from the ASF dual-hosted git repository. aber pushed a commit to branch cep-45-mutation-tracking in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-45-mutation-tracking by this push: new 70809160af Fixup inter-DC forwarding of writes from a coordinator-replica (fixup) 70809160af is described below commit 70809160afb8dfd4d610865ff6337af3d9bf6f63 Author: Abe Ratnofsky <a...@aber.io> AuthorDate: Wed May 7 11:28:09 2025 -0400 Fixup inter-DC forwarding of writes from a coordinator-replica (fixup) Patch by Abe Ratnofsky; Reviewed by Aleksey Yeshchenko for CASSANDRA-20336 --- .../org/apache/cassandra/net/RequestCallbacks.java | 8 +--- .../cassandra/replication/TrackedWriteRequest.java | 7 +++- .../MutationTrackingWriteForwardingTest.java | 44 ++++++++++++++++------ 3 files changed, 39 insertions(+), 20 deletions(-) diff --git a/src/java/org/apache/cassandra/net/RequestCallbacks.java b/src/java/org/apache/cassandra/net/RequestCallbacks.java index 4a46b73b5a..5ec59d29e7 100644 --- a/src/java/org/apache/cassandra/net/RequestCallbacks.java +++ b/src/java/org/apache/cassandra/net/RequestCallbacks.java @@ -25,7 +25,6 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; import org.apache.cassandra.concurrent.ScheduledExecutorPlus; import org.slf4j.Logger; @@ -36,7 +35,6 @@ import org.apache.cassandra.exceptions.RequestFailureReason; import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.metrics.InternodeOutboundMetrics; -import org.apache.cassandra.replication.ForwardedWrite; import org.apache.cassandra.service.AbstractWriteResponseHandler; import static java.lang.String.format; @@ -97,17 +95,13 @@ public class RequestCallbacks implements OutboundMessageCallbacks */ public void addWithExpiration(RequestCallback<?> cb, Message<?> message, InetAddressAndPort to) { - // mutations need to call the overload - Preconditions.checkArgument((message.verb() != Verb.MUTATION_REQ && message.verb() != Verb.COUNTER_MUTATION_REQ) || (cb instanceof ForwardedWrite.LeaderCallback)); CallbackInfo previous = callbacks.put(key(message.id(), to), new CallbackInfo(message, to, cb)); assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to, previous); } public void addWithExpiration(AbstractWriteResponseHandler<?> cb, Message<?> message, Replica to) { - Preconditions.checkArgument(message.verb() == Verb.MUTATION_REQ || message.verb() == Verb.COUNTER_MUTATION_REQ || message.verb() == Verb.PAXOS_COMMIT_REQ || message.verb() == Verb.FORWARDING_WRITE); - CallbackInfo previous = callbacks.put(key(message.id(), to.endpoint()), new CallbackInfo(message, to.endpoint(), cb)); - assert previous == null : format("Callback already exists for id %d/%s! (%s)", message.id(), to.endpoint(), previous); + addWithExpiration(cb, message, to.endpoint()); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java index ba58f4f1f7..9cc7b3241c 100644 --- a/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java +++ b/src/java/org/apache/cassandra/replication/TrackedWriteRequest.java @@ -281,7 +281,12 @@ public class TrackedWriteRequest for (Replica replica : forwardToReplicas) { - MessagingService.instance().callbacks.addWithExpiration(handler, message, replica.endpoint()); + if (handler instanceof TrackedWriteResponseHandler) + MessagingService.instance().callbacks.addWithExpiration((TrackedWriteResponseHandler) handler, message, replica); + else if (handler instanceof ForwardedWrite.LeaderCallback) + MessagingService.instance().callbacks.addWithExpiration(handler, message, replica.endpoint()); + else + throw new IllegalStateException(); logger.trace("Adding FWD message to {}@{}", message.id(), replica); } diff --git a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java index 108451ff81..9eb19a5fc9 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/tracking/MutationTrackingWriteForwardingTest.java @@ -19,7 +19,9 @@ package org.apache.cassandra.distributed.test.tracking; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.junit.Test; @@ -45,8 +47,13 @@ import static org.apache.cassandra.distributed.shared.NetworkTopology.networkTop public class MutationTrackingWriteForwardingTest extends TestBaseImpl { - private static final int NODES = 3; - private static final int RF = 1; + // Need a mix of local and remote replicas, multiple remote replicas to ensure inter-DC forwarding hits both + // paths for chosen replica in a remote DC, and all other replicas in a remote DC. + // Include an extra node to ensure non-replicas behave correctly. + private static final int NODES = 5; + private static final int RF_PER_DC = 2; + private static final int NUM_DCS = 2; + private static final int TOTAL_RF = RF_PER_DC * NUM_DCS; private static int inst(int i) { @@ -57,7 +64,7 @@ public class MutationTrackingWriteForwardingTest extends TestBaseImpl public void testBasicWriteForwarding() throws Throwable { // 2 DCs, 1 replica in each, to test forwarding to instances in remote DCs and local DCs - Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(3, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", "rack2")); + Map<Integer, NetworkTopology.DcAndRack> topology = networkTopology(5, (nodeid) -> nodeid % 2 == 1 ? dcAndRack("dc1", "rack1") : dcAndRack("dc2", "rack2")); // TODO: disable background reconciliation so we can test that writes are reconciling immediately try (Cluster cluster = Cluster.build(NODES) @@ -71,7 +78,7 @@ public class MutationTrackingWriteForwardingTest extends TestBaseImpl String keyspaceName = "basic_write_forwarding_test"; String tableName = "tbl"; cluster.schemaChange(format("CREATE KEYSPACE %s WITH replication = " + - "{'class': 'NetworkTopologyStrategy', 'replication_factor': " + RF + "} " + + "{'class': 'NetworkTopologyStrategy', 'replication_factor': " + RF_PER_DC + "} " + "AND replication_type='tracked';", keyspaceName)); cluster.schemaChange(format("CREATE TABLE %s.%s (k int, c int, v int, primary key (k, c));", keyspaceName, tableName)); @@ -83,9 +90,10 @@ public class MutationTrackingWriteForwardingTest extends TestBaseImpl cluster.coordinator(inst(inserted)).execute(format("INSERT INTO %s.%s (k, c, v) VALUES (?, ?, ?)", keyspaceName, tableName), ConsistencyLevel.ALL, inserted, inserted, inserted); // Writes should be ack'd in the journal too, but these could lag behind client acks, so could be - // permissive here. Each write should be reconciled on the leader, unreconciled on the replica (until - // background reconciliation broadcast is implemented), and ignored on others. - IInstance replica = null; + // permissive here. Each write should be reconciled on 1 leader, unreconciled on TOTAL_RF-1 + // replicas (until background reconciliation broadcast is implemented), and ignored on others. + Set<IInvokableInstance> leaderOrNonReplica = new HashSet<>(2); + Set<IInvokableInstance> replicas = new HashSet<>(); for (IInvokableInstance instance : cluster) { int unreconciled = instance.callOnInstance(() -> { @@ -97,20 +105,32 @@ public class MutationTrackingWriteForwardingTest extends TestBaseImpl }); int lastUnreconciled = instanceUnreconciled.getOrDefault(instance, 0); int newUnreconciled = unreconciled - lastUnreconciled; - if (newUnreconciled == 1) + + if (newUnreconciled == 0) + { + // instance already reconciled (as leader) or did not receive new mutation ID (non-replica) + leaderOrNonReplica.add(instance); + } + else if (newUnreconciled == 1) + { + // instance has not reconciled, so it's a replica (until reconciliation broadcast is implemented) + replicas.add(instance); + } + else { - Assertions.assertThat(replica).isNull(); - replica = instance; + Assertions.fail("Should not have more than one new unreconciled mutation"); } instanceUnreconciled.put(instance, unreconciled); } - Assertions.assertThat(replica).isNotNull(); + Assertions.assertThat(leaderOrNonReplica).hasSize(2); + Assertions.assertThat(replicas).hasSize(TOTAL_RF - 1); } Assertions.assertThat(instanceUnreconciled).matches(map -> { int sum = 0; for (Integer value : map.values()) sum += value; - return sum == ROWS; + // Each write is reconciled on the leader, unreconciled on all other replicas + return sum == (ROWS * (TOTAL_RF - 1)); }); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org