bdeggleston commented on code in PR #4806:
URL: https://github.com/apache/cassandra/pull/4806#discussion_r3261364610
##########
src/java/org/apache/cassandra/service/paxos/PaxosPrepare.java:
##########
@@ -1226,7 +1230,12 @@ public static class RequestHandler implements
IVerbHandler<Request>
@Override
public void doVerb(Message<Request> message)
{
-
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(ClusterMetadata.current(),
message.from(), message.epoch());
+ ClusterMetadata metadata =
ClusterMetadataService.instance().fetchLogFromPeerOrCMS(ClusterMetadata.current(),
+
message.from(),
+
message.epoch());
+
+ if (message.payload.read != null)
+ MigrationRouter.checkPaxosPrepareReadMigration(metadata,
message, message.from(), message.payload.read);
Review Comment:
This doesn't seem to be handled in the prepare callback, from what I can
tell. If the coordinator gets a coordinator-behind exception, I think we'll just
time out? I think this could be handled by inspecting the failure reasons in the
MAYBE_FAILURE case and breaking out of the switch (and retrying) if we're
behind?
##########
test/distributed/org/apache/cassandra/distributed/test/tracking/PaxosMutationTrackingMigrationV2Test.java:
##########
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.EpochPin;
+import
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.MessageSpy;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.replication.migration.MigrationRouter;
+import
org.apache.cassandra.service.replication.migration.MutationTrackingMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.alterReplicationType;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.alterReplicationTypeFrom;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertAllNodesSee;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertCasApplied;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertCasException;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertCasNotApplied;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertNodeSees;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertReplicaHasNoRow;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertReplicasAreExactly;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertReplicasHaveValue;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.awaitReplicationType;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.buildPaxosCluster;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.casAsync;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.casAsyncExpectingFailure;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.createKeyspace;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.epochPin;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.on;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.pauseHintsAndReconciler;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that Paxos V2 CAS operations work correctly during mutation tracking
migration.
+ *
+ * Each test documents the exact message verb being verified and asserts that
the
+ * expected code path was taken (not just that the CAS "succeeds").
+ *
+ * Uses a shared 4-node cluster with paxos_variant=v2 and RF=3. With RF=3 on 4
nodes,
+ * some keys have node 1 as a replica (2 remote replicas) and some do not (3
remote
+ * replicas). All tests handle both cases.
+ */
+public class PaxosMutationTrackingMigrationV2Test extends TestBaseImpl
+{
+ private static Cluster cluster;
+
+ /*
+ * With Murmur3Partitioner, 4 nodes, and SimpleStrategy RF=3:
+ * Key 5 → replicas on nodes 1, 2, 3 (node 4 excluded)
+ * This key is used by most tests because node 1 is a replica (avoids CAS
forwarding)
+ * and node 4 is not a replica (useful for
testCommitAndPrepareViaIncompleteAccepted).
+ */
+ private static final int KEY = 5;
+
+ @BeforeClass
+ public static void setup() throws Throwable
+ {
+ cluster = init(buildPaxosCluster(4, "v2").start());
+ pauseHintsAndReconciler(cluster);
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @After
+ public void resetFilters()
+ {
+ cluster.filters().reset();
+ cluster.forEach(instance -> instance.runOnInstance(() ->
HintsService.setRejectHintsBeforeNanos(0)));
+ ClusterUtils.awaitTCMCatchUp(cluster);
+ }
+
+ /*
+ * CAS during active to-tracked migration
+ *
+ * Message: PAXOS_COMMIT_REQ
+ * Path: V2 -> PaxosCommit.start() (tracked path)
+ * Verifies: MigrationRouter.shouldUseTrackedForWrites() returns true
during migration
+ */
+ @Test
+ public void testCasDuringMigrationToTracked() throws Throwable
+ {
+ String ks = createKeyspace(cluster, "pmt_v2", "untracked");
+ assertReplicasAreExactly(cluster, ks, KEY, new int[]{ 1, 2, 3 });
+
+ cluster.coordinator(1).execute("INSERT INTO " + ks + ".tbl (k, v)
VALUES (" + KEY + ", 0)",
+ ConsistencyLevel.QUORUM);
+
+ alterReplicationType(cluster, ks, "tracked");
+
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ final String keyspace = ks;
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ ClusterMetadata cm = ClusterMetadata.current();
+ MutationTrackingMigrationState state =
cm.mutationTrackingMigrationState;
+ assertTrue("Node " + nodeId + ": keyspace should be migrating",
+ state.isMigrating(keyspace));
+ assertTrue("Node " + nodeId + ": schema should show tracked",
+
cm.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked());
+ });
+ }
+
+ // Count PAXOS_COMMIT_REQ messages that carry a mutation ID. Counting
raw messages is not
+ // enough — 2 untracked commits on RF=3 would also produce count==2.
The tracked path
+ // differs from the untracked path by carrying a non-none mutation ID
on the Commit payload.
+ try (MessageSpy spy = on(cluster, Verb.PAXOS_COMMIT_REQ)
+ .checkMutationId()
+ .expect(2)
+ .start())
+ {
+ Object[][] result = cluster.coordinator(1).execute("UPDATE " + ks
+ ".tbl SET v = 1 WHERE k = " + KEY + " IF v = 0",
+
ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM);
+
+ spy.await();
+ assertCasApplied(result);
+ assertEquals("Tracked CAS should send exactly 2 PAXOS_COMMIT_REQ
carrying a mutation ID (to 2 remote replicas)",
+ 2, spy.withMutationId());
+ }
+
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ assertTrue("Node " + nodeId + ": MutationTrackingService
should be enabled",
+ MutationTrackingService.isEnabled());
+ });
+ }
+
+ assertReplicasHaveValue(cluster, ks, KEY, 1, 1, 2, 3);
+ }
+
+ /*
+ * In V2 Paxos, the read is embedded in the PAXOS2_PREPARE_REQ payload.
Tracked reads use
+ * TrackedRead.DataRequest/SummaryRequest; untracked uses
SinglePartitionReadCommand.
+ * The handler validates via checkPaxosPrepareReadMigration(). Since both
types use the
+ * same verb (PAXOS2_PREPARE_REQ), we verify routing by checking
MigrationRouter.shouldUseTracked()
+ * on the coordinator, and confirm handler acceptance by checking no
COORDINATOR_BEHIND occurred
+ * (proven by exact PAXOS_COMMIT_REQ counts -- retries would inflate the
count).
+ *
+ * 2a: During to-tracked migration, reads should be UNTRACKED (safe
default).
+ * 2b: After migration to untracked, reads should be UNTRACKED.
+ * 2c: On a fully-tracked keyspace (no migration), reads should be TRACKED.
+ */
+
+ /**
+ * 2a: During to-tracked migration, the Paxos prepare read uses untracked
routing.
+ *
+ * MigrationRouter.shouldUseTracked() returns false during migration
because tracked
+ * reads require ALL writes to be tracked for monotonicity.
+ */
+ @Test
+ public void testPrepareReadDuringMigrationIsUntracked() throws Throwable
+ {
+ String ks = createKeyspace(cluster, "pmt_v2", "untracked");
+ assertReplicasAreExactly(cluster, ks, KEY, new int[]{ 1, 2, 3 });
+
+ cluster.coordinator(1).execute("INSERT INTO " + ks + ".tbl (k, v)
VALUES (" + KEY + ", 42)",
+ ConsistencyLevel.QUORUM);
+
+ alterReplicationType(cluster, ks, "tracked");
+
+ // Verify migration is active and the prepare read routing is UNTRACKED
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ final String keyspace = ks;
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ assertTrue("Node " + nodeId + ": migration should be active",
+
ClusterMetadata.current().mutationTrackingMigrationState.isMigrating(keyspace));
+
+ TableMetadata tbl =
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspace).getTableOrViewNullable("tbl");
+ SinglePartitionReadCommand cmd =
SinglePartitionReadCommand.fullPartitionRead(tbl, FBUtilities.nowInSeconds(),
tbl.partitioner.decorateKey(tbl.partitionKeyType.fromString(String.valueOf(KEY))));
+ assertFalse("Node " + nodeId + ": prepare read should be
UNTRACKED during migration",
+ MigrationRouter.shouldUseTracked(cmd));
+ });
+ }
+
+ // CAS works with untracked read (correct routing during migration)
+ Object[][] result = cluster.coordinator(1).execute("SELECT * FROM " +
ks + ".tbl WHERE k = " + KEY,
+
ConsistencyLevel.SERIAL);
+
+ assertNotNull("CAS read should return result", result);
+ assertEquals("Should have one row", 1, result.length);
+ assertEquals("Value should be 42", 42, result[0][1]);
+ }
+
+ /**
+ * 2b: After migration to untracked, the Paxos prepare read uses untracked
routing.
+ */
+ @Test
+ public void testPrepareReadAfterMigrationToUntrackedIsUntracked() throws
Throwable
+ {
+ String ks = createKeyspace(cluster, "pmt_v2", "tracked");
+ assertReplicasAreExactly(cluster, ks, KEY, new int[]{ 1, 2, 3 });
+
+ alterReplicationType(cluster, ks, "untracked");
+
+ // Verify all nodes see untracked and the prepare read routing is
UNTRACKED
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ final String keyspace = ks;
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ assertFalse("Node " + nodeId + ": should be untracked",
+
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked());
+
+ TableMetadata tbl =
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspace).getTableOrViewNullable("tbl");
+ SinglePartitionReadCommand cmd =
SinglePartitionReadCommand.fullPartitionRead(tbl, FBUtilities.nowInSeconds(),
tbl.partitioner.decorateKey(tbl.partitionKeyType.fromString(String.valueOf(KEY))));
+ assertFalse("Node " + nodeId + ": prepare read should be
UNTRACKED after migration",
+ MigrationRouter.shouldUseTracked(cmd));
+ });
+ }
+
+ // Spy on PAXOS_COMMIT_REQ to verify no COORDINATOR_BEHIND retry.
+ try (MessageSpy spy = on(cluster, Verb.PAXOS_COMMIT_REQ)
+ .to(2, 3, 4)
+ .expect(2)
+ .start())
+ {
+ Object[][] result = cluster.coordinator(1).execute("INSERT INTO "
+ ks + ".tbl (k, v) VALUES (" + KEY + ", 42) IF NOT EXISTS",
Review Comment:
Same thing here: let's take a look at the prepare messages and confirm that the
right ones are being sent.
##########
test/distributed/org/apache/cassandra/distributed/test/tracking/PaxosMutationTrackingMigrationV2Test.java:
##########
@@ -0,0 +1,1217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.tracking;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import org.apache.cassandra.db.SinglePartitionReadCommand;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.ConsistencyLevel;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+import
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.EpochPin;
+import
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.MessageSpy;
+import org.apache.cassandra.hints.HintsService;
+import org.apache.cassandra.metrics.StorageMetrics;
+import org.apache.cassandra.net.Verb;
+import org.apache.cassandra.replication.MutationTrackingService;
+import org.apache.cassandra.schema.ReplicationType;
+import org.apache.cassandra.schema.TableMetadata;
+import org.apache.cassandra.service.replication.migration.MigrationRouter;
+import
org.apache.cassandra.service.replication.migration.MutationTrackingMigrationState;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.utils.FBUtilities;
+
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.alterReplicationType;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.alterReplicationTypeFrom;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertAllNodesSee;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertCasApplied;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertCasException;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertCasNotApplied;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertNodeSees;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertReplicaHasNoRow;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertReplicasAreExactly;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.assertReplicasHaveValue;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.awaitReplicationType;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.buildPaxosCluster;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.casAsync;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.casAsyncExpectingFailure;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.createKeyspace;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.epochPin;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.on;
+import static
org.apache.cassandra.distributed.test.tracking.PaxosMigrationTestUtils.pauseHintsAndReconciler;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that Paxos V2 CAS operations work correctly during mutation tracking
migration.
+ *
+ * Each test documents the exact message verb being verified and asserts that
the
+ * expected code path was taken (not just that the CAS "succeeds").
+ *
+ * Uses a shared 4-node cluster with paxos_variant=v2 and RF=3. With RF=3 on 4
nodes,
+ * some keys have node 1 as a replica (2 remote replicas) and some do not (3
remote
+ * replicas). All tests handle both cases.
+ */
+public class PaxosMutationTrackingMigrationV2Test extends TestBaseImpl
+{
+ private static Cluster cluster;
+
+ /*
+ * With Murmur3Partitioner, 4 nodes, and SimpleStrategy RF=3:
+ * Key 5 → replicas on nodes 1, 2, 3 (node 4 excluded)
+ * This key is used by most tests because node 1 is a replica (avoids CAS
forwarding)
+ * and node 4 is not a replica (useful for
testCommitAndPrepareViaIncompleteAccepted).
+ */
+ private static final int KEY = 5;
+
+ @BeforeClass
+ public static void setup() throws Throwable
+ {
+ cluster = init(buildPaxosCluster(4, "v2").start());
+ pauseHintsAndReconciler(cluster);
+ }
+
+ @AfterClass
+ public static void teardown()
+ {
+ if (cluster != null)
+ cluster.close();
+ }
+
+ @After
+ public void resetFilters()
+ {
+ cluster.filters().reset();
+ cluster.forEach(instance -> instance.runOnInstance(() ->
HintsService.setRejectHintsBeforeNanos(0)));
+ ClusterUtils.awaitTCMCatchUp(cluster);
+ }
+
+ /*
+ * CAS during active to-tracked migration
+ *
+ * Message: PAXOS_COMMIT_REQ
+ * Path: V2 -> PaxosCommit.start() (tracked path)
+ * Verifies: MigrationRouter.shouldUseTrackedForWrites() returns true
during migration
+ */
+ @Test
+ public void testCasDuringMigrationToTracked() throws Throwable
+ {
+ String ks = createKeyspace(cluster, "pmt_v2", "untracked");
+ assertReplicasAreExactly(cluster, ks, KEY, new int[]{ 1, 2, 3 });
+
+ cluster.coordinator(1).execute("INSERT INTO " + ks + ".tbl (k, v)
VALUES (" + KEY + ", 0)",
+ ConsistencyLevel.QUORUM);
+
+ alterReplicationType(cluster, ks, "tracked");
+
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ final String keyspace = ks;
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ ClusterMetadata cm = ClusterMetadata.current();
+ MutationTrackingMigrationState state =
cm.mutationTrackingMigrationState;
+ assertTrue("Node " + nodeId + ": keyspace should be migrating",
+ state.isMigrating(keyspace));
+ assertTrue("Node " + nodeId + ": schema should show tracked",
+
cm.schema.getKeyspaceMetadata(keyspace).params.replicationType.isTracked());
+ });
+ }
+
+ // Count PAXOS_COMMIT_REQ messages that carry a mutation ID. Counting
raw messages is not
+ // enough — 2 untracked commits on RF=3 would also produce count==2.
The tracked path
+ // differs from the untracked path by carrying a non-none mutation ID
on the Commit payload.
+ try (MessageSpy spy = on(cluster, Verb.PAXOS_COMMIT_REQ)
+ .checkMutationId()
+ .expect(2)
+ .start())
+ {
+ Object[][] result = cluster.coordinator(1).execute("UPDATE " + ks
+ ".tbl SET v = 1 WHERE k = " + KEY + " IF v = 0",
+
ConsistencyLevel.SERIAL, ConsistencyLevel.QUORUM);
+
+ spy.await();
+ assertCasApplied(result);
+ assertEquals("Tracked CAS should send exactly 2 PAXOS_COMMIT_REQ
carrying a mutation ID (to 2 remote replicas)",
+ 2, spy.withMutationId());
+ }
+
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ assertTrue("Node " + nodeId + ": MutationTrackingService
should be enabled",
+ MutationTrackingService.isEnabled());
+ });
+ }
+
+ assertReplicasHaveValue(cluster, ks, KEY, 1, 1, 2, 3);
+ }
+
+ /*
+ * In V2 Paxos, the read is embedded in the PAXOS2_PREPARE_REQ payload.
Tracked reads use
+ * TrackedRead.DataRequest/SummaryRequest; untracked uses
SinglePartitionReadCommand.
+ * The handler validates via checkPaxosPrepareReadMigration(). Since both
types use the
+ * same verb (PAXOS2_PREPARE_REQ), we verify routing by checking
MigrationRouter.shouldUseTracked()
+ * on the coordinator, and confirm handler acceptance by checking no
COORDINATOR_BEHIND occurred
+ * (proven by exact PAXOS_COMMIT_REQ counts -- retries would inflate the
count).
+ *
+ * 2a: During to-tracked migration, reads should be UNTRACKED (safe
default).
+ * 2b: After migration to untracked, reads should be UNTRACKED.
+ * 2c: On a fully-tracked keyspace (no migration), reads should be TRACKED.
+ */
+
+ /**
+ * 2a: During to-tracked migration, the Paxos prepare read uses untracked
routing.
+ *
+ * MigrationRouter.shouldUseTracked() returns false during migration
because tracked
+ * reads require ALL writes to be tracked for monotonicity.
+ */
+ @Test
+ public void testPrepareReadDuringMigrationIsUntracked() throws Throwable
+ {
+ String ks = createKeyspace(cluster, "pmt_v2", "untracked");
+ assertReplicasAreExactly(cluster, ks, KEY, new int[]{ 1, 2, 3 });
+
+ cluster.coordinator(1).execute("INSERT INTO " + ks + ".tbl (k, v)
VALUES (" + KEY + ", 42)",
+ ConsistencyLevel.QUORUM);
+
+ alterReplicationType(cluster, ks, "tracked");
+
+ // Verify migration is active and the prepare read routing is UNTRACKED
+ for (int i = 1; i <= cluster.size(); i++)
+ {
+ final String keyspace = ks;
+ int nodeId = i;
+ cluster.get(i).runOnInstance(() -> {
+ assertTrue("Node " + nodeId + ": migration should be active",
+
ClusterMetadata.current().mutationTrackingMigrationState.isMigrating(keyspace));
+
+ TableMetadata tbl =
ClusterMetadata.current().schema.getKeyspaceMetadata(keyspace).getTableOrViewNullable("tbl");
+ SinglePartitionReadCommand cmd =
SinglePartitionReadCommand.fullPartitionRead(tbl, FBUtilities.nowInSeconds(),
tbl.partitioner.decorateKey(tbl.partitionKeyType.fromString(String.valueOf(KEY))));
+ assertFalse("Node " + nodeId + ": prepare read should be
UNTRACKED during migration",
+ MigrationRouter.shouldUseTracked(cmd));
+ });
+ }
+
+ // CAS works with untracked read (correct routing during migration)
+ Object[][] result = cluster.coordinator(1).execute("SELECT * FROM " +
ks + ".tbl WHERE k = " + KEY,
Review Comment:
We should be intercepting the prepare messages here and confirming they're
doing tracked reads. This is just confirming that the migration router says the
right thing and that the read completes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]