This is an automated email from the ASF dual-hosted git repository.

samt pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 51e01a3862  Repair Paxos for the distributed metadata log when CMS membership changes
51e01a3862 is described below

commit 51e01a3862afc1ebaffd765b2490a581461c3f14
Author: Sam Tunnicliffe <s...@apache.org>
AuthorDate: Thu Mar 20 17:55:44 2025 +0000

    Repair Paxos for the distributed metadata log when CMS membership changes

    Patch by Josh McKenzie and Sam Tunnicliffe; reviewed by Marcus Eriksson for CASSANDRA-20467

    Co-authored-by: Josh McKenzie <jmcken...@apache.org>
    Co-authored-by: Sam Tunnicliffe <s...@apache.org>
---
 CHANGES.txt                                        |  1 +
 .../apache/cassandra/db/DiskBoundaryManager.java   |  2 +-
 .../apache/cassandra/tcm/MultiStepOperation.java   |  4 +-
 .../org/apache/cassandra/tcm/Transformation.java   |  6 ++
 .../apache/cassandra/tcm/sequences/AddToCMS.java   |  2 +-
 .../cassandra/tcm/sequences/ReconfigureCMS.java    | 67 +++++++++++++++++-----
 .../distributed/test/PaxosRepairTest.java          | 21 ++++---
 .../distributed/test/log/ReconfigureCMSTest.java   | 64 +++++++++++++++++++++
 8 files changed, 142 insertions(+), 25 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index 98389b77cc..1360ab01bc 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Repair Paxos for the distributed metadata log when CMS membership changes (CASSANDRA-20467)
 * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346)
 * Add min/max/mean/percentiles to timer metrics vtable (CASSANDRA-20466)
 * Add support for time, date, timestamp types in scalar constraint (CASSANDRA-20274)
diff --git a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
index 2a25dc4efc..5c6b59a0b1 100644
--- a/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
+++ b/src/java/org/apache/cassandra/db/DiskBoundaryManager.java
@@ -152,7 +152,7 @@ public class DiskBoundaryManager
         }
         else
         {
-            // Reason we use use the future settled metadata is that if we decommission a node, we want to stream
+            // Reason we use the future settled metadata is that if we decommission a node, we want to stream
             // from that node to the correct location on disk, if we didn't, we would put new files in the wrong places.
             // We do this to minimize the amount of data we need to move in rebalancedisks once everything settled
             placement = metadata.writePlacementAllSettled(cfs.keyspace.getMetadata());
diff --git a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
index 019086dccd..d447974f85 100644
--- a/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
+++ b/src/java/org/apache/cassandra/tcm/MultiStepOperation.java
@@ -40,7 +40,7 @@ import org.apache.cassandra.tcm.serialization.MetadataSerializer;
  * For example, in order to join, the joining node has to execute the following steps:
  * * PrepareJoin, which introduces node's tokens, but makes no changes to range ownership, and creates BootstrapAndJoin
  *   in-progress sequence
- * * StartJoin, which adds the bootstrapping node to the write placements for the ranges it gains
+ * * StartJoin, which adds the bootstrapping node to the write placements for the ranges it gains
  * * MidJoin, which adds the bootstrapping node to the read placements for the ranges it has gained, and removes
  *   owners of these ranges from the read placements
  * * FinishJoin, which removes owners of the gained ranges from the write placements.
@@ -126,7 +126,7 @@ public abstract class MultiStepOperation<CONTEXT>

     /**
      * Returns the {@link Transformation.Kind} of the next step due to be executed in the sequence. Used when executing
-     * a {@link Transformation} which is part of a sequence (specifically, subclasses of
+     * a {@link Transformation} which is part of a sequence (often, this is an implementation of
      * {@link org.apache.cassandra.tcm.transformations.ApplyPlacementDeltas}) to validate that it is being applied at
      * the correct point (i.e. that the type of the transform matches the expected next)
      * matches the If all steps
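For illustration, the "expected next step" check described in the MultiStepOperation javadoc above can be sketched in isolation. The standalone snippet below uses toy types (StepKind, JoinSequence) invented for the example, not the Cassandra API, to show why a step whose kind does not match the one the sequence expects next is rejected:

    import java.util.List;

    final class NextStepValidationSketch
    {
        enum StepKind { PREPARE_JOIN, START_JOIN, MID_JOIN, FINISH_JOIN }

        // A toy in-progress sequence: a fixed list of step kinds plus the index of the next step due.
        record JoinSequence(List<StepKind> steps, int nextIdx)
        {
            StepKind expectedNext()
            {
                return steps.get(nextIdx);
            }

            // The idea from the javadoc above: a transformation may only be applied if its kind
            // matches the kind the sequence expects next; anything else is rejected.
            JoinSequence apply(StepKind kind)
            {
                if (kind != expectedNext())
                    throw new IllegalStateException("Expected " + expectedNext() + " but got " + kind);
                return new JoinSequence(steps, nextIdx + 1);
            }
        }

        public static void main(String[] args)
        {
            JoinSequence seq = new JoinSequence(List.of(StepKind.START_JOIN, StepKind.MID_JOIN, StepKind.FINISH_JOIN), 0);
            seq = seq.apply(StepKind.START_JOIN);   // ok: matches the expected next kind
            seq = seq.apply(StepKind.MID_JOIN);     // ok
            try
            {
                seq.apply(StepKind.START_JOIN);     // out of order: rejected
            }
            catch (IllegalStateException e)
            {
                System.out.println(e.getMessage());
            }
        }
    }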
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java
index e4b81bc765..8cfda01e26 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -48,6 +48,12 @@ import org.apache.cassandra.tcm.transformations.cms.RemoveFromCMS;
 import org.apache.cassandra.tcm.transformations.cms.StartAddToCMS;
 import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;

+/**
+ * Implementations should be pure transformations from one ClusterMetadata state to another. They are likely to be
+ * replayed during startup to rebuild the node's current state and so should be free of side effects and should not
+ * depend on external state, configuration or resources. They must produce consistent outputs when run on every instance
+ * in a cluster, regardless of any specific characteristics of the instance.
+ */
 public interface Transformation
 {
     Serializer transformationSerializer = new Serializer();
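To make the purity requirement in the new Transformation javadoc concrete, here is a minimal standalone sketch. The type SimpleMetadata and its methods are invented for the example and are not the real ClusterMetadata or Transformation interfaces; the point is only that a transformation's output should depend solely on its input and its own fields, never on clocks, configuration or other node-local state:

    import java.util.Set;
    import java.util.TreeSet;

    final class PureTransformationSketch
    {
        // A tiny immutable "metadata" stand-in: just a CMS member set and an epoch counter.
        record SimpleMetadata(long epoch, Set<Integer> cmsMembers)
        {
            SimpleMetadata withMember(int nodeId)
            {
                Set<Integer> next = new TreeSet<>(cmsMembers);
                next.add(nodeId);
                return new SimpleMetadata(epoch + 1, Set.copyOf(next));
            }
        }

        // Pure: the output depends only on the input metadata and the argument, so replaying the
        // transformation during startup rebuilds exactly the same state on every instance.
        static SimpleMetadata addMember(SimpleMetadata prev, int nodeId)
        {
            return prev.withMember(nodeId);
        }

        // Impure: reading the clock (or config, file system, gossip state...) makes replay
        // non-deterministic and lets different instances diverge.
        static SimpleMetadata addMemberAtCurrentTime(SimpleMetadata prev, int nodeId)
        {
            long now = System.currentTimeMillis(); // external input: avoid this in a transformation
            return new SimpleMetadata(now, prev.withMember(nodeId).cmsMembers());
        }

        public static void main(String[] args)
        {
            SimpleMetadata m = new SimpleMetadata(0, Set.of(1, 2, 3));
            System.out.println(addMember(m, 4)); // same result whenever and wherever it is replayed
        }
    }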
diff --git a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
index 149c99b8b2..0d5ee2f067 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/AddToCMS.java
@@ -82,7 +82,7 @@ public class AddToCMS extends MultiStepOperation<Epoch>
                .commit(new StartAddToCMS(addr))
                .inProgressSequences.get(nodeId);
         InProgressSequences.resume(sequence);
-        ReconfigureCMS.repairPaxosTopology();
+        ReconfigureCMS.repairPaxosForCMSTopologyChange();
     }

     public AddToCMS(Epoch latestModification,
diff --git a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
index 57b5e68e80..38566812bf 100644
--- a/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
+++ b/src/java/org/apache/cassandra/tcm/sequences/ReconfigureCMS.java
@@ -33,9 +33,6 @@ import com.google.common.collect.ImmutableSet;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import org.apache.cassandra.config.DatabaseDescriptor;
-import org.apache.cassandra.dht.Range;
-import org.apache.cassandra.dht.Token;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.io.util.DataInputPlus;
 import org.apache.cassandra.io.util.DataOutputPlus;
@@ -74,9 +71,18 @@ import org.apache.cassandra.tcm.transformations.cms.PrepareCMSReconfiguration;
 import org.apache.cassandra.utils.FBUtilities;
 import org.apache.cassandra.utils.concurrent.Future;

-import static org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT;
 import static org.apache.cassandra.locator.MetaStrategy.entireRange;
+import static org.apache.cassandra.streaming.StreamOperation.RESTORE_REPLICA_COUNT;

+/**
+ * This class is slightly different from most other MultiStepOperations in that it doesn't reify every component
+ * transformation when it is constructed (see how {@link BootstrapAndJoin} encloses its StartJoin/MidJoin/FinishJoin
+ * transforms for a counter example). Instead, each instance includes a single transformation with kind {@link
+ * Transformation.Kind#ADVANCE_CMS_RECONFIGURATION}, representing the _next_ step to be executed. That transformation
+ * instance holds all the state necessary to generate the subsequent ADVANCE_CMS_RECONFIGURATION step. As each of these
+ * transformations is applied, they logically progress the multi-step operation by installing a new ReconfigureCMS
+ * instance with the idx pointer bumped and the next step encoded.
+ */
 public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration>
 {
     public static final Serializer serializer = new Serializer();
@@ -119,6 +125,7 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration
     {
         return MultiStepOperation.Kind.RECONFIGURE_CMS;
     }
+
     @Override
     protected SequenceKey sequenceKey()
     {
@@ -176,6 +183,13 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration
                 Replica replica = new Replica(endpoint, entireRange, true);
                 streamRanges(replica, activeTransition.streamCandidates);
             }
+            else
+            {
+                // Run a paxos repair before starting either the addition or removal of a CMS member, where in both
+                // cases there is no active transition.
+                repairPaxosForCMSTopologyChange();
+            }
+            // Commit the next step in the sequence
             ClusterMetadataService.instance().commit(transitionCMS.next);
             return SequenceState.continuable();
         }
@@ -298,17 +312,42 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration
                '}';
     }

-    static void repairPaxosTopology()
+    static void repairPaxosForCMSTopologyChange()
     {
-        Retry.Backoff retry = new Retry.Backoff(TCMMetrics.instance.repairPaxosTopologyRetries);
+        // This *should* be redundant, primarily because the state machine which manages a node's cluster metadata
+        // doesn't rely on the distributed metadata log table directly but is driven by metadata replication messages
+        // which distribute log entries around the cluster.
+        //
+        // However, it is still worthwhile to guard against a failure to sync paxos accept/commit operations by a subset
+        // of replicas, in this case the CMS members. Paxos repair is designed to ensure that any operation witnessed by
+        // at least one replica prior to a topology change is witnessed by a majority of the replica set after the
+        // topology change, essentially ensuring that the pre- and post-change quorums overlap.
+        //
+        // The way that CMS reconfiguration proceeds is to first expand the membership to the maximal state by adding all
+        // new members and then to shrink back down to the desired size by pruning out the leaving members. Each step
+        // only modifies the membership group by adding or removing a single member and, unlike operations on other
+        // keyspaces, there are no changes to the ranges involved. As all CMS members replicate the entire token
+        // range for that keyspace, these ownership changes are relatively simple.
+        //
+        // For example, if the CMS membership is currently {1, 2, 3} and we want to transition it to {4, 5, 6} the
+        // reconfiguration goes through these steps:
+        // * {1, 2, 3}
+        // * {1, 2, 3, 4}
+        // * {1, 2, 3, 4, 5}
+        // * {1, 2, 3, 4, 5, 6}
+        // * {2, 3, 4, 5, 6}
+        // * {3, 4, 5, 6}
+        // * {4, 5, 6}
+        // When adding a member, the new member streams data in from an existing member, analogous to bootstrapping.
+        // When removing, there is no need for streaming as the existing members' ownership is not changing. Running a
+        // paxos repair at the beginning of each step, before streaming where applicable, will ensure that the
+        // overlapping quorums invariant holds.

-        // The system.paxos table is what we're actually repairing and that uses the system configured partitioner
-        // so although we use MetaStrategy.entireRange for streaming between CMS members, we don't use it here
-        Range<Token> entirePaxosRange = new Range<>(DatabaseDescriptor.getPartitioner().getMinimumToken(),
-                                                    DatabaseDescriptor.getPartitioner().getMinimumToken());
-        List<Supplier<Future<?>>> remaining = ActiveRepairService.instance().repairPaxosForTopologyChangeAsync(SchemaConstants.METADATA_KEYSPACE_NAME,
-                                                                                                               Collections.singletonList(entirePaxosRange),
-                                                                                                               "bootstrap");
+        Retry.Backoff retry = new Retry.Backoff(TCMMetrics.instance.repairPaxosTopologyRetries);
+        List<Supplier<Future<?>>> remaining = ActiveRepairService.instance()
+                                              .repairPaxosForTopologyChangeAsync(SchemaConstants.METADATA_KEYSPACE_NAME,
+                                                                                 Collections.singletonList(entireRange),
+                                                                                 "CMS reconfiguration");

         while (!retry.reachedMax())
         {
@@ -316,7 +355,6 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration
             for (Supplier<Future<?>> supplier : remaining)
                 tasks.put(supplier, supplier.get());
             remaining.clear();
-            logger.info("Performing paxos topology repair on: {}", remaining);

             for (Map.Entry<Supplier<Future<?>>, Future<?>> e : tasks.entrySet())
             {
@@ -331,6 +369,7 @@ public class ReconfigureCMS extends MultiStepOperation<AdvanceCMSReconfiguration
             }
             catch (InterruptedException t)
             {
+                logger.info("Interrupted while repairing paxos topology, aborting.", t);
                 return;
             }
         }
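The expand-then-shrink ordering described in the comment above can be modelled in isolation. The standalone sketch below (toy types and names, not the actual ReconfigureCMS or AdvanceCMSReconfiguration code) computes the intermediate membership sets for the {1, 2, 3} -> {4, 5, 6} example, one single-member change per step:

    import java.util.ArrayList;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;
    import java.util.TreeSet;

    final class CmsReconfigurationSketch
    {
        // Returns every intermediate membership, changing a single member per step.
        // In the real sequence a paxos repair runs at the start of each step, before any streaming.
        static List<Set<Integer>> progression(List<Integer> current, List<Integer> target)
        {
            List<Set<Integer>> steps = new ArrayList<>();
            Set<Integer> membership = new LinkedHashSet<>(current);
            steps.add(new TreeSet<>(membership));

            for (int joining : target)                  // expand to the maximal state first
                if (membership.add(joining))
                    steps.add(new TreeSet<>(membership));

            for (int leaving : current)                 // then prune the leavers one at a time
                if (!target.contains(leaving) && membership.remove(leaving))
                    steps.add(new TreeSet<>(membership));

            return steps;
        }

        public static void main(String[] args)
        {
            // {1, 2, 3} -> {4, 5, 6} prints the seven states listed in the comment above.
            progression(List.of(1, 2, 3), List.of(4, 5, 6)).forEach(System.out::println);
        }
    }

Changing one member at a time is the same trick used for single-server membership changes elsewhere: any majority of the new membership shares at least one node with any majority of the previous one, which is the overlap the paxos repair at each step relies on.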
diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
index 0197106bc6..a2af4a2137 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepairTest.java
@@ -34,7 +34,6 @@ import com.google.common.collect.Iterators;
 import com.google.common.collect.Sets;
 import com.google.common.util.concurrent.Uninterruptibles;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -49,6 +48,7 @@ import org.apache.cassandra.db.marshal.Int32Type;
 import org.apache.cassandra.db.partitions.PartitionIterator;
 import org.apache.cassandra.db.rows.*;
 import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.Constants;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.ICluster;
@@ -57,6 +57,8 @@ import org.apache.cassandra.distributed.api.IInstanceConfig;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.apache.cassandra.distributed.api.IMessageFilters;
 import org.apache.cassandra.gms.FailureDetector;
+import org.apache.cassandra.distributed.api.TokenSupplier;
+import org.apache.cassandra.distributed.shared.NetworkTopology;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.repair.RepairParallelism;
 import org.apache.cassandra.repair.SharedContext;
@@ -304,16 +306,17 @@ public class PaxosRepairTest extends TestBaseImpl
     }

-    @Ignore
     @Test
     public void topologyChangePaxosTest() throws Throwable
     {
         // TODO: fails with vnode enabled
-        try (Cluster cluster = Cluster.build(4).withConfig(WITH_NETWORK).withoutVNodes().createWithoutStarting())
+        try (Cluster cluster = builder().withNodes(3)
+                                        .withTokenSupplier(TokenSupplier.evenlyDistributedTokens(4))
+                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(4, "dc0", "rack0"))
+                                        .withConfig(WITH_NETWORK)
+                                        .withoutVNodes()
+                                        .start())
         {
-            for (int i=1; i<=3; i++)
-                cluster.get(i).startup();
-
             init(cluster);
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
             cluster.coordinator(1).execute("INSERT INTO " + KEYSPACE + '.' + TABLE + " (pk, ck, v) VALUES (1, 1, 1) IF NOT EXISTS", ConsistencyLevel.QUORUM);
@@ -333,7 +336,11 @@ public class PaxosRepairTest extends TestBaseImpl
             cluster.filters().reset();

             // node 4 starting should repair paxos and inform the other nodes of its gossip state
-            cluster.get(4).startup();
+            IInstanceConfig config = cluster.newInstanceConfig()
+                                            .set("auto_bootstrap", true)
+                                            .set(Constants.KEY_DTEST_FULL_STARTUP, true);
+            IInvokableInstance node4 = cluster.bootstrap(config);
+            node4.startup();
             Assert.assertFalse(hasUncommittedQuorum(cluster, KEYSPACE, TABLE));
         }
     }
diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
index d1f983f19a..2869fe913a 100644
--- a/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
+++ b/test/distributed/org/apache/cassandra/distributed/test/log/ReconfigureCMSTest.java
@@ -19,8 +19,10 @@ package org.apache.cassandra.distributed.test.log;

 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
@@ -40,6 +42,8 @@ import org.apache.cassandra.locator.MetaStrategy;
 import org.apache.cassandra.schema.DistributedMetadataLogKeyspace;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.paxos.Ballot;
+import org.apache.cassandra.service.paxos.PaxosRepairHistory;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
 import org.apache.cassandra.tcm.ownership.DataPlacement;
@@ -257,6 +261,66 @@ public class ReconfigureCMSTest extends FuzzTestBase
         }
     }

+    @Test
+    public void cmsTopologyChangePaxosTest() throws Throwable
+    {
+        // Use a 4 node cluster so we have room to decommission one node while still maintaining RF
+        try (Cluster cluster = builder().withNodes(4)
+                                        .withConfig(c -> c.with(Feature.NETWORK))
+                                        .withoutVNodes()
+                                        .start())
+        {
+            IInvokableInstance node1 = cluster.get(1);
+            IInvokableInstance node2 = cluster.get(2);
+            IInvokableInstance node3 = cluster.get(3);
+            IInvokableInstance node4 = cluster.get(4);
+
+            // no paxos repair history initially
+            PaxosRepairHistory empty = PaxosRepairHistory.empty(MetaStrategy.partitioner);
+            cluster.forEach(i -> assertEquals(empty, paxosRepairHistory(i)));
+
+            node1.nodetoolResult("cms", "reconfigure", "2").asserts().success();
+            // Nodes 3 & 4 are not involved in the first cms reconfiguration, so should still have no paxos repair
+            // history for the metadata log table
+            assertEquals(empty, paxosRepairHistory(node3));
+            assertEquals(empty, paxosRepairHistory(node4));
+            // Nodes 1 & 2 should have completed a paxos repair. For this keyspace, that is always over the entire
+            // range, so there is only ever a single entry in the repair history, which equates to prh.size() == 0
+            PaxosRepairHistory node1History = paxosRepairHistory(node1);
+            assertEquals(0, node1History.size());
+            assertEquals(node1History, paxosRepairHistory(node2));
+
+            // node 1 leaving should cause a cms reconfiguration which runs a paxos repair involving nodes 2 & 3,
+            // while node 4 remains uninvolved.
+            node1.nodetoolResult("decommission").asserts().success();
+            assertEquals(empty, paxosRepairHistory(node4));
+
+            PaxosRepairHistory node3History = paxosRepairHistory(node3);
+            assertEquals(0, node3History.size());
+            assertEquals(node3History, paxosRepairHistory(node2));
+            // verify that the ballot for this second repair is > the one for the first
+            Ballot node3Ballot = node3History.ballotForToken(MetaStrategy.partitioner.getMinimumToken());
+            Ballot node1Ballot = node1History.ballotForToken(MetaStrategy.partitioner.getMinimumToken());
+            assertTrue(node3Ballot.unixMicros() > node1Ballot.unixMicros());
+        }
+    }
+
+    private PaxosRepairHistory paxosRepairHistory(IInvokableInstance instance)
+    {
+        Object[][] rows = instance.executeInternal("select points from system.paxos_repair_history " +
+                                                   "where keyspace_name = ? " +
+                                                   "and table_name = ?",
+                                                   SchemaConstants.METADATA_KEYSPACE_NAME,
+                                                   DistributedMetadataLogKeyspace.TABLE_NAME);
+
+        if (rows.length == 0)
+            return PaxosRepairHistory.empty(SchemaConstants.METADATA_KEYSPACE_NAME, DistributedMetadataLogKeyspace.TABLE_NAME);
+        assertEquals(1, rows.length);
+        //noinspection unchecked
+        List<ByteBuffer> points = (List<ByteBuffer>) rows[0][0];
+        return PaxosRepairHistory.fromTupleBufferList(MetaStrategy.partitioner, points);
+    }
+
     // We can't assume that nodeId matches endpoint (ie node3 = 127.0.0.3 etc)
     private Set<String> expectedCMS(Cluster cluster, int... instanceIds)
     {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org