This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new e6cf0c1c98 Support Restart node in Accord e6cf0c1c98 is described below commit e6cf0c1c9822529f11e20e1757c70be41ef549e4 Author: David Capwell <dcapw...@apache.org> AuthorDate: Mon Sep 30 11:41:04 2024 -0700 Support Restart node in Accord patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-19969 --- .../service/accord/AccordConfigurationService.java | 34 +++++++---- .../cassandra/service/accord/AccordKeyspace.java | 42 +++++--------- .../cassandra/service/accord/AccordService.java | 16 +++--- src/java/org/apache/cassandra/tcm/Processor.java | 3 + .../distributed/test/PaxosRepair2Test.java | 1 + .../test/accord/AccordJournalIntegrationTest.java | 62 ++++++++++++++++----- .../distributed/test/log/BootWithMetadataTest.java | 4 ++ .../test/log/CoordinatorPathTestBase.java | 4 +- .../fuzz/topology/HarryTopologyMixupTest.java | 6 +- .../fuzz/topology/TopologyMixupTestBase.java | 65 ++++++++++++++++++---- .../accord/AccordConfigurationServiceTest.java | 4 +- 11 files changed, 164 insertions(+), 77 deletions(-) diff --git a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java index 09a04140c9..f52d7a935e 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java @@ -22,6 +22,7 @@ import java.util.Objects; import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.stream.Collectors; import javax.annotation.Nullable; @@ -230,9 +231,19 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc state = State.LOADING; EndpointMapping snapshot = mapping; //TODO (restart): if there are topologies loaded then there is 
likely failures if reporting is needed, as mapping is not setup yet - diskState = diskStateManager.loadTopologies(((epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> { - if (topology != null) - reportTopology(topology, syncStatus == SyncStatus.NOT_STARTED); + AtomicReference<Topology> previousRef = new AtomicReference<>(null); + diskState = diskStateManager.loadTopologies(((epoch, metadata, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> { + updateMapping(metadata); + reportTopology(topology, syncStatus == SyncStatus.NOT_STARTED); + Topology previous = previousRef.get(); + if (previous != null) + { + // for all nodes removed, or pending removal, mark them as removed so we don't wait on their replies + Sets.SetView<Node.Id> removedNodes = Sets.difference(previous.nodes(), topology.nodes()); + if (!removedNodes.isEmpty()) + onNodesRemoved(topology.epoch(), currentTopology(), removedNodes); + } + previousRef.set(topology); getOrCreateEpochState(epoch).setSyncStatus(syncStatus); if (syncStatus == SyncStatus.NOTIFYING) @@ -331,14 +342,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc // for all nodes removed, or pending removal, mark them as removed so we don't wait on their replies Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(), topology.nodes()); if (!removedNodes.isEmpty()) - { - onNodesRemoved(topology.epoch(), removedNodes); - for (Node.Id node : removedNodes) - { - if (shareShard(current, node, localId)) - AccordService.instance().tryMarkRemoved(current, node); - } - } + onNodesRemoved(topology.epoch(), current, removedNodes); } private static boolean shareShard(Topology current, Node.Id target, Node.Id self) @@ -351,7 +355,7 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc return false; } - public synchronized void onNodesRemoved(long epoch, Set<Node.Id> removed) + public synchronized void 
onNodesRemoved(long epoch, Topology current, Set<Node.Id> removed) { if (removed.isEmpty()) return; syncPropagator.onNodesRemoved(removed); @@ -361,6 +365,12 @@ public class AccordConfigurationService extends AbstractConfigurationService<Acc receiveRemoteSyncCompletePreListenerNotify(node, oldEpoch); } listeners.forEach(l -> l.onRemoveNodes(epoch, removed)); + + for (Node.Id node : removed) + { + if (shareShard(current, node, localId)) + AccordService.instance().tryMarkRemoved(current, node); + } } private long[] nonCompletedEpochsBefore(long max) diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java index d38a3dfb98..f39fdbf902 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java +++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java @@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; +import org.apache.cassandra.tcm.ClusterMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,7 +138,6 @@ import org.apache.cassandra.service.accord.serializers.AccordRoutingKeyByteSourc import org.apache.cassandra.service.accord.serializers.CommandSerializers; import org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer; import org.apache.cassandra.service.accord.serializers.KeySerializers; -import org.apache.cassandra.service.accord.serializers.TopologySerializers; import org.apache.cassandra.utils.Clock.Global; import org.apache.cassandra.utils.CloseableIterator; import org.apache.cassandra.utils.btree.BTree; @@ -264,7 +264,6 @@ public class AccordKeyspace public static class LocalVersionedSerializers { static final LocalVersionedSerializer<StoreParticipants> participants = localSerializer(CommandSerializers.participants); - static final 
LocalVersionedSerializer<Topology> topology = localSerializer(TopologySerializers.topology); private static <T> LocalVersionedSerializer<T> localSerializer(IVersionedSerializer<T> serializer) { @@ -708,7 +707,6 @@ public class AccordKeyspace "accord topologies", "CREATE TABLE %s (" + "epoch bigint primary key, " + - "topology blob, " + "sync_state int, " + "pending_sync_notify set<int>, " + // nodes that need to be told we're synced "remote_sync_complete set<int>, " + // nodes that have told us they're synced @@ -1387,22 +1385,7 @@ public class AccordKeyspace public static EpochDiskState saveTopology(Topology topology, EpochDiskState diskState) { - diskState = maybeUpdateMaxEpoch(diskState, topology.epoch()); - - try - { - String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + - "SET topology=? WHERE epoch=?"; - executeInternal(cql, - serialize(topology, LocalVersionedSerializers.topology), topology.epoch()); - flush(Topologies); - } - catch (IOException e) - { - throw new UncheckedIOException(e); - } - - return diskState; + return maybeUpdateMaxEpoch(diskState, topology.epoch()); } public static EpochDiskState markRemoteTopologySync(Node.Id node, long epoch, EpochDiskState diskState) @@ -1487,21 +1470,26 @@ public class AccordKeyspace public interface TopologyLoadConsumer { - void load(long epoch, Topology topology, SyncStatus syncStatus, Set<Node.Id> pendingSyncNotify, Set<Node.Id> remoteSyncComplete, Ranges closed, Ranges redundant); + void load(long epoch, ClusterMetadata metadata, Topology topology, SyncStatus syncStatus, Set<Node.Id> pendingSyncNotify, Set<Node.Id> remoteSyncComplete, Ranges closed, Ranges redundant); } @VisibleForTesting - public static void loadEpoch(long epoch, TopologyLoadConsumer consumer) throws IOException + public static void loadEpoch(long epoch, ClusterMetadata metadata, TopologyLoadConsumer consumer) throws IOException { + Topology topology = AccordTopology.createAccordTopology(metadata); + String cql = "SELECT * 
FROM " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + ' ' + "WHERE epoch=?"; UntypedResultSet result = executeInternal(cql, epoch); + if (result.isEmpty()) + { + // topology updates disk state for epoch but doesn't save the topology to the table, so there may be an epoch we know about but no fields are present + consumer.load(epoch, metadata, topology, SyncStatus.NOT_STARTED, Collections.emptySet(), Collections.emptySet(), Ranges.EMPTY, Ranges.EMPTY); + return; + } checkState(!result.isEmpty(), "Nothing found for epoch %d", epoch); UntypedResultSet.Row row = result.one(); - Topology topology = row.has("topology") - ? deserialize(row.getBytes("topology"), LocalVersionedSerializers.topology) - : null; SyncStatus syncStatus = row.has("sync_state") ? SyncStatus.values()[row.getInt("sync_state")] @@ -1515,7 +1503,7 @@ public class AccordKeyspace Ranges closed = row.has("closed") ? blobMapToRanges(row.getMap("closed", BytesType.instance, BytesType.instance)) : Ranges.EMPTY; Ranges redundant = row.has("redundant") ? 
blobMapToRanges(row.getMap("redundant", BytesType.instance, BytesType.instance)) : Ranges.EMPTY; - consumer.load(epoch, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant); + consumer.load(epoch, metadata, topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant); } public static EpochDiskState loadTopologies(TopologyLoadConsumer consumer) @@ -1526,8 +1514,8 @@ public class AccordKeyspace if (diskState == null) return EpochDiskState.EMPTY; - for (long epoch=diskState.minEpoch; epoch<=diskState.maxEpoch; epoch++) - loadEpoch(epoch, consumer); + for (ClusterMetadata metadata : AccordService.tcmLoadRange(diskState.minEpoch, diskState.maxEpoch)) + loadEpoch(metadata.epoch.getEpoch(), metadata, consumer); return diskState; } diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index 4bb46424fb..4095c31c67 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -443,10 +443,9 @@ public class AccordService implements IAccordService, Shutdownable class Ref { List<ClusterMetadata> historic = Collections.emptyList();} Ref ref = new Ref(); configService.start((optMaxEpoch -> { - // when max epoch isn't know, this means the node started for the first time; check cluster's min epoch - // when max epoch is known, then there is no reason to discover min epoch (we already did it) - if (optMaxEpoch.isPresent()) return; - List<ClusterMetadata> historic = ref.historic = discoverHistoric(node, cms); + List<ClusterMetadata> historic = ref.historic = !optMaxEpoch.isEmpty() + ? 
tcmLoadRange(optMaxEpoch.getAsLong(), Long.MAX_VALUE) + : discoverHistoric(node, cms); for (ClusterMetadata m : historic) configService.reportMetadataInternal(m); })); @@ -531,14 +530,17 @@ public class AccordService implements IAccordService, Shutdownable return tcmLoadRange(minEpoch, current.epoch.getEpoch()); } - private static List<ClusterMetadata> tcmLoadRange(long min, long max) + public static List<ClusterMetadata> tcmLoadRange(long min, long max) { - List<ClusterMetadata> afterLoad = ClusterMetadataService.instance().processor().reconstructFull(Epoch.create(min - 1), Epoch.create(max)); + List<ClusterMetadata> afterLoad = ClusterMetadataService.instance().processor().reconstructFull(Epoch.create(min), Epoch.create(max)); + if (Invariants.isParanoid()) + assert afterLoad.get(0).epoch.getEpoch() == min : String.format("Unexpected epoch: expected %d but given %d", min, afterLoad.get(0).epoch.getEpoch()); while (!afterLoad.isEmpty() && afterLoad.get(0).epoch.getEpoch() < min) afterLoad.remove(0); assert !afterLoad.isEmpty() : String.format("TCM was unable to return the needed epochs: %d -> %d", min, max); assert afterLoad.get(0).epoch.getEpoch() == min : String.format("Unexpected epoch: expected %d but given %d", min, afterLoad.get(0).epoch.getEpoch()); - assert afterLoad.get(afterLoad.size() - 1).epoch.getEpoch() == max : String.format("Unexpected epoch: expected %d but given %d", max, afterLoad.get(afterLoad.size() - 1).epoch.getEpoch()); + if (max != Long.MAX_VALUE) + assert afterLoad.get(afterLoad.size() - 1).epoch.getEpoch() == max : String.format("Unexpected epoch: expected %d but given %d", max, afterLoad.get(afterLoad.size() - 1).epoch.getEpoch()); return afterLoad; } diff --git a/src/java/org/apache/cassandra/tcm/Processor.java b/src/java/org/apache/cassandra/tcm/Processor.java index 96cc42c066..5558b253d6 100644 --- a/src/java/org/apache/cassandra/tcm/Processor.java +++ b/src/java/org/apache/cassandra/tcm/Processor.java @@ -19,6 +19,7 @@ package 
org.apache.cassandra.tcm; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; @@ -83,8 +84,10 @@ public interface Processor { LogState logState = reconstruct(lowEpoch, highEpoch, Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS), TCMMetrics.instance.commitRetries)); + if (logState.isEmpty()) return Collections.emptyList(); List<ClusterMetadata> cms = new ArrayList<>(logState.entries.size()); ClusterMetadata accum = logState.baseState; + cms.add(accum); for (Entry entry : logState.entries) { Transformation.Result res = entry.transform.execute(accum); diff --git a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java index ed066b2c3b..88ef1af750 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java +++ b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java @@ -189,6 +189,7 @@ public class PaxosRepair2Test extends TestBaseImpl Ballot staleBallot = Paxos.newBallot(Ballot.none(), org.apache.cassandra.db.ConsistencyLevel.SERIAL); try (Cluster cluster = init(Cluster.create(3, cfg -> cfg .set("paxos_variant", "v2") + .set("accord.enabled", false) // this test monkeys with TCM which can cause confusion for Accord while it fetches epochs... 
.set("paxos_purge_grace_period", "0s") .set("truncate_request_timeout_in_ms", 1000L))) ) diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java index 66d677080e..19a675b8d2 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java @@ -18,11 +18,13 @@ package org.apache.cassandra.distributed.test.accord; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.cassandra.distributed.api.IInvokableInstance; import org.junit.Assert; import org.junit.Test; @@ -35,6 +37,9 @@ import org.apache.cassandra.distributed.test.TestBaseImpl; import org.apache.cassandra.service.accord.AccordService; import org.apache.cassandra.utils.concurrent.CountDownLatch; +import static org.apache.cassandra.distributed.api.Feature.GOSSIP; +import static org.apache.cassandra.distributed.api.Feature.NETWORK; + public class AccordJournalIntegrationTest extends TestBaseImpl { @Test @@ -45,8 +50,7 @@ public class AccordJournalIntegrationTest extends TestBaseImpl .withoutVNodes() .start())) { - final String TABLE = KEYSPACE + ".test_table"; - cluster.schemaChange("CREATE TABLE " + TABLE + " (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'"); + final String TABLE = createTable(cluster); List<Thread> threads = new ArrayList<>(); int numThreads = 10; CountDownLatch latch = CountDownLatch.newCountDownLatch(numThreads); @@ -94,18 +98,9 @@ public class AccordJournalIntegrationTest extends TestBaseImpl .start()) { cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};"); - final String TABLE = 
KEYSPACE + ".test_table"; - cluster.schemaChange("CREATE TABLE " + TABLE + " (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'"); + final String TABLE = createTable(cluster); - for (int j = 0; j < 1_000; j++) - { - cluster.coordinator(1).execute("BEGIN TRANSACTION\n" + - "INSERT INTO " + TABLE + "(k, c, v) VALUES (?, ?, ?);\n" + - "COMMIT TRANSACTION", - ConsistencyLevel.ALL, - j, j, 1 - ); - } + insertData(cluster, TABLE); Object[][] before = cluster.coordinator(1).execute("SELECT * FROM " + TABLE + " WHERE k = ?;", ConsistencyLevel.SERIAL, 1); @@ -122,4 +117,45 @@ public class AccordJournalIntegrationTest extends TestBaseImpl } } } + + @Test + public void restartWithEpochChanges() throws IOException + { + try (Cluster cluster = Cluster.build(3).withoutVNodes().withConfig(c -> c.with(GOSSIP).with(NETWORK)).start()) + { + init(cluster); + final String TABLE = createTable(cluster); + cluster.get(1).nodetoolResult("cms", "reconfigure", "3").asserts().success(); + + insertData(cluster, TABLE); + + IInvokableInstance restartNode = cluster.get(1); + ClusterUtils.stopUnchecked(restartNode); + + // make epoch changes + for (int i = 0; i < 10; i++) + cluster.schemaChange("ALTER TABLE " + TABLE + " WITH comment = 'change " + i + "'", true, cluster.get(2)); + + restartNode.startup(); + insertData(cluster, TABLE); + } + } + + private void insertData(Cluster cluster, String TABLE) { + for (int j = 0; j < 1_000; j++) + { + cluster.coordinator(1).execute("BEGIN TRANSACTION\n" + + "INSERT INTO " + TABLE + "(k, c, v) VALUES (?, ?, ?);\n" + + "COMMIT TRANSACTION", + ConsistencyLevel.ALL, + j, j, 1 + ); + } + } + + private String createTable(Cluster cluster) { + final String TABLE = KEYSPACE + ".test_table"; + cluster.schemaChange("CREATE TABLE " + TABLE + " (k int, c int, v int, primary key (k, c)) WITH transactional_mode='full'"); + return TABLE; + } } \ No newline at end of file diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java b/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java index fc6dad5adf..ae269dbdd6 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java @@ -52,6 +52,8 @@ public class BootWithMetadataTest extends TestBaseImpl public void resetTest() throws IOException, ExecutionException, InterruptedException { try (Cluster cluster = init(builder().withNodes(3) + // Accord tracks epochs, and if those epochs no longer exist it is not able to process anything, causing it to crash... + .withConfig(c -> c.set("accord.enabled", false)) .start())) { long epoch = 0; @@ -94,6 +96,8 @@ public class BootWithMetadataTest extends TestBaseImpl public void newCMSTest() throws IOException, ExecutionException, InterruptedException { try (Cluster cluster = init(builder().withNodes(4) + // Accord tracks epochs, and if those epochs no longer exist it is not able to process anything, causing it to crash... 
+ .withConfig(c -> c.set("accord.enabled", false)) .start())) { for (int i = 0; i < 10; i++) diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java index 59121b0aaa..59fdc0e6e4 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java @@ -127,7 +127,9 @@ public abstract class CoordinatorPathTestBase extends FuzzTestBase try (Cluster cluster = builder().withNodes(1) .withConfig(cfg -> cfg.set("seed_provider", new ParameterizedClass(SimpleSeedProvider.class.getName(), - Collections.singletonMap("seeds", fakeCmsNode.id() + ":7012")))) + Collections.singletonMap("seeds", fakeCmsNode.id() + ":7012"))) + // Accord depends on Processor.reconstruct, but those verbs are not simulated, causing the tests to fail + .set("accord.enabled", false)) .withTokenSupplier(factory) .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(10, "dc0", "rack0")) .createWithoutStarting(); diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java index 11fce664a1..9fe1039e99 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java @@ -63,7 +63,7 @@ public class HarryTopologyMixupTest extends TopologyMixupTestBase<HarryTopologyM { // do one last read just to make sure we validate the data... 
var harry = state.schemaSpec.harry; - harry.validateAll(harry.quiescentLocalChecker()); + harry.validateAll(harry.quiescentChecker()); } } @@ -72,7 +72,7 @@ public class HarryTopologyMixupTest extends TopologyMixupTestBase<HarryTopologyM ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(), new InJvmSut(cluster), new TokenPlacementModel.SimpleReplicationFactor(3), - SystemUnderTest.ConsistencyLevel.ALL); + SystemUnderTest.ConsistencyLevel.QUORUM); cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", HarryHelper.KEYSPACE)); var schema = harry.schema(); cluster.schemaChange(schema.compile().cql()); @@ -101,7 +101,7 @@ public class HarryTopologyMixupTest extends TopologyMixupTestBase<HarryTopologyM ((HarryState) state).numInserts++; }); Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state -> "Harry Validate All" + state.commandNamePostfix(), state -> { - spec.harry.validateAll(spec.harry.quiescentLocalChecker()); + spec.harry.validateAll(spec.harry.quiescentChecker()); ((HarryState) state).numInserts = 0; }); return (rs, state) -> { diff --git a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java index fd669340ae..5d974a4f17 100644 --- a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java +++ b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java @@ -104,9 +104,9 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche AddNode, RemoveNode, HostReplace, + StopNode, + StartNode, //TODO (coverage): add the following states once supported -// StopNode, -// StartNode, // MoveToken //TODO (coverage): node migrate to another rack or dc (unsupported on trunk as of this writing, but planned work for TCM) // MoveNodeToNewRack, @@ -128,7 +128,7 @@ public abstract class 
TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche private Command<State<S>, Void, ?> repairCommand(int toCoordinate) { return new SimpleCommand<>(state -> "nodetool repair " + state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node" + toCoordinate + state.commandNamePostfix(), - state -> state.cluster.get(toCoordinate).nodetoolResult("repair", state.schemaSpec.keyspaceName(), state.schemaSpec.name()).asserts().success()); + state -> state.cluster.get(toCoordinate).nodetoolResult("repair", state.schemaSpec.keyspaceName(), state.schemaSpec.name(), "--force").asserts().success()); } private Command<State<S>, Void, ?> waitForCMSToQuiesce() @@ -137,6 +137,29 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche state -> ClusterUtils.waitForCMSToQuiesce(state.cluster, state.cmsGroup)); } + private Command<State<S>, Void, ?> stopInstance(RandomSource rs, State<S> state) + { + int toStop = rs.pickInt(state.topologyHistory.up()); + return stopInstance(toStop, "Normal Stop"); + } + + private Command<State<S>, Void, ?> startInstance(RandomSource rs, State<S> state) + { + int toStop = rs.pickInt(state.topologyHistory.down()); + return startInstance(toStop); + } + + private Command<State<S>, Void, ?> startInstance(int toStart) + { + return new SimpleCommand<>(state -> "Start Node" + toStart + state.commandNamePostfix(), + state -> { + IInvokableInstance inst = state.cluster.get(toStart); + TopologyHistory.Node node = state.topologyHistory.node(toStart); + inst.startup(); + node.up(); + }); + } + private Command<State<S>, Void, ?> stopInstance(int toRemove, String why) { return new SimpleCommand<>(state -> "Stop Node" + toRemove + " for " + why + state.commandNamePostfix(), @@ -256,10 +279,7 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche TopologyHistory.Node adding = state.topologyHistory.replace(nodeToReplace); TopologyHistory.Node removing = 
state.topologyHistory.nodes.get(nodeToReplace); - return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + " for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> { - ClusterUtils.stopUnchecked(toReplace); - removing.down(); - }), + return multistep(stopInstance(nodeToReplace, "HostReplace; Node" + adding.id), new SimpleCommand<>("Host Replace Node" + nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> { logger.info("node{} starting host replacement; epoch={}", adding.id, HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance())); removing.status = TopologyHistory.Node.Status.BeingReplaced; @@ -314,15 +334,20 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche EnumSet<TopologyChange> possibleTopologyChanges = EnumSet.noneOf(TopologyChange.class); // up or down is logically more correct, but since this runs sequentially and after the topology changes are complete, we don't have downed nodes at this point // so up is enough to know the topology size - int size = state.topologyHistory.up().length; - if (size < state.topologyHistory.maxNodes) + int up = state.topologyHistory.up().length; + int down = state.topologyHistory.down().length; + int total = up + down; + if (total < state.topologyHistory.maxNodes) possibleTopologyChanges.add(TopologyChange.AddNode); - if (size > state.topologyHistory.quorum()) + if (up > state.topologyHistory.quorum()) { - if (size > TARGET_RF) + if (up > TARGET_RF) possibleTopologyChanges.add(TopologyChange.RemoveNode); possibleTopologyChanges.add(TopologyChange.HostReplace); + possibleTopologyChanges.add(TopologyChange.StopNode); } + if (down > 0) + possibleTopologyChanges.add(TopologyChange.StartNode); return possibleTopologyChanges; } @@ -342,6 +367,12 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche case HostReplace: possible.put(rs -> multistep(hostReplace(rs, state), waitForCMSToQuiesce()), 1); break; + case 
StartNode: + possible.put(rs -> startInstance(rs, state), 1); + break; + case StopNode: + possible.put(rs -> stopInstance(rs, state), 1); + break; default: throw new UnsupportedOperationException(task.name()); } @@ -568,11 +599,21 @@ public abstract class TopologyMixupTestBase<S extends TopologyMixupTestBase.Sche } public int[] up() + { + return nodes(Node.Status.Up); + } + + public int[] down() + { + return nodes(Node.Status.Down); + } + + private int[] nodes(Node.Status target) { IntArrayList up = new IntArrayList(nodes.size(), -1); for (Map.Entry<Integer, Node> n : nodes.entrySet()) { - if (n.getValue().status == Node.Status.Up) + if (n.getValue().status == target) up.add(n.getKey()); } int[] ints = up.toIntArray(); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java index 94e181a853..9daa1bb9fa 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java @@ -185,7 +185,7 @@ public class AccordConfigurationServiceTest Topology topology1 = new Topology(1, new Shard(AccordTopology.fullRange(TBL1), ID_LIST, ID_SET)); service.reportTopology(topology1); - loadEpoch(1, (epoch, topology, syncStatus, pendingSync, remoteSync, closed, redundant) -> { + loadEpoch(1, null, (epoch, cm, topology, syncStatus, pendingSync, remoteSync, closed, redundant) -> { Assert.assertEquals(topology1, topology); Assert.assertTrue(remoteSync.isEmpty()); }); @@ -193,7 +193,7 @@ public class AccordConfigurationServiceTest service.receiveRemoteSyncComplete(ID1, 1); service.receiveRemoteSyncComplete(ID2, 1); - loadEpoch(1, (epoch, topology, syncStatus, pendingSync, remoteSync, closed, redundant) -> { + loadEpoch(1, null, (epoch, cm, topology, syncStatus, pendingSync, remoteSync, closed, redundant) -> { Assert.assertEquals(topology1, topology); 
Assert.assertEquals(Sets.newHashSet(ID1, ID2), remoteSync); }); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org