This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51e01a3862 Repair Paxos for the distributed metadata log when CMS 
membership changes
51e01a3862 is described below

commit 51e01a3862afc1ebaffd765b2490a581461c3f14
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Thu Mar 20 17:55:44 2025 +0000

    Repair Paxos for the distributed metadata log when CMS membership changes
    
    Patch by Josh McKenzie and Sam Tunnicliffe; reviewed by Marcus Ericksson for
    CASSANDRA-20467
    
    Co-authored-by: Josh McKenzie <jmcken...@apache.org>
    Co-authored-by: Sam Tunnicliffe <s...@apache.org>
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/DiskBoundaryManager.java   |  2 +-
 .../apache/cassandra/tcm/MultiStepOperation.java   |  4 +-
 .../org/apache/cassandra/tcm/Transformation.java   |  6 ++
 .../apache/cassandra/tcm/sequences/AddToCMS.java   |  2 +-
 .../cassandra/tcm/sequences/ReconfigureCMS.java    | 67 +++++++++++++++++-----
 .../distributed/test/PaxosRepairTest.java          | 21 ++++---
 .../distributed/test/log/ReconfigureCMSTest.java   | 64 +++++++++++++++++++++
 8 files changed, 142 insertions(+), 25 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 98389b77cc..1360ab01bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Repair Paxos for the distributed metadata log when CMS membership changes 
(CASSANDRA-20467)
  * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346)
  * Add min/max/mean/percentiles to timer metrics vtable (CASSANDRA-20466)
  * Add support for time, date, timestamp types in scalar constraint 
(CASSANDRA-20274)
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java 
b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 2a25dc4efc..5c6b59a0b1 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -152,7 +152,7 @@ public class DiskBoundaryManager
         }
         else
         {
-            // Reason we use use the future settled metadata is that if we 
decommission a node, we want to stream
+            // Reason we use the future settled metadata is that if we 
decommission a node, we want to stream
             // from that node to the correct location on disk, if we didn't, 
we would put new files in the wrong places.
             // We do this to minimize the amount of data we need to move in 
rebalancedisks once everything settled
             placement = 
metadata.writePlacementAllSettled(cfs.keyspace.getMetadata());
diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java 
b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
index 019086dccd..d447974f85 100644
--- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
+++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -40,7 +40,7 @@ import 
org.apache.cassandra.tcm.serialization.MetadataSerializer;
  * For example, in order to join, the joining node has to execute the 
following steps:
  *   * PrepareJoin, which introduces node's tokens, but makes no changes to 
range ownership, and creates BootstrapAndJoin
  *     in-progress sequence
- *   * StartJoin, which adds the bootstrapping node to the write placements 
for the ranges it gains
+ *   * StartJoin, which adds the bootstrapping node to the write placements 
for  the ranges it gains
  *   * MidJoin, which adds the bootstrapping node to the read placements for 
the ranges it has gained, and removes
  *     owners of these ranges from the read placements
  *   * FinishJoin, which removes owners of the gained ranges from the write 
placements.
@@ -126,7 +126,7 @@ public abstract class MultiStepOperation<CONTEXT>
 
     /**
      * Returns the {@link Transformation.Kind} of the next step due to be 
executed in the sequence. Used when executing
-     * a {@link Transformation} which is part of a sequence (specifically, 
subclasses of
+     * a {@link Transformation} which is part of a sequence (often, this is an 
implementation of
      * {@link org.apache.cassandra.tcm.transformations.ApplyPlacementDeltas}) 
to validate that it is being applied at
      * the correct point (i.e. that the type of the transform matches the 
expected next)
      * matches the If all steps
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index e4b81bc765..8cfda01e26 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -48,6 +48,12 @@ import 
org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
 import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS;
 import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
 
+/**
+ * Implementations should be pure transformations from one ClusterMetadata 
state to another. They are likely to be
+ * replayed during startup to rebuild the node's current state and so should 
be free of side effects and should not
+ * depend on external state, configuration or resources. They must produce 
consistent outputs when run on every instance
+ * in a cluster, regardless of any specific characteristics of the instance.
+ */
 public interface Transformation
 {
     Serializer transformationSerializer = new Serializer();
diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java 
b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
index 149c99b8b2..0d5ee2f067 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -82,7 +82,7 @@ public class AddToCMS extends MultiStepOperation<Epoch>
                                                                .commit(new 
StartAddToCMS(addr))
                                            .inProgressSequences.get(nodeId);
         InProgressSequences.resume(sequence);
-        ReconfigureCMS.repairPaxosTopology();
+        ReconfigureCMS.repairPaxosForCMSTopologyChange();
     }
 
     public AddToCMS(Epoch latestModification,
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java 
b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index 57b5e68e80..38566812bf 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -33,9 +33,6 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -74,9 +71,18 @@ import 
org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Future;
 
-import static 
org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT;
 import static org.apache.cassandra.locator.MetaStrategy.entireRange;
+import static 
org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT;
 
+/**
+ * This class is slightly different from most other MultiStepOperations in 
that it doesn't reify every component
+ * transformation when it is constructed (see how {@link BootstrapAndJoin} 
encloses its StartJoin/MidJoin/FinishJoin
+ * transforms for a counter example). Instead, each instance includes a single 
transformation with kind {@link
+ * Transformation.Kind#ADVANCE_CMS_RECONFIGURATION}, representing the _next_ 
step to be executed. That transformation
+ * instance holds all the state necessary to generate the subsequent 
ADVANCE_CMS_RECONFIGURATION step. As each of these
+ * transformations is applied, they logically progress the multi-step 
operation by installing a new ReconfigureCMS
+ * instance with the idx pointer bumped and the next step encoded.
+ */
 public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration>
 {
     public static final Serializer serializer = new Serializer();
@@ -119,6 +125,7 @@ public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration
     {
         return MultiStepOperation.Kind.RECONFIGURE_CMS;
     }
+
     @Override
     protected SequenceKey sequenceKey()
     {
@@ -176,6 +183,13 @@ public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration
                 Replica replica = new Replica(endpoint, entireRange, true);
                 streamRanges(replica, activeTransition.streamCandidates);
             }
+            else
+            {
+                // Run a paxos repair before starting either the addition or 
removal of a CMS member, where in both
+                // cases there is no active transition.
+                repairPaxosForCMSTopologyChange();
+            }
+
             // Commit the next step in the sequence
             ClusterMetadataService.instance().commit(transitionCMS.next);
             return SequenceState.continuable();
@@ -298,17 +312,42 @@ public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration
                '}';
     }
 
-    static void repairPaxosTopology()
+    static void repairPaxosForCMSTopologyChange()
     {
-        Retry.Backoff retry = new 
Retry.Backoff(TCMMetrics.instance.repairPaxosTopologyRetries);
+        // This *should* be redundant, primarily because the state machine 
which manages a node's cluster metadata
+        // doesn't rely on the distributed metadata log table directly but is 
driven by metadata replication messages
+        // which distribute log entries around the cluster.
+        //
+        // However, it is still worthwhile to guard against a failure to sync 
paxos accept/commit operations by a subset
+        // of replicas, in this case the CMS members. Paxos repair is designed 
to ensure that any operation witnessed by
+        // at least one replica prior to a topology change is witnessed by a 
majority of the replica set after the
+        // topology change. Essentially ensuring that the pre- and post- 
change quorums overlap.
+        //
+        // The way that CMS reconfiguration proceeds is to first expand the 
membership to the maximal state by adding all
+        // new members and then to shrink back down to the desired size by 
pruning out the leaving members. Each step
+        // only modifies the membership group by adding or removing a single 
member and unlike operations on other
+        // keyspaces, there are no changes to the ranges involved. As all CMS 
members replicate the entire token
+        // range for that keyspace, these ownership changes are relatively 
simple.
+        //
+        // For example, if the CMS membership is currently {1, 2, 3} and we 
want to transition it to {4, 5, 6} the
+        // reconfiguration goes through these steps:
+        // * {1, 2, 3}
+        // * {1, 2, 3, 4}
+        // * {1, 2, 3, 4, 5}
+        // * {1, 2, 3, 4, 5, 6}
+        // * {2, 3, 4, 5, 6}
+        // * {3, 4, 5, 6}
+        // * {4, 5, 6}
+        // When adding a member, the new member streams data in from an 
existing member, analogous to bootstrapping.
+        // When removing, there is no need for streaming as the existing 
members' ownership is not changing. Running a
+        // paxos repair at the beginning of each step, before streaming where 
applicable, will ensure that the
+        // overlapping quorums invariant holds.
 
-        // The system.paxos table is what we're actually repairing and that 
uses the system configured partitioner
-        // so although we use MetaStrategy.entireRange for streaming between 
CMS members, we don't use it here
-        Range<Token> entirePaxosRange = new 
Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                                                    
DatabaseDescriptor.getPartitioner().getMinimumToken());
-        List<Supplier<Future<?>>> remaining = 
ActiveRepairService.instance().repairPaxosForTopologyChangeAsync(SchemaConstants.METADATA_KEYSPACE_NAME,
-                                                                               
                                Collections.singletonList(entirePaxosRange),
-                                                                               
                                "bootstrap");
+        Retry.Backoff retry = new 
Retry.Backoff(TCMMetrics.instance.repairPaxosTopologyRetries);
+        List<Supplier<Future<?>>> remaining = ActiveRepairService.instance()
+                                                                 
.repairPaxosForTopologyChangeAsync(SchemaConstants.METADATA_KEYSPACE_NAME,
+                                                                               
                     Collections.singletonList(entireRange),
+                                                                               
                     "CMS reconfiguration");
 
         while (!retry.reachedMax())
         {
@@ -316,7 +355,6 @@ public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration
             for (Supplier<Future<?>> supplier : remaining)
                 tasks.put(supplier, supplier.get());
             remaining.clear();
-            logger.info("Performing paxos topology repair on: {}", remaining);
 
             for (Map.Entry<Supplier<Future<?>>, Future<?>> e : 
tasks.entrySet())
             {
@@ -331,6 +369,7 @@ public class ReconfigureCMS extends 
MultiStepOperation<AdvanceCMSReconfiguration
                 }
                 catch (InterruptedException t)
                 {
+                    logger.info("Interrupted while repairing paxos topology, 
aborting.", t);
                     return;
                 }
             }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
index 0197106bc6..a2af4a2137 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
@@ -34,7 +34,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +48,7 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
@@ -57,6 +57,8 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.SharedContext;
@@ -304,16 +306,17 @@ public class PaxosRepairTest extends TestBaseImpl
     }
 
 
-    @Ignore
     @Test
     public void topologyChangePaxosTest() throws Throwable
     {
         // TODO: fails with vnode enabled
-        try (Cluster cluster = 
Cluster.build(4).withConfig(WITH_NETWORK).withoutVNodes().createWithoutStarting())
+        try (Cluster cluster = builder().withNodes(3)
+                                        
.withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                        
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+                                        .withConfig(WITH_NETWORK)
+                                        .withoutVNodes()
+                                        .start())
         {
-            for (int i=1; i<=3; i++)
-                cluster.get(i).startup();
-
             init(cluster);
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " 
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + 
TABLE + " (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
@@ -333,7 +336,11 @@ public class PaxosRepairTest extends TestBaseImpl
             cluster.filters().reset();
 
             // node 4 starting should repair paxos and inform the other nodes 
of its gossip state
-            cluster.get(4).startup();
+            IInstanceConfig config = cluster.newInstanceConfig()
+                                            .set("auto_bootstrap", true)
+                                            
.set(Constants.KEY_DTEST_FULL_STARTUP, true);
+            IInvokableInstance node4 = cluster.bootstrap(config);
+            node4.startup();
             Assert.assertFalse(hasUncommittedQuorum(cluster, KEYSPACE, TABLE));
         }
     }
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index d1f983f19a..2869fe913a 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -19,8 +19,10 @@
 package org.apache.cassandra.distributed.test.log;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -40,6 +42,8 @@ import org.apache.cassandra.locator.MetaStrategy;
 import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.PaxosRepairHistory;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -257,6 +261,66 @@ public class ReconfigureCMSTest extends FuzzTestBase
         }
     }
 
+    @Test
+    public void cmsTopologyChangePaxosTest() throws Throwable
+    {
+        // Use a 4 node cluster so we have room to decommission one node while 
still maintaining RF
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(c -> 
c.with(Feature.NETWORK))
+                                        .withoutVNodes()
+                                        .start())
+        {
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            IInvokableInstance node3 = cluster.get(3);
+            IInvokableInstance node4 = cluster.get(4);
+
+            // no paxos repair history initially
+            PaxosRepairHistory empty = 
PaxosRepairHistory.empty(MetaStrategy.partitioner);
+            cluster.forEach(i -> assertEquals(empty, paxosRepairHistory(i)));
+
+            node1.nodetoolResult("cms", "reconfigure", 
"2").asserts().success();
+            // Nodes 3 & 4 are not involved in the first cms reconfiguration, 
so should still have no paxos repair
+            // history for the metadata log table
+            assertEquals(empty, paxosRepairHistory(node3));
+            assertEquals(empty, paxosRepairHistory(node4));
+            // Node 1 & 2 should have completed a paxos repair. For this 
keyspace, that is always over the entire
+            // range, so there is only ever a single entry in the repair 
history which equates to prh.size() == 0
+            PaxosRepairHistory node1History = paxosRepairHistory(node1);
+            assertEquals(0, node1History.size());
+            assertEquals(node1History, paxosRepairHistory(node2));
+
+            // node 1 leaving should cause a cms reconfiguration which runs a 
paxos repair which involves nodes 2 & 3
+            // does participate in while node 4 remains uninvolved.
+            node1.nodetoolResult("decommission").asserts().success();
+            assertEquals(empty, paxosRepairHistory(node4));
+
+            PaxosRepairHistory node3History = paxosRepairHistory(node3);
+            assertEquals(0, node3History.size());
+            assertEquals(node3History, paxosRepairHistory(node2));
+            // verify that the ballot for this second repair is > the one for 
the first
+            Ballot node3Ballot = 
node3History.ballotForToken(MetaStrategy.partitioner.getMinimumToken());
+            Ballot node1Ballot = 
node1History.ballotForToken(MetaStrategy.partitioner.getMinimumToken());
+            assertTrue(node3Ballot.unixMicros() > node1Ballot.unixMicros());
+        }
+    }
+
+    private PaxosRepairHistory paxosRepairHistory(IInvokableInstance instance)
+    {
+        Object[][] rows = instance.executeInternal("select points from 
system.paxos_repair_history " +
+                                                   "where keyspace_name = ? " +
+                                                   "and table_name = ?",
+                                                   
SchemaConstants.METADATA_KEYSPACE_NAME,
+                                                   
DistributedMetadataLogKeyspace.TABLE_NAME);
+
+        if (rows.length == 0)
+            return 
PaxosRepairHistory.empty(SchemaConstants.METADATA_KEYSPACE_NAME, 
DistributedMetadataLogKeyspace.TABLE_NAME);
+        assertEquals(1, rows.length);
+        //noinspection unchecked
+        List<ByteBuffer> points = (List<ByteBuffer>)rows[0][0];
+        return 
PaxosRepairHistory.fromTupleBufferList(MetaStrategy.partitioner, points);
+    }
+
     // We can't assume that nodeId matches endpoint (ie node3 = 127.0.0.3 etc)
     private Set<String> expectedCMS(Cluster cluster, int... instanceIds)
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to