This is an automated email from the ASF dual-hosted git repository.

maedhroz pushed a commit to branch cep-15-accord
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/cep-15-accord by this push:
     new afdb57c4a6 Command to Exclude Replicas from Durability Status 
Coordination
afdb57c4a6 is described below

commit afdb57c4a6ae449377daf09d8c39688a8560eaa3
Author: Caleb Rackliffe <calebrackli...@gmail.com>
AuthorDate: Tue Jul 16 17:40:49 2024 -0500

    Command to Exclude Replicas from Durability Status Coordination
    
    patch by Caleb Rackliffe; reviewed by David Capwell, Sam Tunnicliffe, and 
Benedict Elliott Smith for CASSANDRA-19321
---
 modules/accord                                     |   2 +-
 .../apache/cassandra/service/CassandraDaemon.java  |   2 +
 .../cassandra/service/accord/AccordOperations.java |  76 +++++++++++
 .../service/accord/AccordOperationsMBean.java      |  31 +++++
 .../service/accord/AccordStaleReplicas.java        | 137 +++++++++++++++++++
 .../cassandra/service/accord/AccordTopology.java   |  25 +++-
 .../accord/serializers/TopologySerializers.java    |   5 +-
 .../org/apache/cassandra/tcm/ClusterMetadata.java  |  77 +++++++++--
 .../org/apache/cassandra/tcm/MetadataKeys.java     |   2 +
 .../cassandra/tcm/StubClusterMetadataService.java  |   4 +-
 .../org/apache/cassandra/tcm/Transformation.java   |   2 +
 .../cassandra/tcm/compatibility/GossipHelper.java  |  10 +-
 .../cassandra/tcm/serialization/Version.java       |   1 +
 .../tcm/transformations/AccordMarkRejoining.java   | 123 +++++++++++++++++
 .../tcm/transformations/AccordMarkStale.java       | 151 +++++++++++++++++++++
 src/java/org/apache/cassandra/tools/NodeProbe.java |  10 ++
 src/java/org/apache/cassandra/tools/NodeTool.java  |   7 +
 .../cassandra/tools/nodetool/AccordAdmin.java      |  69 ++++++++++
 .../mock/nodetool/InternalNodeProbe.java           |   2 +
 .../test/accord/AccordNodetoolTest.java            |  99 ++++++++++++++
 .../test/log/ClusterMetadataTestHelper.java        |  10 +-
 .../apache/cassandra/locator/MetaStrategyTest.java |  10 +-
 .../accord/AccordFastPathCoordinatorTest.java      |  17 +--
 .../service/accord/AccordStaleReplicasTest.java    |  54 ++++++++
 .../tcm/ClusterMetadataTransformationTest.java     |   2 +
 .../AsymmetricMetadataSerializers.java             |  42 ++++++
 .../transformations/AccordMarkRejoiningTest.java   |  56 ++++++++
 .../tcm/transformations/AccordMarkStaleTest.java   |  56 ++++++++
 28 files changed, 1043 insertions(+), 39 deletions(-)

diff --git a/modules/accord b/modules/accord
index 449b2b4d0b..81c02769f9 160000
--- a/modules/accord
+++ b/modules/accord
@@ -1 +1 @@
-Subproject commit 449b2b4d0bf4bb44d55a3c57f712a4d5a15e7220
+Subproject commit 81c02769f9ad73ef3aba0675c2217fc74b8a4a4c
diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java 
b/src/java/org/apache/cassandra/service/CassandraDaemon.java
index 4ae0413529..a5e2be1e05 100644
--- a/src/java/org/apache/cassandra/service/CassandraDaemon.java
+++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java
@@ -70,6 +70,7 @@ import 
org.apache.cassandra.net.StartupClusterConnectivityChecker;
 import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.security.ThreadAwareSecurityManager;
+import org.apache.cassandra.service.accord.AccordOperations;
 import org.apache.cassandra.service.paxos.PaxosState;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.tcm.CMSOperations;
@@ -269,6 +270,7 @@ public class CassandraDaemon
             Startup.initialize(DatabaseDescriptor.getSeeds());
             
disableAutoCompaction(Schema.instance.distributedKeyspaces().names());
             CMSOperations.initJmx();
+            AccordOperations.initJmx();
         }
         catch (InterruptedException | ExecutionException | IOException e)
         {
diff --git a/src/java/org/apache/cassandra/service/accord/AccordOperations.java 
b/src/java/org/apache/cassandra/service/accord/AccordOperations.java
new file mode 100644
index 0000000000..e7820919e1
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordOperations.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.ClusterMetadataService;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.transformations.AccordMarkStale;
+import org.apache.cassandra.tcm.transformations.AccordMarkRejoining;
+import org.apache.cassandra.utils.MBeanWrapper;
+
+public class AccordOperations implements AccordOperationsMBean
+{
+    public static final String MBEAN_OBJECT_NAME = 
"org.apache.cassandra.service.accord:type=AccordOperations";
+    public static final AccordOperations instance = new 
AccordOperations(ClusterMetadataService.instance());
+
+    private final ClusterMetadataService cms;
+
+    public static void initJmx()
+    {
+        MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME);
+    }
+
+    private AccordOperations(ClusterMetadataService cms)
+    {
+        this.cms = cms;
+    }
+
+    @Override
+    public Map<String, String> describe()
+    {
+        Map<String, String> info = new HashMap<>();
+        ClusterMetadata metadata = ClusterMetadata.current();
+
+        info.put("EPOCH", Long.toString(metadata.epoch.getEpoch()));
+        String staleReplicas = 
metadata.accordStaleReplicas.ids().stream().sorted().map(Object::toString).collect(Collectors.joining(","));
+        info.put("STALE_REPLICAS", staleReplicas);
+        return info;
+    }
+
+    @Override
+    public void accordMarkStale(List<String> nodeIdStrings)
+    {
+        Set<NodeId> nodeIds = 
nodeIdStrings.stream().map(NodeId::fromString).collect(Collectors.toSet());
+        cms.commit(new AccordMarkStale(nodeIds));
+    }
+
+    @Override
+    public void accordMarkRejoining(List<String> nodeIdStrings)
+    {
+        Set<NodeId> nodeIds = 
nodeIdStrings.stream().map(NodeId::fromString).collect(Collectors.toSet());
+        cms.commit(new AccordMarkRejoining(nodeIds));
+    }
+}
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordOperationsMBean.java 
b/src/java/org/apache/cassandra/service/accord/AccordOperationsMBean.java
new file mode 100644
index 0000000000..e0b0884733
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordOperationsMBean.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.List;
+import java.util.Map;
+
+public interface AccordOperationsMBean
+{
+    Map<String, String> describe();
+
+    void accordMarkStale(List<String> nodeIds);
+
+    void accordMarkRejoining(List<String> nodeIds);
+}
diff --git 
a/src/java/org/apache/cassandra/service/accord/AccordStaleReplicas.java 
b/src/java/org/apache/cassandra/service/accord/AccordStaleReplicas.java
new file mode 100644
index 0000000000..2502fa1a63
--- /dev/null
+++ b/src/java/org/apache/cassandra/service/accord/AccordStaleReplicas.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+
+import javax.annotation.concurrent.Immutable;
+
+import com.google.common.collect.ImmutableSet;
+
+import accord.local.Node;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.serializers.TopologySerializers;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.MetadataValue;
+import org.apache.cassandra.tcm.serialization.MetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.CollectionSerializers;
+
+@Immutable
+public class AccordStaleReplicas implements MetadataValue<AccordStaleReplicas>
+{
+    public static final AccordStaleReplicas EMPTY = new 
AccordStaleReplicas(ImmutableSet.of(), Epoch.EMPTY);
+
+    private final Set<Node.Id> staleIds;
+    private final Epoch lastModified;
+
+    AccordStaleReplicas(Set<Node.Id> staleIds, Epoch lastModified)
+    {
+        this.staleIds = staleIds;
+        this.lastModified = lastModified;
+    }
+
+    @Override
+    public AccordStaleReplicas withLastModified(Epoch epoch)
+    {
+        return new AccordStaleReplicas(staleIds, epoch);
+    }
+
+    @Override
+    public Epoch lastModified()
+    {
+        return lastModified;
+    }
+    
+    public AccordStaleReplicas withNodeIds(Set<Node.Id> ids)
+    {
+        ImmutableSet.Builder<Node.Id> builder = new ImmutableSet.Builder<>();
+        Set<Node.Id> newIds = builder.addAll(staleIds).addAll(ids).build();
+        return new AccordStaleReplicas(newIds, lastModified);
+    }
+
+    public AccordStaleReplicas without(Set<Node.Id> ids)
+    {
+        ImmutableSet.Builder<Node.Id> builder = new ImmutableSet.Builder<>();
+
+        for (Node.Id staleId : staleIds)
+            if (!ids.contains(staleId))
+                builder.add(staleId);
+
+        return new AccordStaleReplicas(builder.build(), lastModified);
+    }
+
+    public boolean contains(Node.Id nodeId)
+    {
+        return staleIds.contains(nodeId);
+    }
+
+    public Set<Node.Id> ids()
+    {
+        return staleIds;
+    }
+
+    @Override
+    public String toString()
+    {
+        return "AccordStaleReplicas{staleIds=" + staleIds + ", lastModified=" 
+ lastModified + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AccordStaleReplicas that = (AccordStaleReplicas) o;
+        return Objects.equals(staleIds, that.staleIds) && 
Objects.equals(lastModified, that.lastModified);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(staleIds, lastModified);
+    }
+
+    public static final MetadataSerializer<AccordStaleReplicas> serializer = 
new MetadataSerializer<>()
+    {
+        @Override
+        public void serialize(AccordStaleReplicas replicas, DataOutputPlus 
out, Version version) throws IOException
+        {
+            CollectionSerializers.serializeCollection(replicas.staleIds, out, 
version, TopologySerializers.nodeId);
+            Epoch.serializer.serialize(replicas.lastModified, out, version);
+        }
+
+        @Override
+        public AccordStaleReplicas deserialize(DataInputPlus in, Version 
version) throws IOException
+        {
+            return new 
AccordStaleReplicas(CollectionSerializers.deserializeSet(in, version, 
TopologySerializers.nodeId),
+                                           Epoch.serializer.deserialize(in, 
version));
+        }
+
+        @Override
+        public long serializedSize(AccordStaleReplicas replicas, Version 
version)
+        {
+            return 
CollectionSerializers.serializedCollectionSize(replicas.staleIds, version, 
TopologySerializers.nodeId)
+                   + Epoch.serializer.serializedSize(replicas.lastModified, 
version);
+        }
+    };
+}
diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java 
b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
index 7bbfc0c250..45f3b3fd76 100644
--- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java
+++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java
@@ -78,7 +78,7 @@ public class AccordTopology
         }
     }
 
-    static class KeyspaceShard
+    public static class KeyspaceShard
     {
         private final KeyspaceMetadata keyspace;
         private final Range<Token> range;
@@ -139,7 +139,7 @@ public class AccordTopology
             return new KeyspaceShard(keyspace, range, nodes, pending);
         }
 
-        public static List<KeyspaceShard> forKeyspace(KeyspaceMetadata 
keyspace, DataPlacements placements, Directory directory, ShardLookup lookup)
+        public static List<KeyspaceShard> forKeyspace(KeyspaceMetadata 
keyspace, DataPlacements placements, Directory directory)
         {
             ReplicationParams replication = keyspace.params.replication;
             DataPlacement placement = placements.get(replication);
@@ -154,6 +154,16 @@ public class AccordTopology
             }
             return shards;
         }
+
+        public List<Node.Id> nodes()
+        {
+            return nodes;
+        }
+
+        public Range<Token> range()
+        {
+            return range;
+        }
     }
 
     static TokenRange minRange(TableId table, Token token)
@@ -219,7 +229,9 @@ public class AccordTopology
         return builder.build();
     }
 
-    public static Topology createAccordTopology(Epoch epoch, DistributedSchema 
schema, DataPlacements placements, Directory directory, AccordFastPath 
accordFastPath, ShardLookup lookup)
+    public static Topology createAccordTopology(Epoch epoch, DistributedSchema 
schema, DataPlacements placements, 
+                                                Directory directory, 
AccordFastPath accordFastPath, ShardLookup lookup,
+                                                AccordStaleReplicas 
staleReplicas)
     {
         List<Shard> shards = new ArrayList<>();
         Set<Node.Id> unavailable = accordFastPath.unavailableIds();
@@ -230,17 +242,18 @@ public class AccordTopology
             List<TableMetadata> tables = 
keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList());
             if (tables.isEmpty())
                 continue;
-            List<KeyspaceShard> ksShards = KeyspaceShard.forKeyspace(keyspace, 
placements, directory, lookup);
+            List<KeyspaceShard> ksShards = KeyspaceShard.forKeyspace(keyspace, 
placements, directory);
             tables.forEach(table -> ksShards.forEach(shard -> 
shards.add(shard.createForTable(table, unavailable, dcMap, lookup))));
         }
 
         shards.sort((a, b) -> a.range.compare(b.range));
-        return new Topology(epoch.getEpoch(), shards.toArray(new Shard[0]));
+
+        return new Topology(epoch.getEpoch(), staleReplicas.ids(), 
shards.toArray(new Shard[0]));
     }
 
     public static Topology createAccordTopology(ClusterMetadata metadata, 
ShardLookup lookup)
     {
-        return createAccordTopology(metadata.epoch, metadata.schema, 
metadata.placements, metadata.directory, metadata.accordFastPath, lookup);
+        return createAccordTopology(metadata.epoch, metadata.schema, 
metadata.placements, metadata.directory, metadata.accordFastPath, lookup, 
metadata.accordStaleReplicas);
     }
 
     public static Topology createAccordTopology(ClusterMetadata metadata, 
Topology current)
diff --git 
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
 
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
index 782ecbf5ed..73708c125f 100644
--- 
a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
+++ 
b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java
@@ -159,6 +159,7 @@ public class TopologySerializers
         {
             out.writeLong(topology.epoch());
             ArraySerializers.serializeArray(topology.unsafeGetShards(), out, 
version, shard);
+            CollectionSerializers.serializeCollection(topology.staleIds(), 
out, version, TopologySerializers.nodeId);
         }
 
         @Override
@@ -166,7 +167,8 @@ public class TopologySerializers
         {
             long epoch = in.readLong();
             Shard[] shards = ArraySerializers.deserializeArray(in, version, 
shard, Shard[]::new);
-            return new Topology(epoch, shards);
+            Set<Node.Id> staleIds = CollectionSerializers.deserializeSet(in, 
version, TopologySerializers.nodeId);
+            return new Topology(epoch, staleIds, shards);
         }
 
         @Override
@@ -175,6 +177,7 @@ public class TopologySerializers
             long size = 0;
             size += TypeSizes.LONG_SIZE; // epoch
             size += 
ArraySerializers.serializedArraySize(topology.unsafeGetShards(), version, 
shard);
+            size += 
CollectionSerializers.serializedCollectionSize(topology.staleIds(), version, 
TopologySerializers.nodeId);
             return size;
         }
     };
diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java 
b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
index 415fe71456..5bde50f1b3 100644
--- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
+++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java
@@ -21,6 +21,7 @@ package org.apache.cassandra.tcm;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -56,6 +57,8 @@ import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.schema.ReplicationParams;
 import org.apache.cassandra.schema.TableId;
+import org.apache.cassandra.service.accord.AccordStaleReplicas;
+import org.apache.cassandra.service.accord.AccordTopology;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.service.consensus.migration.TableMigrationState;
 import org.apache.cassandra.tcm.extensions.ExtensionKey;
@@ -98,6 +101,7 @@ public class ClusterMetadata
     public final InProgressSequences inProgressSequences;
     public final ConsensusMigrationState consensusMigrationState;
     public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions;
+    public final AccordStaleReplicas accordStaleReplicas;
 
     // These two fields are lazy but only for the test purposes, since their 
computation requires initialization of the log ks
     private EndpointsForRange fullCMSReplicas;
@@ -129,7 +133,8 @@ public class ClusterMetadata
              LockedRanges.EMPTY,
              InProgressSequences.EMPTY,
              ConsensusMigrationState.EMPTY,
-             ImmutableMap.of());
+             ImmutableMap.of(),
+             AccordStaleReplicas.EMPTY);
     }
 
     public ClusterMetadata(Epoch epoch,
@@ -142,7 +147,8 @@ public class ClusterMetadata
                            LockedRanges lockedRanges,
                            InProgressSequences inProgressSequences,
                            ConsensusMigrationState consensusMigrationState,
-                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
+                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions,
+                           AccordStaleReplicas accordStaleReplicas)
     {
         this(EMPTY_METADATA_IDENTIFIER,
              epoch,
@@ -155,7 +161,8 @@ public class ClusterMetadata
              lockedRanges,
              inProgressSequences,
              consensusMigrationState,
-             extensions);
+             extensions,
+             accordStaleReplicas);
     }
 
     private ClusterMetadata(int metadataIdentifier,
@@ -169,7 +176,8 @@ public class ClusterMetadata
                            LockedRanges lockedRanges,
                            InProgressSequences inProgressSequences,
                            ConsensusMigrationState consensusMigrationState,
-                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions)
+                           Map<ExtensionKey<?, ?>, ExtensionValue<?>> 
extensions,
+                           AccordStaleReplicas accordStaleReplicas)
     {
         // TODO: token map is a feature of the specific placement strategy, 
and so may not be a relevant component of
         //  ClusterMetadata in the long term. We need to consider how the 
actual components of metadata can be evolved
@@ -187,17 +195,18 @@ public class ClusterMetadata
         this.inProgressSequences = inProgressSequences;
         this.consensusMigrationState = consensusMigrationState;
         this.extensions = ImmutableMap.copyOf(extensions);
+        this.accordStaleReplicas = accordStaleReplicas;
         this.hashCode = computeHashCode();
     }
 
     public ClusterMetadata withDirectory(Directory directory)
     {
-        return new ClusterMetadata(epoch, partitioner, schema, directory, 
tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, 
consensusMigrationState, extensions);
+        return new ClusterMetadata(epoch, partitioner, schema, directory, 
tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, 
consensusMigrationState, extensions, accordStaleReplicas);
     }
 
     public ClusterMetadata withPlacements(DataPlacements placements)
     {
-        return new ClusterMetadata(epoch, partitioner, schema, directory, 
tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, 
consensusMigrationState, extensions);
+        return new ClusterMetadata(epoch, partitioner, schema, directory, 
tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, 
consensusMigrationState, extensions, accordStaleReplicas);
     }
 
     public Set<InetAddressAndPort> fullCMSMembers()
@@ -246,7 +255,8 @@ public class ClusterMetadata
                                    capLastModified(lockedRanges, epoch),
                                    capLastModified(inProgressSequences, epoch),
                                    capLastModified(consensusMigrationState, 
epoch),
-                                   capLastModified(extensions, epoch));
+                                   capLastModified(extensions, epoch),
+                                   capLastModified(accordStaleReplicas, 
epoch));
     }
 
     public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier)
@@ -268,7 +278,8 @@ public class ClusterMetadata
                                    lockedRanges,
                                    inProgressSequences,
                                    consensusMigrationState,
-                                   extensions);
+                                   extensions,
+                                   accordStaleReplicas);
     }
 
     private static Map<ExtensionKey<?,?>, ExtensionValue<?>> 
capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch 
maxEpoch)
@@ -393,6 +404,7 @@ public class ClusterMetadata
         private ConsensusMigrationState consensusMigrationState;
         private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions;
         private final Set<MetadataKey> modifiedKeys;
+        private AccordStaleReplicas accordStaleReplicas;
 
         private Transformer(ClusterMetadata metadata, Epoch epoch)
         {
@@ -409,6 +421,7 @@ public class ClusterMetadata
             this.consensusMigrationState = metadata.consensusMigrationState;
             extensions = new HashMap<>(metadata.extensions);
             modifiedKeys = new HashSet<>();
+            accordStaleReplicas = metadata.accordStaleReplicas;
         }
 
         public Epoch epoch()
@@ -437,6 +450,11 @@ public class ClusterMetadata
         public Transformer unregister(NodeId nodeId)
         {
             directory = directory.without(nodeId);
+
+            Node.Id accordId = AccordTopology.tcmIdToAccord(nodeId);
+            if (accordStaleReplicas.contains(accordId))
+                accordStaleReplicas = 
accordStaleReplicas.without(Collections.singleton(accordId));
+
             return this;
         }
 
@@ -504,6 +522,11 @@ public class ClusterMetadata
             directory = directory.without(replaced)
                                  .withRackAndDC(replacement)
                                  .withNodeState(replacement, NodeState.JOINED);
+
+            Node.Id accordId = AccordTopology.tcmIdToAccord(replaced);
+            if (accordStaleReplicas.contains(accordId))
+                accordStaleReplicas = 
accordStaleReplicas.without(Collections.singleton(accordId));
+
             return this;
         }
 
@@ -532,6 +555,18 @@ public class ClusterMetadata
             accordFastPath = accordFastPath.withNodeStatusSince(node, status, 
updateTimeMillis, updateDelayMillis);
             return this;
         }
+        
+        public Transformer markStaleReplicas(Set<Node.Id> ids)
+        {
+            accordStaleReplicas = accordStaleReplicas.withNodeIds(ids);
+            return this;
+        }
+
+        public Transformer unmarkStaleReplicas(Set<Node.Id> ids)
+        {
+            accordStaleReplicas = accordStaleReplicas.without(ids);
+            return this;
+        }
 
         public Transformer with(LockedRanges lockedRanges)
         {
@@ -658,6 +693,12 @@ public class ClusterMetadata
                 modifiedKeys.add(MetadataKeys.ACCORD_FAST_PATH);
                 accordFastPath = accordFastPath.withLastModified(epoch);
             }
+            
+            if (accordStaleReplicas != base.accordStaleReplicas)
+            {
+                modifiedKeys.add(MetadataKeys.ACCORD_STALE_REPLICAS);
+                accordStaleReplicas = 
accordStaleReplicas.withLastModified(epoch);
+            }
 
             if (lockedRanges != base.lockedRanges)
             {
@@ -693,7 +734,8 @@ public class ClusterMetadata
                                                        lockedRanges,
                                                        inProgressSequences,
                                                        consensusMigrationState,
-                                                       extensions),
+                                                       extensions,
+                    accordStaleReplicas),
                                    ImmutableSet.copyOf(modifiedKeys));
         }
 
@@ -710,7 +752,8 @@ public class ClusterMetadata
                                        lockedRanges,
                                        inProgressSequences,
                                        consensusMigrationState,
-                                       extensions);
+                                       extensions,
+                    accordStaleReplicas);
         }
 
         @Override
@@ -975,7 +1018,9 @@ public class ClusterMetadata
             {
                 AccordFastPath.serializer.serialize(metadata.accordFastPath, 
out, version);
                 
ConsensusMigrationState.serializer.serialize(metadata.consensusMigrationState, 
out, version);
+                
AccordStaleReplicas.serializer.serialize(metadata.accordStaleReplicas, out, 
version);
             }
+
             LockedRanges.serializer.serialize(metadata.lockedRanges, out, 
version);
             
InProgressSequences.serializer.serialize(metadata.inProgressSequences, out, 
version);
             out.writeInt(metadata.extensions.size());
@@ -1012,18 +1057,24 @@ public class ClusterMetadata
             Directory dir = Directory.serializer.deserialize(in, version);
             TokenMap tokenMap = TokenMap.serializer.deserialize(in, version);
             DataPlacements placements = 
DataPlacements.serializer.deserialize(in, version);
+
             AccordFastPath accordFastPath;
             ConsensusMigrationState consensusMigrationState;
+            AccordStaleReplicas staleReplicas;
+
             if (version.isAtLeast(V2))
             {
                 accordFastPath = AccordFastPath.serializer.deserialize(in, 
version);
                 consensusMigrationState = 
ConsensusMigrationState.serializer.deserialize(in, version);
+                staleReplicas = AccordStaleReplicas.serializer.deserialize(in, 
version);
             }
             else
             {
                 accordFastPath = AccordFastPath.EMPTY;
                 consensusMigrationState = ConsensusMigrationState.EMPTY;
+                staleReplicas = AccordStaleReplicas.EMPTY;
             }
+
             LockedRanges lockedRanges = 
LockedRanges.serializer.deserialize(in, version);
             InProgressSequences ips = 
InProgressSequences.serializer.deserialize(in, version);
             int items = in.readInt();
@@ -1046,7 +1097,8 @@ public class ClusterMetadata
                                        lockedRanges,
                                        ips,
                                        consensusMigrationState,
-                                       extensions);
+                                       extensions,
+                                       staleReplicas);
         }
 
         @Override
@@ -1070,7 +1122,8 @@ public class ClusterMetadata
             if (version.isAtLeast(V2))
             {
                 size += 
AccordFastPath.serializer.serializedSize(metadata.accordFastPath, version) +
-                        
ConsensusMigrationState.serializer.serializedSize(metadata.consensusMigrationState,
 version);
+                        
ConsensusMigrationState.serializer.serializedSize(metadata.consensusMigrationState,
 version) +
+                        
AccordStaleReplicas.serializer.serializedSize(metadata.accordStaleReplicas, 
version);
             }
 
             size += 
LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) +
diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java 
b/src/java/org/apache/cassandra/tcm/MetadataKeys.java
index 59baeab58c..62bd72fe70 100644
--- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java
+++ b/src/java/org/apache/cassandra/tcm/MetadataKeys.java
@@ -38,6 +38,7 @@ public class MetadataKeys
     public static final MetadataKey TOKEN_MAP               = make(CORE_NS, 
"ownership", "token_map");
     public static final MetadataKey DATA_PLACEMENTS         = make(CORE_NS, 
"ownership", "data_placements");
     public static final MetadataKey ACCORD_FAST_PATH        = make(CORE_NS, 
"ownership", "accord_fast_path");
+    public static final MetadataKey ACCORD_STALE_REPLICAS   = make(CORE_NS, 
"ownership", "accord_stale_replicas");
     public static final MetadataKey LOCKED_RANGES           = make(CORE_NS, 
"sequences", "locked_ranges");
     public static final MetadataKey IN_PROGRESS_SEQUENCES   = make(CORE_NS, 
"sequences", "in_progress");
     public static final MetadataKey CONSENSUS_MIGRATION_STATE = make(CORE_NS, 
"consensus", "migration_state");
@@ -47,6 +48,7 @@ public class MetadataKeys
                                                                                
   TOKEN_MAP,
                                                                                
   DATA_PLACEMENTS,
                                                                                
   ACCORD_FAST_PATH,
+                                                                               
   ACCORD_STALE_REPLICAS,   
                                                                                
   LOCKED_RANGES,
                                                                                
   IN_PROGRESS_SEQUENCES,
                                                                                
   CONSENSUS_MIGRATION_STATE);
diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java 
b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
index abb3aa2603..42924d1690 100644
--- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
+++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java
@@ -29,6 +29,7 @@ import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.KeyspaceMetadata;
 import org.apache.cassandra.schema.Keyspaces;
 import org.apache.cassandra.service.accord.AccordFastPath;
+import org.apache.cassandra.service.accord.AccordStaleReplicas;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.tcm.Commit.Replicator;
 import org.apache.cassandra.tcm.log.Entry;
@@ -185,7 +186,8 @@ public class StubClusterMetadataService extends 
ClusterMetadataService
                                               LockedRanges.EMPTY,
                                               InProgressSequences.EMPTY,
                                               ConsensusMigrationState.EMPTY,
-                                              ImmutableMap.of());
+                                              ImmutableMap.of(),
+                                              AccordStaleReplicas.EMPTY);
             return new StubClusterMetadataService(new UniformRangePlacement(),
                                                   snapshots != null ? 
snapshots : MetadataSnapshots.NO_OP,
                                                   
LocalLog.logSpec().withInitialState(initial).createLog(),
diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java 
b/src/java/org/apache/cassandra/tcm/Transformation.java
index ea160482f3..e41a24159b 100644
--- a/src/java/org/apache/cassandra/tcm/Transformation.java
+++ b/src/java/org/apache/cassandra/tcm/Transformation.java
@@ -219,6 +219,8 @@ public interface Transformation
 
         BEGIN_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(36, () -> 
BeginConsensusMigrationForTableAndRange.serializer),
         MAYBE_FINISH_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(37, () -> 
MaybeFinishConsensusMigrationForTableAndRange.serializer),
+        ACCORD_MARK_STALE(38, () -> AccordMarkStale.serializer),
+        ACCORD_MARK_REJOINING(39, () -> AccordMarkRejoining.serializer),
 
         ;
 
diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java 
b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
index 746523c46e..b08aebb871 100644
--- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
+++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java
@@ -52,6 +52,7 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.schema.DistributedSchema;
 import org.apache.cassandra.schema.SchemaKeyspace;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordStaleReplicas;
 import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
@@ -286,7 +287,8 @@ public class GossipHelper
                                    LockedRanges.EMPTY,
                                    InProgressSequences.EMPTY,
                                    ConsensusMigrationState.EMPTY,
-                                   Collections.emptyMap());
+                                   Collections.emptyMap(),
+                                   AccordStaleReplicas.EMPTY);
     }
 
     public static ClusterMetadata fromEndpointStates(DistributedSchema schema, 
Map<InetAddressAndPort, EndpointState> epStates)
@@ -358,7 +360,8 @@ public class GossipHelper
                                                                       
LockedRanges.EMPTY,
                                                                       
InProgressSequences.EMPTY,
                                                                       
ConsensusMigrationState.EMPTY,
-                                                                      
extensions);
+                                                                      
extensions,
+                                                                      
AccordStaleReplicas.EMPTY);
         return new ClusterMetadata(Epoch.UPGRADE_GOSSIP,
                                    partitioner,
                                    schema,
@@ -369,7 +372,8 @@ public class GossipHelper
                                    LockedRanges.EMPTY,
                                    InProgressSequences.EMPTY,
                                    ConsensusMigrationState.EMPTY,
-                                   extensions);
+                                   extensions,
+                                   AccordStaleReplicas.EMPTY);
     }
 
     public static boolean isValidForClusterMetadata(Map<InetAddressAndPort, 
EndpointState> epstates)
diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java 
b/src/java/org/apache/cassandra/tcm/serialization/Version.java
index 0fd6b5b1c1..e4021ecea0 100644
--- a/src/java/org/apache/cassandra/tcm/serialization/Version.java
+++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java
@@ -37,6 +37,7 @@ public enum Version
      *  - Added version to PlacementForRange serializer
      *  - Serialize MemtableParams when serializing TableParams
      *  - Added AccordFastPath
+     *  - Added AccordStaleReplicas
      */
     V2(2),
 
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/AccordMarkRejoining.java 
b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkRejoining.java
new file mode 100644
index 0000000000..402d05cf50
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkRejoining.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.Node;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.service.accord.AccordTopology;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.CollectionSerializers;
+
+import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
+
+public class AccordMarkRejoining implements Transformation
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordMarkRejoining.class);
+    
+    private final Set<NodeId> ids;
+
+    public AccordMarkRejoining(Set<NodeId> ids)
+    {
+        this.ids = ids;
+    }
+
+    @Override
+    public Kind kind()
+    {
+        return Kind.ACCORD_MARK_REJOINING;
+    }
+
+    @Override
+    public Result execute(ClusterMetadata prev)
+    {
+        for (NodeId id : ids)
+            if (!prev.directory.peerIds().contains(id))
+                return new Rejected(INVALID, String.format("Can not unmark 
node %s as it is not present in the directory.", id));
+
+        Set<Node.Id> accordIds = 
ids.stream().map(AccordTopology::tcmIdToAccord).collect(Collectors.toSet());
+
+        for (Node.Id id : accordIds)
+            if (!prev.accordStaleReplicas.contains(id))
+                return new Rejected(INVALID, String.format("Can not unmark 
node %s as it is not stale.", id));
+
+        logger.info("Unmarking " + ids + ". They will now participate in 
durability status coordination...");
+        ClusterMetadata.Transformer next = 
prev.transformer().unmarkStaleReplicas(accordIds);
+        return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "AccordMarkRejoining{ids=" + ids + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AccordMarkRejoining that = (AccordMarkRejoining) o;
+        return Objects.equals(ids, that.ids);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(ids);
+    }
+
+    public static final AsymmetricMetadataSerializer<Transformation, 
AccordMarkRejoining> serializer = new AsymmetricMetadataSerializer<>()
+    {
+        @Override
+        public void serialize(Transformation t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            assert t instanceof AccordMarkRejoining;
+            AccordMarkRejoining mark = (AccordMarkRejoining) t;
+            CollectionSerializers.serializeCollection(mark.ids, out, version, 
NodeId.serializer);
+        }
+
+        @Override
+        public AccordMarkRejoining deserialize(DataInputPlus in, Version 
version) throws IOException
+        {
+            return new 
AccordMarkRejoining(CollectionSerializers.deserializeSet(in, version, 
NodeId.serializer));
+        }
+
+        @Override
+        public long serializedSize(Transformation t, Version version)
+        {
+            assert t instanceof AccordMarkRejoining;
+            AccordMarkRejoining mark = (AccordMarkRejoining) t;
+            return CollectionSerializers.serializedCollectionSize(mark.ids, 
version, NodeId.serializer);
+        }
+    };
+}
diff --git 
a/src/java/org/apache/cassandra/tcm/transformations/AccordMarkStale.java 
b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkStale.java
new file mode 100644
index 0000000000..261e0a4eb2
--- /dev/null
+++ b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkStale.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import accord.local.Node;
+import accord.topology.Shard;
+import org.apache.cassandra.io.util.DataInputPlus;
+import org.apache.cassandra.io.util.DataOutputPlus;
+import org.apache.cassandra.schema.KeyspaceMetadata;
+import org.apache.cassandra.schema.SchemaConstants;
+import org.apache.cassandra.service.accord.AccordTopology;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.tcm.Transformation;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.sequences.LockedRanges;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer;
+import org.apache.cassandra.tcm.serialization.Version;
+import org.apache.cassandra.utils.CollectionSerializers;
+
+import static org.apache.cassandra.exceptions.ExceptionCode.INVALID;
+
+public class AccordMarkStale implements Transformation
+{
+    private static final Logger logger = 
LoggerFactory.getLogger(AccordMarkStale.class);
+    
+    private final Set<NodeId> ids;
+
+    public AccordMarkStale(Set<NodeId> ids)
+    {
+        this.ids = ids;
+    }
+
+    @Override
+    public Kind kind()
+    {
+        return Kind.ACCORD_MARK_STALE;
+    }
+
+    @Override
+    public Result execute(ClusterMetadata prev)
+    {
+        for (NodeId id : ids)
+            if (!prev.directory.peerIds().contains(id))
+                return new Rejected(INVALID, String.format("Can not mark node 
%s stale as it is not present in the directory.", id));
+
+        Set<Node.Id> accordIds = 
ids.stream().map(AccordTopology::tcmIdToAccord).collect(Collectors.toSet());
+
+        for (Node.Id id : accordIds)
+            if (prev.accordStaleReplicas.contains(id))
+                return new Rejected(INVALID, String.format("Can not mark node 
%s stale as it already is.", id));
+
+        for (KeyspaceMetadata keyspace : 
prev.schema.getKeyspaces().without(SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES))
+        {
+            List<AccordTopology.KeyspaceShard> shards = 
AccordTopology.KeyspaceShard.forKeyspace(keyspace, prev.placements, 
prev.directory);
+            
+            for (AccordTopology.KeyspaceShard shard : shards)
+            {
+                // We're trying to mark a node in this shard stale...
+                if (!Collections.disjoint(shard.nodes(), accordIds))
+                {
+                    int quorumSize = 
Shard.slowPathQuorumSize(shard.nodes().size());
+                    Set<Node.Id> nonStaleNodes = new HashSet<>(shard.nodes());
+                    nonStaleNodes.removeAll(accordIds);
+                    nonStaleNodes.removeAll(prev.accordStaleReplicas.ids());
+
+                    // ...but reject the transformation if this would bring us 
below quorum.
+                    if (nonStaleNodes.size() < quorumSize)
+                        return new Rejected(INVALID, String.format("Can not 
mark nodes %s stale as that would leave fewer than a quorum of nodes active for 
range %s in keyspace '%s'.",
+                                                                   accordIds, 
shard.range(), keyspace.name));
+                }
+            }
+        }
+
+        logger.info("Marking " + ids + " stale. They will no longer 
participate in durability status coordination...");
+        ClusterMetadata.Transformer next = 
prev.transformer().markStaleReplicas(accordIds);
+        return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY);
+    }
+
+    @Override
+    public String toString()
+    {
+        return "AccordMarkStale{ids=" + ids + '}';
+    }
+
+    @Override
+    public boolean equals(Object o)
+    {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        AccordMarkStale that = (AccordMarkStale) o;
+        return Objects.equals(ids, that.ids);
+    }
+
+    @Override
+    public int hashCode()
+    {
+        return Objects.hash(ids);
+    }
+
+    public static final AsymmetricMetadataSerializer<Transformation, 
AccordMarkStale> serializer = new AsymmetricMetadataSerializer<>()
+    {
+        @Override
+        public void serialize(Transformation t, DataOutputPlus out, Version 
version) throws IOException
+        {
+            assert t instanceof AccordMarkStale;
+            AccordMarkStale mark = (AccordMarkStale) t;
+            CollectionSerializers.serializeCollection(mark.ids, out, version, 
NodeId.serializer);
+        }
+
+        @Override
+        public AccordMarkStale deserialize(DataInputPlus in, Version version) 
throws IOException
+        {
+            return new 
AccordMarkStale(CollectionSerializers.deserializeSet(in, version, 
NodeId.serializer));
+        }
+
+        @Override
+        public long serializedSize(Transformation t, Version version)
+        {
+            assert t instanceof AccordMarkStale;
+            AccordMarkStale mark = (AccordMarkStale) t;
+            return CollectionSerializers.serializedCollectionSize(mark.ids, 
version, NodeId.serializer);
+        }
+    };
+}
diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java 
b/src/java/org/apache/cassandra/tools/NodeProbe.java
index 6a07af6af1..aaa4b996b0 100644
--- a/src/java/org/apache/cassandra/tools/NodeProbe.java
+++ b/src/java/org/apache/cassandra/tools/NodeProbe.java
@@ -114,6 +114,8 @@ import org.apache.cassandra.service.GCInspectorMXBean;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageProxyMBean;
 import org.apache.cassandra.service.StorageServiceMBean;
+import org.apache.cassandra.service.accord.AccordOperations;
+import org.apache.cassandra.service.accord.AccordOperationsMBean;
 import org.apache.cassandra.streaming.StreamManagerMBean;
 import org.apache.cassandra.streaming.StreamState;
 import org.apache.cassandra.streaming.management.StreamStateCompositeData;
@@ -148,6 +150,7 @@ public class NodeProbe implements AutoCloseable
     protected CompactionManagerMBean compactionProxy;
     protected StorageServiceMBean ssProxy;
     protected CMSOperationsMBean cmsProxy;
+    protected AccordOperationsMBean accordProxy;
     protected GossiperMBean gossProxy;
     protected MemoryMXBean memProxy;
     protected GCInspectorMXBean gcProxy;
@@ -262,6 +265,8 @@ public class NodeProbe implements AutoCloseable
             ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
StorageServiceMBean.class);
             name = new ObjectName(CMSOperations.MBEAN_OBJECT_NAME);
             cmsProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
CMSOperationsMBean.class);
+            name = new ObjectName(AccordOperations.MBEAN_OBJECT_NAME);
+            accordProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
AccordOperationsMBean.class);
             name = new ObjectName(MessagingService.MBEAN_NAME);
             msProxy = JMX.newMBeanProxy(mbeanServerConn, name, 
MessagingServiceMBean.class);
             name = new ObjectName(StreamManagerMBean.OBJECT_NAME);
@@ -1238,6 +1243,11 @@ public class NodeProbe implements AutoCloseable
         return cmsProxy;
     }
 
+    public AccordOperationsMBean getAccordOperationsProxy()
+    {
+        return accordProxy;
+    }
+
     public GossiperMBean getGossProxy()
     {
         return gossProxy;
diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java 
b/src/java/org/apache/cassandra/tools/NodeTool.java
index dc314b32df..f7e27ed231 100644
--- a/src/java/org/apache/cassandra/tools/NodeTool.java
+++ b/src/java/org/apache/cassandra/tools/NodeTool.java
@@ -271,6 +271,13 @@ public class NodeTool
             .withCommands(ConsensusMigrationAdmin.ListCmd.class)
             .withCommands(ConsensusMigrationAdmin.FinishMigration.class);
 
+        builder.withGroup("accord")
+               .withDescription("Manage the operation of Accord")
+               .withDefaultCommand(AccordAdmin.Describe.class)
+               .withCommand(AccordAdmin.Describe.class)
+               .withCommand(AccordAdmin.MarkStale.class)
+               .withCommand(AccordAdmin.MarkRejoining.class);
+
         Cli<NodeToolCmdRunnable> parser = builder.build();
 
         int status = 0;
diff --git a/src/java/org/apache/cassandra/tools/nodetool/AccordAdmin.java 
b/src/java/org/apache/cassandra/tools/nodetool/AccordAdmin.java
new file mode 100644
index 0000000000..ff7e88ca9f
--- /dev/null
+++ b/src/java/org/apache/cassandra/tools/nodetool/AccordAdmin.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tools.nodetool;
+
+import java.util.List;
+import java.util.Map;
+
+import io.airlift.airline.Arguments;
+import io.airlift.airline.Command;
+import org.apache.cassandra.tools.NodeProbe;
+import org.apache.cassandra.tools.NodeTool;
+
+public abstract class AccordAdmin extends NodeTool.NodeToolCmd
+{
+    @Command(name = "describe", description = "Describe current cluster 
metadata relating to Accord")
+    public static class Describe extends NodeTool.NodeToolCmd
+    {
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            Map<String, String> info = 
probe.getAccordOperationsProxy().describe();
+            output.out.printf("Accord Service:%n");
+            output.out.printf("Epoch: %s%n", info.get("EPOCH"));
+            output.out.printf("Stale Replicas: %s%n", 
info.get("STALE_REPLICAS"));
+        }
+    }
+
+    @Command(name = "mark_stale", description = "Mark a replica as being stale 
and no longer able to participate in durability status coordination")
+    public static class MarkStale extends AccordAdmin
+    {
+        @Arguments(required = true, description = "One or more node IDs to 
mark stale", usage = "<nodeId>+")
+        public List<String> nodeIds;
+
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            probe.getAccordOperationsProxy().accordMarkStale(nodeIds);
+        }
+    }
+
+    @Command(name = "mark_rejoining", description = "Mark a stale replica as 
being allowed to participate in durability status coordination again")
+    public static class MarkRejoining extends AccordAdmin
+    {
+        @Arguments(required = true, description = "One or more node IDs to 
mark no longer stale", usage = "<nodeId>+")
+        public List<String> nodeIds;
+
+        @Override
+        protected void execute(NodeProbe probe)
+        {
+            probe.getAccordOperationsProxy().accordMarkRejoining(nodeIds);
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
 
b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
index a1a29f09cb..d70938af79 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java
@@ -44,6 +44,7 @@ import org.apache.cassandra.service.CacheServiceMBean;
 import org.apache.cassandra.service.GCInspector;
 import org.apache.cassandra.service.StorageProxy;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.service.accord.AccordOperations;
 import org.apache.cassandra.streaming.StreamManager;
 import org.apache.cassandra.tcm.CMSOperations;
 import org.apache.cassandra.tools.NodeProbe;
@@ -70,6 +71,7 @@ public class InternalNodeProbe extends NodeProbe
 
         ssProxy = StorageService.instance;
         cmsProxy = CMSOperations.instance;
+        accordProxy = AccordOperations.instance;
         msProxy = MessagingService.instance();
         streamProxy = StreamManager.instance;
         compactionProxy = CompactionManager.instance;
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolTest.java
 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolTest.java
new file mode 100644
index 0000000000..9af94b783d
--- /dev/null
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolTest.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test.accord;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import accord.local.Node;
+import org.apache.cassandra.tcm.ClusterMetadata;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.Feature;
+import org.apache.cassandra.distributed.test.TestBaseImpl;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.getNodeId;
+
+public class AccordNodetoolTest extends TestBaseImpl
+{
+    @Test
+    public void testMarkSingleNode() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(3).withConfig((config) 
-> config.with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            cluster.get(1).nodetoolResult("accord", "mark_stale", 
"1").asserts().success();
+            cluster.get(1).runOnInstance(() -> 
assertEquals(ImmutableSet.of(new Node.Id(1)), 
ClusterMetadata.current().accordStaleReplicas.ids()));
+            cluster.get(1).nodetoolResult("accord", 
"describe").asserts().stdoutContains("Stale Replicas: 1");
+
+            // Reject the operation if the target node is already stale:
+            cluster.get(1).nodetoolResult("accord", "mark_stale", 
"1").asserts().failure().errorContains("it already is");
+
+            // Reject the operation if marking the node stale brings us below 
a quorum of non-stale nodes:
+            cluster.get(1).nodetoolResult("accord", "mark_stale", 
"2").asserts().failure().errorContains("that would leave fewer than a quorum");
+
+            // Reject the operation if the target node doesn't exist:
+            cluster.get(1).nodetoolResult("accord", "mark_stale", 
"4").asserts().failure().errorContains("not present in the directory");
+            
+            cluster.get(1).nodetoolResult("accord", "mark_rejoining", 
"1").asserts().success();
+            cluster.get(1).runOnInstance(() -> 
assertEquals(Collections.emptySet(), 
ClusterMetadata.current().accordStaleReplicas.ids()));
+
+            cluster.get(1).nodetoolResult("accord", "mark_rejoining", 
"1").asserts().failure().errorContains("it is not stale");
+            cluster.get(1).nodetoolResult("accord", "mark_rejoining", 
"4").asserts().failure().errorContains("not present in the directory");
+        }
+    }
+
+    @Test
+    public void testMarkMultipleNodes() throws Throwable
+    {
+        try (Cluster cluster = init(builder().withNodes(5).withConfig((config) 
-> config.with(Feature.NETWORK, Feature.GOSSIP)).start()))
+        {
+            // Reject the operation if marking the node stale brings us below 
a quorum of non-stale nodes:
+            cluster.get(1).nodetoolResult("accord", "mark_stale", "1", "2", 
"3").asserts().failure().errorContains("that would leave fewer than a quorum");
+
+            cluster.get(1).nodetoolResult("accord", "mark_stale", "1", 
"2").asserts().success();
+            cluster.get(1).runOnInstance(() -> 
assertEquals(ImmutableSet.of(new Node.Id(1), new Node.Id(2)), 
ClusterMetadata.current().accordStaleReplicas.ids()));
+            cluster.get(1).nodetoolResult("accord", 
"describe").asserts().stdoutContains("Stale Replicas: 1,2");
+
+            // Reject the operation if a target node is already stale:
+            cluster.get(1).nodetoolResult("accord", "mark_stale", "1", 
"2").asserts().failure().errorContains("it already is");
+
+            // Reject the operation if a target node doesn't exist:
+            cluster.get(1).nodetoolResult("accord", "mark_stale", "4", 
"6").asserts().failure().errorContains("not present in the directory");
+
+            Map<Integer, Integer> nodeIdToNode = new HashMap<>();
+            for (int i = 1; i <= 5; i++)
+                nodeIdToNode.put(getNodeId(cluster.get(i)).id(), i);
+
+            // Remove the second stale node, and ensure the set of stale 
replicas is updated:
+            cluster.get(nodeIdToNode.get(2)).shutdown().get();
+            cluster.get(1).nodetoolResult("removenode", "2", 
"--force").asserts().success();
+            cluster.get(1).nodetoolResult("cms", "unregister", 
"2").asserts().success();
+            cluster.get(1).runOnInstance(() -> 
assertEquals(ImmutableSet.of(new Node.Id(1)), 
ClusterMetadata.current().accordStaleReplicas.ids()));
+
+            cluster.get(1).nodetoolResult("accord", "mark_rejoining", "1", 
"3").asserts().failure().errorContains("it is not stale");
+            cluster.get(1).nodetoolResult("accord", "mark_rejoining", "1", 
"6").asserts().failure().errorContains("not present in the directory");
+            cluster.get(1).runOnInstance(() -> 
assertEquals(ImmutableSet.of(new Node.Id(1)), 
ClusterMetadata.current().accordStaleReplicas.ids()));
+        }
+    }
+}
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
index b19c2c091e..e52b365d9a 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java
@@ -58,6 +58,7 @@ import org.apache.cassandra.schema.Schema;
 import org.apache.cassandra.schema.SchemaConstants;
 import org.apache.cassandra.schema.SchemaTransformation;
 import org.apache.cassandra.service.ClientState;
+import org.apache.cassandra.service.accord.AccordStaleReplicas;
 import org.apache.cassandra.tcm.AtomicLongBackedProcessor;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.ClusterMetadataService;
@@ -157,7 +158,8 @@ public class ClusterMetadataTestHelper
                                    LockedRanges.EMPTY,
                                    InProgressSequences.EMPTY,
                                    null,
-                                   ImmutableMap.of());
+                                   ImmutableMap.of(),
+                                   AccordStaleReplicas.EMPTY);
     }
 
     public static ClusterMetadata minimalForTesting(IPartitioner partitioner)
@@ -172,7 +174,8 @@ public class ClusterMetadataTestHelper
                                    null,
                                    null,
                                    null,
-                                   ImmutableMap.of());
+                                   ImmutableMap.of(),
+                                   AccordStaleReplicas.EMPTY);
     }
 
     public static ClusterMetadata minimalForTesting(Keyspaces keyspaces)
@@ -187,7 +190,8 @@ public class ClusterMetadataTestHelper
                                    null,
                                    null,
                                    null,
-                                   ImmutableMap.of());
+                                   ImmutableMap.of(),
+                                   AccordStaleReplicas.EMPTY);
     }
 
     public static ClusterMetadataService syncInstanceForTest()
diff --git a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java 
b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
index ce3dfc7b7e..34828dd471 100644
--- a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
+++ b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java
@@ -26,14 +26,15 @@ import java.util.Map;
 import java.util.Set;
 
 import com.google.common.collect.ImmutableMap;
-import org.apache.cassandra.service.accord.AccordFastPath;
-import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.junit.Assert;
 import org.junit.Test;
 
 import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.schema.DistributedSchema;
+import org.apache.cassandra.service.accord.AccordFastPath;
+import org.apache.cassandra.service.accord.AccordStaleReplicas;
+import 
org.apache.cassandra.service.consensus.migration.ConsensusMigrationState;
 import org.apache.cassandra.tcm.ClusterMetadata;
 import org.apache.cassandra.tcm.Epoch;
 import org.apache.cassandra.tcm.membership.Directory;
@@ -93,7 +94,8 @@ public class MetaStrategyTest
                                    LockedRanges.EMPTY,
                                    InProgressSequences.EMPTY,
                                    ConsensusMigrationState.EMPTY,
-                                   ImmutableMap.of());
+                                   ImmutableMap.of(),
+                                   AccordStaleReplicas.EMPTY);
     }
 
     @Test
@@ -159,4 +161,4 @@ public class MetaStrategyTest
     {
         return new Location(dc, rack);
     }
-}
\ No newline at end of file
+}
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
 
b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
index 143f63a26c..4bec25b4d8 100644
--- 
a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
+++ 
b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java
@@ -18,6 +18,15 @@
 
 package org.apache.cassandra.service.accord;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
 import accord.local.Node;
 import accord.topology.Shard;
 import accord.topology.Topology;
@@ -28,14 +37,6 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.schema.*;
 import org.apache.cassandra.service.accord.AccordFastPath.Status;
 import org.apache.cassandra.tcm.ClusterMetadata;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.cassandra.service.accord.AccordTestUtils.*;
 
diff --git 
a/test/unit/org/apache/cassandra/service/accord/AccordStaleReplicasTest.java 
b/test/unit/org/apache/cassandra/service/accord/AccordStaleReplicasTest.java
new file mode 100644
index 0000000000..83eced9b66
--- /dev/null
+++ b/test/unit/org/apache/cassandra/service/accord/AccordStaleReplicasTest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.service.accord;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.junit.Test;
+
+import accord.local.Node;
+import accord.utils.AccordGens;
+import accord.utils.Gen;
+import accord.utils.Gens;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.tcm.Epoch;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers;
+import org.apache.cassandra.tcm.serialization.Version;
+
+import static accord.utils.Property.qt;
+
+public class AccordStaleReplicasTest
+{
+    @Test
+    public void serde()
+    {
+        try (DataOutputBuffer buffer = new DataOutputBuffer())
+        {
+            Gen<Set<Node.Id>> nodesGen = 
Gens.lists(AccordGens.nodes()).unique().ofSizeBetween(0, 9).map(nodes -> new 
HashSet<>(nodes));
+            Gen<Epoch> epochGen = AccordGens.epochs().map(Epoch::create);
+
+            qt().check(rs -> {
+                Epoch epoch = epochGen.next(rs);
+                Set<Node.Id> nodes = nodesGen.next(rs);
+                AsymmetricMetadataSerializers.testSerde(buffer, 
AccordStaleReplicas.serializer, new AccordStaleReplicas(nodes, epoch), 
Version.V2);
+            });
+        }
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java 
b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java
index f5e1ecdfe2..109fefeda2 100644
--- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java
+++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java
@@ -309,6 +309,8 @@ public class ClusterMetadataTransformationTest
             return metadata.accordFastPath;
         else if (key == CONSENSUS_MIGRATION_STATE)
             return metadata.consensusMigrationState;
+        else if (key == ACCORD_STALE_REPLICAS)
+            return metadata.accordStaleReplicas;
 
         throw new IllegalArgumentException("Unknown metadata key " + key);
     }
diff --git 
a/test/unit/org/apache/cassandra/tcm/serialization/AsymmetricMetadataSerializers.java
 
b/test/unit/org/apache/cassandra/tcm/serialization/AsymmetricMetadataSerializers.java
new file mode 100644
index 0000000000..bd3cd4547e
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/tcm/serialization/AsymmetricMetadataSerializers.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.serialization;
+
+import java.io.IOException;
+
+import org.assertj.core.api.Assertions;
+
+import accord.utils.LazyToString;
+import accord.utils.ReflectionUtils;
+import org.apache.cassandra.io.util.DataInputBuffer;
+import org.apache.cassandra.io.util.DataOutputBuffer;
+
+public class AsymmetricMetadataSerializers
+{
+    public static <In, Out> void testSerde(DataOutputBuffer output, 
AsymmetricMetadataSerializer<In, Out> serializer, In input, Version version) 
throws IOException
+    {
+        output.clear();
+        long expectedSize = serializer.serializedSize(input, version);
+        serializer.serialize(input, output, version);
+        Assertions.assertThat(output.getLength()).describedAs("The serialized 
size and bytes written do not match").isEqualTo(expectedSize);
+        DataInputBuffer in = new 
DataInputBuffer(output.unsafeGetBufferAndFlip(), false);
+        Out read = serializer.deserialize(in, version);
+        Assertions.assertThat(read).describedAs("The deserialized output does 
not match the serialized input; difference %s", new LazyToString(() -> 
ReflectionUtils.recursiveEquals(read, input).toString())).isEqualTo(input);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkRejoiningTest.java
 
b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkRejoiningTest.java
new file mode 100644
index 0000000000..032c8dd1ec
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkRejoiningTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers;
+import org.apache.cassandra.tcm.serialization.Version;
+
+public class AccordMarkRejoiningTest
+{
+    @Test
+    public void shouldSerializeEmpty() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        AsymmetricMetadataSerializers.testSerde(buffer, 
AccordMarkRejoining.serializer, new 
AccordMarkRejoining(Collections.emptySet()), Version.V2);
+    }
+
+    @Test
+    public void shouldSerializeSingleton() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        AccordMarkRejoining markStale = new 
AccordMarkRejoining(Collections.singleton(NodeId.fromString("1")));
+        AsymmetricMetadataSerializers.testSerde(buffer, 
AccordMarkRejoining.serializer, markStale, Version.V2);
+    }
+
+    @Test
+    public void shouldSerializeMulti() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        AccordMarkRejoining markStale = new 
AccordMarkRejoining(ImmutableSet.of(NodeId.fromString("1"), 
NodeId.fromString("2")));
+        AsymmetricMetadataSerializers.testSerde(buffer, 
AccordMarkRejoining.serializer, markStale, Version.V2);
+    }
+}
diff --git 
a/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkStaleTest.java 
b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkStaleTest.java
new file mode 100644
index 0000000000..d794b3a2a9
--- /dev/null
+++ 
b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkStaleTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.tcm.transformations;
+
+import java.io.IOException;
+import java.util.Collections;
+
+import com.google.common.collect.ImmutableSet;
+import org.junit.Test;
+
+import org.apache.cassandra.io.util.DataOutputBuffer;
+import org.apache.cassandra.tcm.membership.NodeId;
+import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers;
+import org.apache.cassandra.tcm.serialization.Version;
+
+public class AccordMarkStaleTest
+{
+    @Test
+    public void shouldSerializeEmpty() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        AsymmetricMetadataSerializers.testSerde(buffer, 
AccordMarkStale.serializer, new AccordMarkStale(Collections.emptySet()), 
Version.V2);
+    }
+
+    @Test
+    public void shouldSerializeSingleton() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        AccordMarkStale markStale = new 
AccordMarkStale(Collections.singleton(NodeId.fromString("1")));
+        AsymmetricMetadataSerializers.testSerde(buffer, 
AccordMarkStale.serializer, markStale, Version.V2);
+    }
+
+    @Test
+    public void shouldSerializeMulti() throws IOException
+    {
+        DataOutputBuffer buffer = new DataOutputBuffer();
+        AccordMarkStale markStale = new 
AccordMarkStale(ImmutableSet.of(NodeId.fromString("1"), 
NodeId.fromString("2")));
+        AsymmetricMetadataSerializers.testSerde(buffer, 
AccordMarkStale.serializer, markStale, Version.V2);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to