This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new e6cf0c1c98 Support Restart node in Accord
e6cf0c1c98 is described below

commit e6cf0c1c9822529f11e20e1757c70be41ef549e4
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Mon Sep 30 11:41:04 2024 -0700

    Support Restart node in Accord
    
    patch by David Capwell; reviewed by Alex Petrov for CASSANDRA-19969
---
 .../service/accord/AccordConfigurationService.java | 34 +++++++----
 .../cassandra/service/accord/AccordKeyspace.java   | 42 +++++---------
 .../cassandra/service/accord/AccordService.java    | 16 +++---
 src/java/org/apache/cassandra/tcm/Processor.java   |  3 +
 .../distributed/test/PaxosRepair2Test.java         |  1 +
 .../test/accord/AccordJournalIntegrationTest.java  | 62 ++++++++++++++++-----
 .../distributed/test/log/BootWithMetadataTest.java |  4 ++
 .../test/log/CoordinatorPathTestBase.java          |  4 +-
 .../fuzz/topology/HarryTopologyMixupTest.java      |  6 +-
 .../fuzz/topology/TopologyMixupTestBase.java       | 65 ++++++++++++++++++----
 .../accord/AccordConfigurationServiceTest.java     |  4 +-
 11 files changed, 164 insertions(+), 77 deletions(-)

diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
index 09a04140c9..f52d7a935e 100644
--- 
a/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
+++ 
b/src/java/org/apache/cassandra/service/accord/AccordConfigurationService.java
@@ -22,6 +22,7 @@ import java.util.Objects;
 import java.util.OptionalLong;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -230,9 +231,19 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         state = State.LOADING;
         EndpointMapping snapshot = mapping;
         //TODO (restart): if there are topologies loaded then there is likely 
failures if reporting is needed, as mapping is not setup yet
-        diskState = diskStateManager.loadTopologies(((epoch, topology, 
syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) -> {
-            if (topology != null)
-                reportTopology(topology, syncStatus == SyncStatus.NOT_STARTED);
+        AtomicReference<Topology> previousRef = new AtomicReference<>(null);
+        diskState = diskStateManager.loadTopologies(((epoch, metadata, 
topology, syncStatus, pendingSyncNotify, remoteSyncComplete, closed, redundant) 
-> {
+            updateMapping(metadata);
+            reportTopology(topology, syncStatus == SyncStatus.NOT_STARTED);
+            Topology previous = previousRef.get();
+            if (previous != null)
+            {
+                // for all nodes removed, or pending removal, mark them as 
removed so we don't wait on their replies
+                Sets.SetView<Node.Id> removedNodes = 
Sets.difference(previous.nodes(), topology.nodes());
+                if (!removedNodes.isEmpty())
+                    onNodesRemoved(topology.epoch(), currentTopology(), 
removedNodes);
+            }
+            previousRef.set(topology);
 
             getOrCreateEpochState(epoch).setSyncStatus(syncStatus);
             if (syncStatus == SyncStatus.NOTIFYING)
@@ -331,14 +342,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         // for all nodes removed, or pending removal, mark them as removed so 
we don't wait on their replies
         Sets.SetView<Node.Id> removedNodes = Sets.difference(current.nodes(), 
topology.nodes());
         if (!removedNodes.isEmpty())
-        {
-            onNodesRemoved(topology.epoch(), removedNodes);
-            for (Node.Id node : removedNodes)
-            {
-                if (shareShard(current, node, localId))
-                    AccordService.instance().tryMarkRemoved(current, node);
-            }
-        }
+            onNodesRemoved(topology.epoch(), current, removedNodes);
     }
 
     private static boolean shareShard(Topology current, Node.Id target, 
Node.Id self)
@@ -351,7 +355,7 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
         return false;
     }
 
-    public synchronized void onNodesRemoved(long epoch, Set<Node.Id> removed)
+    public synchronized void onNodesRemoved(long epoch, Topology current, 
Set<Node.Id> removed)
     {
         if (removed.isEmpty()) return;
         syncPropagator.onNodesRemoved(removed);
@@ -361,6 +365,12 @@ public class AccordConfigurationService extends 
AbstractConfigurationService<Acc
                 receiveRemoteSyncCompletePreListenerNotify(node, oldEpoch);
         }
         listeners.forEach(l -> l.onRemoveNodes(epoch, removed));
+
+        for (Node.Id node : removed)
+        {
+            if (shareShard(current, node, localId))
+                AccordService.instance().tryMarkRemoved(current, node);
+        }
     }
 
     private long[] nonCompletedEpochsBefore(long max)
diff --git a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java 
b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
index d38a3dfb98..f39fdbf902 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordKeyspace.java
@@ -37,6 +37,7 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
+import org.apache.cassandra.tcm.ClusterMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,7 +138,6 @@ import 
org.apache.cassandra.service.accord.serializers.AccordRoutingKeyByteSourc
 import org.apache.cassandra.service.accord.serializers.CommandSerializers;
 import 
org.apache.cassandra.service.accord.serializers.CommandsForKeySerializer;
 import org.apache.cassandra.service.accord.serializers.KeySerializers;
-import org.apache.cassandra.service.accord.serializers.TopologySerializers;
 import org.apache.cassandra.utils.Clock.Global;
 import org.apache.cassandra.utils.CloseableIterator;
 import org.apache.cassandra.utils.btree.BTree;
@@ -264,7 +264,6 @@ public class AccordKeyspace
     public static class LocalVersionedSerializers
     {
         static final LocalVersionedSerializer<StoreParticipants> participants 
= localSerializer(CommandSerializers.participants);
-        static final LocalVersionedSerializer<Topology> topology = 
localSerializer(TopologySerializers.topology);
 
         private static <T> LocalVersionedSerializer<T> 
localSerializer(IVersionedSerializer<T> serializer)
         {
@@ -708,7 +707,6 @@ public class AccordKeyspace
               "accord topologies",
               "CREATE TABLE %s (" +
               "epoch bigint primary key, " +
-              "topology blob, " +
               "sync_state int, " +
               "pending_sync_notify set<int>, " + // nodes that need to be told 
we're synced
               "remote_sync_complete set<int>, " +  // nodes that have told us 
they're synced
@@ -1387,22 +1385,7 @@ public class AccordKeyspace
 
     public static EpochDiskState saveTopology(Topology topology, 
EpochDiskState diskState)
     {
-        diskState = maybeUpdateMaxEpoch(diskState, topology.epoch());
-
-        try
-        {
-            String cql = "UPDATE " + ACCORD_KEYSPACE_NAME + '.' + TOPOLOGIES + 
' ' +
-                         "SET topology=? WHERE epoch=?";
-            executeInternal(cql,
-                            serialize(topology, 
LocalVersionedSerializers.topology), topology.epoch());
-            flush(Topologies);
-        }
-        catch (IOException e)
-        {
-            throw new UncheckedIOException(e);
-        }
-
-        return diskState;
+        return maybeUpdateMaxEpoch(diskState, topology.epoch());
     }
 
     public static EpochDiskState markRemoteTopologySync(Node.Id node, long 
epoch, EpochDiskState diskState)
@@ -1487,21 +1470,26 @@ public class AccordKeyspace
 
     public interface TopologyLoadConsumer
     {
-        void load(long epoch, Topology topology, SyncStatus syncStatus, 
Set<Node.Id> pendingSyncNotify, Set<Node.Id> remoteSyncComplete, Ranges closed, 
Ranges redundant);
+        void load(long epoch, ClusterMetadata metadata, Topology topology, 
SyncStatus syncStatus, Set<Node.Id> pendingSyncNotify, Set<Node.Id> 
remoteSyncComplete, Ranges closed, Ranges redundant);
     }
 
     @VisibleForTesting
-    public static void loadEpoch(long epoch, TopologyLoadConsumer consumer) 
throws IOException
+    public static void loadEpoch(long epoch, ClusterMetadata metadata, 
TopologyLoadConsumer consumer) throws IOException
     {
+        Topology topology = AccordTopology.createAccordTopology(metadata);
+
         String cql = "SELECT * FROM " + ACCORD_KEYSPACE_NAME + '.' + 
TOPOLOGIES + ' ' +
                      "WHERE epoch=?";
 
         UntypedResultSet result = executeInternal(cql, epoch);
+        if (result.isEmpty())
+        {
+            // topology updates disk state for epoch but doesn't save the 
topology to the table, so there may be an epoch we know about, but no fields are 
present
+            consumer.load(epoch, metadata, topology, SyncStatus.NOT_STARTED, 
Collections.emptySet(), Collections.emptySet(), Ranges.EMPTY, Ranges.EMPTY);
+            return;
+        }
         checkState(!result.isEmpty(), "Nothing found for epoch %d", epoch);
         UntypedResultSet.Row row = result.one();
-        Topology topology = row.has("topology")
-                            ? deserialize(row.getBytes("topology"), 
LocalVersionedSerializers.topology)
-                            : null;
 
         SyncStatus syncStatus = row.has("sync_state")
                                 ? SyncStatus.values()[row.getInt("sync_state")]
@@ -1515,7 +1503,7 @@ public class AccordKeyspace
         Ranges closed = row.has("closed") ? 
blobMapToRanges(row.getMap("closed", BytesType.instance, BytesType.instance)) : 
Ranges.EMPTY;
         Ranges redundant = row.has("redundant") ? 
blobMapToRanges(row.getMap("redundant", BytesType.instance, 
BytesType.instance)) : Ranges.EMPTY;
 
-        consumer.load(epoch, topology, syncStatus, pendingSyncNotify, 
remoteSyncComplete, closed, redundant);
+        consumer.load(epoch, metadata, topology, syncStatus, 
pendingSyncNotify, remoteSyncComplete, closed, redundant);
     }
 
     public static EpochDiskState loadTopologies(TopologyLoadConsumer consumer)
@@ -1526,8 +1514,8 @@ public class AccordKeyspace
             if (diskState == null)
                 return EpochDiskState.EMPTY;
 
-            for (long epoch=diskState.minEpoch; epoch<=diskState.maxEpoch; 
epoch++)
-                loadEpoch(epoch, consumer);
+            for (ClusterMetadata metadata : 
AccordService.tcmLoadRange(diskState.minEpoch, diskState.maxEpoch))
+                loadEpoch(metadata.epoch.getEpoch(), metadata, consumer);
 
             return diskState;
         }
diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java 
b/src/java/org/apache/cassandra/service/accord/AccordService.java
index 4bb46424fb..4095c31c67 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordService.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordService.java
@@ -443,10 +443,9 @@ public class AccordService implements IAccordService, 
Shutdownable
         class Ref { List<ClusterMetadata> historic = Collections.emptyList();}
         Ref ref = new Ref();
         configService.start((optMaxEpoch -> {
-            // when max epoch isn't know, this means the node started for the 
first time; check cluster's min epoch
-            // when max epoch is known, then there is no reason to discover 
min epoch (we already did it)
-            if (optMaxEpoch.isPresent()) return;
-            List<ClusterMetadata> historic = ref.historic = 
discoverHistoric(node, cms);
+            List<ClusterMetadata> historic = ref.historic = 
!optMaxEpoch.isEmpty()
+                    ? tcmLoadRange(optMaxEpoch.getAsLong(), Long.MAX_VALUE)
+                    : discoverHistoric(node, cms);
             for (ClusterMetadata m : historic)
                 configService.reportMetadataInternal(m);
         }));
@@ -531,14 +530,17 @@ public class AccordService implements IAccordService, 
Shutdownable
         return tcmLoadRange(minEpoch, current.epoch.getEpoch());
     }
 
-    private static List<ClusterMetadata> tcmLoadRange(long min, long max)
+    public static List<ClusterMetadata> tcmLoadRange(long min, long max)
     {
-        List<ClusterMetadata> afterLoad = 
ClusterMetadataService.instance().processor().reconstructFull(Epoch.create(min 
- 1), Epoch.create(max));
+        List<ClusterMetadata> afterLoad = 
ClusterMetadataService.instance().processor().reconstructFull(Epoch.create(min),
 Epoch.create(max));
+        if (Invariants.isParanoid())
+            assert afterLoad.get(0).epoch.getEpoch() == min : 
String.format("Unexpected epoch: expected %d but given %d", min, 
afterLoad.get(0).epoch.getEpoch());
         while (!afterLoad.isEmpty() && afterLoad.get(0).epoch.getEpoch() < min)
             afterLoad.remove(0);
         assert !afterLoad.isEmpty() : String.format("TCM was unable to return 
the needed epochs: %d -> %d", min, max);
         assert afterLoad.get(0).epoch.getEpoch() == min : 
String.format("Unexpected epoch: expected %d but given %d", min, 
afterLoad.get(0).epoch.getEpoch());
-        assert afterLoad.get(afterLoad.size() - 1).epoch.getEpoch() == max : 
String.format("Unexpected epoch: expected %d but given %d", max, 
afterLoad.get(afterLoad.size() - 1).epoch.getEpoch());
+        if (max != Long.MAX_VALUE)
+            assert afterLoad.get(afterLoad.size() - 1).epoch.getEpoch() == max 
: String.format("Unexpected epoch: expected %d but given %d", max, 
afterLoad.get(afterLoad.size() - 1).epoch.getEpoch());
         return afterLoad;
     }
 
diff --git a/src/java/org/apache/cassandra/tcm/Processor.java 
b/src/java/org/apache/cassandra/tcm/Processor.java
index 96cc42c066..5558b253d6 100644
--- a/src/java/org/apache/cassandra/tcm/Processor.java
+++ b/src/java/org/apache/cassandra/tcm/Processor.java
@@ -19,6 +19,7 @@
 package org.apache.cassandra.tcm;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -83,8 +84,10 @@ public interface Processor
     {
         LogState logState = reconstruct(lowEpoch, highEpoch, 
Retry.Deadline.retryIndefinitely(DatabaseDescriptor.getCmsAwaitTimeout().to(NANOSECONDS),
                                                                                
               TCMMetrics.instance.commitRetries));
+        if (logState.isEmpty()) return Collections.emptyList();
         List<ClusterMetadata> cms = new ArrayList<>(logState.entries.size());
         ClusterMetadata accum = logState.baseState;
+        cms.add(accum);
         for (Entry entry : logState.entries)
         {
             Transformation.Result res = entry.transform.execute(accum);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
index ed066b2c3b..88ef1af750 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
@@ -189,6 +189,7 @@ public class PaxosRepair2Test extends TestBaseImpl
         Ballot staleBallot = Paxos.newBallot(Ballot.none(), 
org.apache.cassandra.db.ConsistencyLevel.SERIAL);
         try (Cluster cluster = init(Cluster.create(3, cfg -> cfg
                                                              
.set("paxos_variant", "v2")
+                                                             
.set("accord.enabled", false) // this test monkeys with TCM which can cause 
confusion for Accord while it fetches epochs...
                                                              
.set("paxos_purge_grace_period", "0s")
                                                              
.set("truncate_request_timeout_in_ms", 1000L)))
         )
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
index 66d677080e..19a675b8d2 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordJournalIntegrationTest.java
@@ -18,11 +18,13 @@
 
 package org.apache.cassandra.distributed.test.accord;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.cassandra.distributed.api.IInvokableInstance;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -35,6 +37,9 @@ import org.apache.cassandra.distributed.test.TestBaseImpl;
 import org.apache.cassandra.service.accord.AccordService;
 import org.apache.cassandra.utils.concurrent.CountDownLatch;
 
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+
 public class AccordJournalIntegrationTest extends TestBaseImpl
 {
     @Test
@@ -45,8 +50,7 @@ public class AccordJournalIntegrationTest extends TestBaseImpl
                                            .withoutVNodes()
                                            .start()))
         {
-            final String TABLE = KEYSPACE + ".test_table";
-            cluster.schemaChange("CREATE TABLE " + TABLE + " (k int, c int, v 
int, primary key (k, c)) WITH transactional_mode='full'");
+            final String TABLE = createTable(cluster);
             List<Thread> threads = new ArrayList<>();
             int numThreads = 10;
             CountDownLatch latch = 
CountDownLatch.newCountDownLatch(numThreads);
@@ -94,18 +98,9 @@ public class AccordJournalIntegrationTest extends 
TestBaseImpl
                                       .start())
         {
             cluster.schemaChange("CREATE KEYSPACE " + KEYSPACE + " WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor': 1};");
-            final String TABLE = KEYSPACE + ".test_table";
-            cluster.schemaChange("CREATE TABLE " + TABLE + " (k int, c int, v 
int, primary key (k, c)) WITH transactional_mode='full'");
+            final String TABLE = createTable(cluster);
 
-            for (int j = 0; j < 1_000; j++)
-            {
-                cluster.coordinator(1).execute("BEGIN TRANSACTION\n" +
-                                               "INSERT INTO " + TABLE + "(k, 
c, v) VALUES (?, ?, ?);\n" +
-                                               "COMMIT TRANSACTION",
-                                               ConsistencyLevel.ALL,
-                                               j, j, 1
-                );
-            }
+            insertData(cluster, TABLE);
 
             Object[][] before = cluster.coordinator(1).execute("SELECT * FROM 
" + TABLE + " WHERE k = ?;", ConsistencyLevel.SERIAL, 1);
 
@@ -122,4 +117,45 @@ public class AccordJournalIntegrationTest extends 
TestBaseImpl
             }
         }
     }
+
+    @Test
+    public void restartWithEpochChanges() throws IOException
+    {
+        try (Cluster cluster = Cluster.build(3).withoutVNodes().withConfig(c 
-> c.with(GOSSIP).with(NETWORK)).start())
+        {
+            init(cluster);
+            final String TABLE = createTable(cluster);
+            cluster.get(1).nodetoolResult("cms", "reconfigure", 
"3").asserts().success();
+
+            insertData(cluster, TABLE);
+
+            IInvokableInstance restartNode = cluster.get(1);
+            ClusterUtils.stopUnchecked(restartNode);
+
+            // make epoch changes
+            for (int i = 0; i < 10; i++)
+                cluster.schemaChange("ALTER TABLE " + TABLE + " WITH comment = 
'change " + i + "'", true, cluster.get(2));
+
+            restartNode.startup();
+            insertData(cluster, TABLE);
+        }
+    }
+
+    private void insertData(Cluster cluster, String TABLE) {
+        for (int j = 0; j < 1_000; j++)
+        {
+            cluster.coordinator(1).execute("BEGIN TRANSACTION\n" +
+                                           "INSERT INTO " + TABLE + "(k, c, v) 
VALUES (?, ?, ?);\n" +
+                                           "COMMIT TRANSACTION",
+                                           ConsistencyLevel.ALL,
+                                           j, j, 1
+            );
+        }
+    }
+
+    private String createTable(Cluster cluster) {
+        final String TABLE = KEYSPACE + ".test_table";
+        cluster.schemaChange("CREATE TABLE " + TABLE + " (k int, c int, v int, 
primary key (k, c)) WITH transactional_mode='full'");
+        return TABLE;
+    }
 }
\ No newline at end of file
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java
index fc6dad5adf..ae269dbdd6 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/BootWithMetadataTest.java
@@ -52,6 +52,8 @@ public class BootWithMetadataTest extends TestBaseImpl
     public void resetTest() throws IOException, ExecutionException, 
InterruptedException
     {
         try (Cluster cluster = init(builder().withNodes(3)
+                                             // Accord tracks epochs, and if 
the epochs no longer exist it is not able to process anything, causing it to 
crash...
+                                             .withConfig(c -> 
c.set("accord.enabled", false))
                                              .start()))
         {
             long epoch = 0;
@@ -94,6 +96,8 @@ public class BootWithMetadataTest extends TestBaseImpl
     public void newCMSTest() throws IOException, ExecutionException, 
InterruptedException
     {
         try (Cluster cluster = init(builder().withNodes(4)
+                                             // Accord tracks epochs, and if 
the epochs no longer exist it is not able to process anything, causing it to 
crash...
+                                             .withConfig(c -> 
c.set("accord.enabled", false))
                                              .start()))
         {
             for (int i = 0; i < 10; i++)
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
index 59121b0aaa..59fdc0e6e4 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/CoordinatorPathTestBase.java
@@ -127,7 +127,9 @@ public abstract class CoordinatorPathTestBase extends 
FuzzTestBase
 
         try (Cluster cluster = builder().withNodes(1)
                                         .withConfig(cfg -> 
cfg.set("seed_provider", new 
ParameterizedClass(SimpleSeedProvider.class.getName(),
-                                                                               
                            Collections.singletonMap("seeds", fakeCmsNode.id() 
+ ":7012"))))
+                                                                               
                            Collections.singletonMap("seeds", fakeCmsNode.id() 
+ ":7012")))
+                                                              // Accord 
depends on Processor.reconstruct, but those verbs are not simulated, causing 
the tests to fail
+                                                              
.set("accord.enabled", false))
                                         .withTokenSupplier(factory)
                                         
.withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(10, "dc0", "rack0"))
                                         .createWithoutStarting();
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
 
b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
index 11fce664a1..9fe1039e99 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/topology/HarryTopologyMixupTest.java
@@ -63,7 +63,7 @@ public class HarryTopologyMixupTest extends 
TopologyMixupTestBase<HarryTopologyM
         {
             // do one last read just to make sure we validate the data...
             var harry = state.schemaSpec.harry;
-            harry.validateAll(harry.quiescentLocalChecker());
+            harry.validateAll(harry.quiescentChecker());
         }
     }
 
@@ -72,7 +72,7 @@ public class HarryTopologyMixupTest extends 
TopologyMixupTestBase<HarryTopologyM
         ReplayingHistoryBuilder harry = HarryHelper.dataGen(rs.nextLong(),
                                                             new 
InJvmSut(cluster),
                                                             new 
TokenPlacementModel.SimpleReplicationFactor(3),
-                                                            
SystemUnderTest.ConsistencyLevel.ALL);
+                                                            
SystemUnderTest.ConsistencyLevel.QUORUM);
         cluster.schemaChange(String.format("CREATE KEYSPACE %s WITH 
replication = {'class': 'SimpleStrategy', 'replication_factor' : 3};", 
HarryHelper.KEYSPACE));
         var schema = harry.schema();
         cluster.schemaChange(schema.compile().cql());
@@ -101,7 +101,7 @@ public class HarryTopologyMixupTest extends 
TopologyMixupTestBase<HarryTopologyM
             ((HarryState) state).numInserts++;
         });
         Command<State<Spec>, Void, ?> validateAll = new HarryCommand(state -> 
"Harry Validate All" + state.commandNamePostfix(), state -> {
-            spec.harry.validateAll(spec.harry.quiescentLocalChecker());
+            spec.harry.validateAll(spec.harry.quiescentChecker());
             ((HarryState) state).numInserts = 0;
         });
         return (rs, state) -> {
diff --git 
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
 
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
index fd669340ae..5d974a4f17 100644
--- 
a/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
+++ 
b/test/distributed/org/apache/cassandra/fuzz/topology/TopologyMixupTestBase.java
@@ -104,9 +104,9 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         AddNode,
         RemoveNode,
         HostReplace,
+        StopNode,
+        StartNode,
         //TODO (coverage): add the following states once supported
-//        StopNode,
-//        StartNode,
 //        MoveToken
         //TODO (coverage): node migrate to another rack or dc (unsupported on 
trunk as of this writing, but planned work for TCM)
 //        MoveNodeToNewRack,
@@ -128,7 +128,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
     private Command<State<S>, Void, ?> repairCommand(int toCoordinate)
     {
         return new SimpleCommand<>(state -> "nodetool repair " + 
state.schemaSpec.keyspaceName() + ' ' + state.schemaSpec.name() + " from node" 
+ toCoordinate + state.commandNamePostfix(),
-                                   state -> 
state.cluster.get(toCoordinate).nodetoolResult("repair", 
state.schemaSpec.keyspaceName(), state.schemaSpec.name()).asserts().success());
+                                   state -> 
state.cluster.get(toCoordinate).nodetoolResult("repair", 
state.schemaSpec.keyspaceName(), state.schemaSpec.name(), 
"--force").asserts().success());
     }
 
     private Command<State<S>, Void, ?> waitForCMSToQuiesce()
@@ -137,6 +137,29 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                                    state -> 
ClusterUtils.waitForCMSToQuiesce(state.cluster, state.cmsGroup));
     }
 
+    private Command<State<S>, Void, ?> stopInstance(RandomSource rs, State<S> 
state)
+    {
+        int toStop = rs.pickInt(state.topologyHistory.up());
+        return stopInstance(toStop, "Normal Stop");
+    }
+
+    private Command<State<S>, Void, ?> startInstance(RandomSource rs, State<S> 
state)
+    {
+        int toStop = rs.pickInt(state.topologyHistory.down());
+        return startInstance(toStop);
+    }
+
+    private Command<State<S>, Void, ?> startInstance(int toStart)
+    {
+        return new SimpleCommand<>(state -> "Start Node" + toStart + 
state.commandNamePostfix(),
+                state -> {
+                    IInvokableInstance inst = state.cluster.get(toStart);
+                    TopologyHistory.Node node = 
state.topologyHistory.node(toStart);
+                    inst.startup();
+                    node.up();
+                });
+    }
+
     private Command<State<S>, Void, ?> stopInstance(int toRemove, String why)
     {
         return new SimpleCommand<>(state -> "Stop Node" + toRemove + " for " + 
why + state.commandNamePostfix(),
@@ -256,10 +279,7 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         TopologyHistory.Node adding = 
state.topologyHistory.replace(nodeToReplace);
         TopologyHistory.Node removing = 
state.topologyHistory.nodes.get(nodeToReplace);
 
-        return multistep(new SimpleCommand<>("Stop Node" + nodeToReplace + " 
for HostReplace; Node" + adding.id + state.commandNamePostfix(), s2 -> {
-                             ClusterUtils.stopUnchecked(toReplace);
-                             removing.down();
-                         }),
+        return multistep(stopInstance(nodeToReplace, "HostReplace; Node" + 
adding.id),
                          new SimpleCommand<>("Host Replace Node" + 
nodeToReplace + "; Node" + adding.id + state.commandNamePostfix(), s2 -> {
                              logger.info("node{} starting host replacement; 
epoch={}", adding.id, 
HackSerialization.tcmEpochAndSync(s2.cluster.getFirstRunningInstance()));
                              removing.status = 
TopologyHistory.Node.Status.BeingReplaced;
@@ -314,15 +334,20 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         EnumSet<TopologyChange> possibleTopologyChanges = 
EnumSet.noneOf(TopologyChange.class);
         // up or down is logically more correct, but since this runs 
sequentially and after the topology changes are complete, we don't have downed 
nodes at this point
         // so up is enough to know the topology size
-        int size = state.topologyHistory.up().length;
-        if (size < state.topologyHistory.maxNodes)
+        int up = state.topologyHistory.up().length;
+        int down = state.topologyHistory.down().length;
+        int total = up + down;
+        if (total < state.topologyHistory.maxNodes)
             possibleTopologyChanges.add(TopologyChange.AddNode);
-        if (size > state.topologyHistory.quorum())
+        if (up > state.topologyHistory.quorum())
         {
-            if (size > TARGET_RF)
+            if (up > TARGET_RF)
                 possibleTopologyChanges.add(TopologyChange.RemoveNode);
             possibleTopologyChanges.add(TopologyChange.HostReplace);
+            possibleTopologyChanges.add(TopologyChange.StopNode);
         }
+        if (down > 0)
+            possibleTopologyChanges.add(TopologyChange.StartNode);
         return possibleTopologyChanges;
     }
 
@@ -342,6 +367,12 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
                 case HostReplace:
                     possible.put(rs -> multistep(hostReplace(rs, state), 
waitForCMSToQuiesce()), 1);
                     break;
+                case StartNode:
+                    possible.put(rs -> startInstance(rs, state), 1);
+                    break;
+                case StopNode:
+                    possible.put(rs -> stopInstance(rs, state), 1);
+                    break;
                 default:
                     throw new UnsupportedOperationException(task.name());
             }
@@ -568,11 +599,21 @@ public abstract class TopologyMixupTestBase<S extends 
TopologyMixupTestBase.Sche
         }
 
         public int[] up()
+        {
+            return nodes(Node.Status.Up);
+        }
+
+        public int[] down()
+        {
+            return nodes(Node.Status.Down);
+        }
+
+        private int[] nodes(Node.Status target)
         {
             IntArrayList up = new IntArrayList(nodes.size(), -1);
             for (Map.Entry<Integer, Node> n : nodes.entrySet())
             {
-                if (n.getValue().status == Node.Status.Up)
+                if (n.getValue().status == target)
                     up.add(n.getKey());
             }
             int[] ints = up.toIntArray();
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
index 94e181a853..9daa1bb9fa 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordConfigurationServiceTest.java
@@ -185,7 +185,7 @@ public class AccordConfigurationServiceTest
 
         Topology topology1 = new Topology(1, new 
Shard(AccordTopology.fullRange(TBL1), ID_LIST, ID_SET));
         service.reportTopology(topology1);
-        loadEpoch(1, (epoch, topology, syncStatus, pendingSync, remoteSync, 
closed, redundant) -> {
+        loadEpoch(1, null, (epoch, cm, topology, syncStatus, pendingSync, 
remoteSync, closed, redundant) -> {
             Assert.assertEquals(topology1, topology);
             Assert.assertTrue(remoteSync.isEmpty());
         });
@@ -193,7 +193,7 @@ public class AccordConfigurationServiceTest
 
         service.receiveRemoteSyncComplete(ID1, 1);
         service.receiveRemoteSyncComplete(ID2, 1);
-        loadEpoch(1, (epoch, topology, syncStatus, pendingSync, remoteSync, 
closed, redundant) -> {
+        loadEpoch(1, null, (epoch, cm, topology, syncStatus, pendingSync, 
remoteSync, closed, redundant) -> {
             Assert.assertEquals(topology1, topology);
             Assert.assertEquals(Sets.newHashSet(ID1, ID2), remoteSync);
         });


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to