This is an automated email from the ASF dual-hosted git repository. maedhroz pushed a commit to branch cep-15-accord in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/cep-15-accord by this push: new afdb57c4a6 Command to Exclude Replicas from Durability Status Coordination afdb57c4a6 is described below commit afdb57c4a6ae449377daf09d8c39688a8560eaa3 Author: Caleb Rackliffe <calebrackli...@gmail.com> AuthorDate: Tue Jul 16 17:40:49 2024 -0500 Command to Exclude Replicas from Durability Status Coordination patch by Caleb Rackliffe; reviewed by David Capwell, Sam Tunnicliffe, and Benedict Elliott Smith for CASSANDRA-19321 --- modules/accord | 2 +- .../apache/cassandra/service/CassandraDaemon.java | 2 + .../cassandra/service/accord/AccordOperations.java | 76 +++++++++++ .../service/accord/AccordOperationsMBean.java | 31 +++++ .../service/accord/AccordStaleReplicas.java | 137 +++++++++++++++++++ .../cassandra/service/accord/AccordTopology.java | 25 +++- .../accord/serializers/TopologySerializers.java | 5 +- .../org/apache/cassandra/tcm/ClusterMetadata.java | 77 +++++++++-- .../org/apache/cassandra/tcm/MetadataKeys.java | 2 + .../cassandra/tcm/StubClusterMetadataService.java | 4 +- .../org/apache/cassandra/tcm/Transformation.java | 2 + .../cassandra/tcm/compatibility/GossipHelper.java | 10 +- .../cassandra/tcm/serialization/Version.java | 1 + .../tcm/transformations/AccordMarkRejoining.java | 123 +++++++++++++++++ .../tcm/transformations/AccordMarkStale.java | 151 +++++++++++++++++++++ src/java/org/apache/cassandra/tools/NodeProbe.java | 10 ++ src/java/org/apache/cassandra/tools/NodeTool.java | 7 + .../cassandra/tools/nodetool/AccordAdmin.java | 69 ++++++++++ .../mock/nodetool/InternalNodeProbe.java | 2 + .../test/accord/AccordNodetoolTest.java | 99 ++++++++++++++ .../test/log/ClusterMetadataTestHelper.java | 10 +- .../apache/cassandra/locator/MetaStrategyTest.java | 10 +- .../accord/AccordFastPathCoordinatorTest.java | 17 +-- .../service/accord/AccordStaleReplicasTest.java | 54 ++++++++ .../tcm/ClusterMetadataTransformationTest.java | 2 + .../AsymmetricMetadataSerializers.java | 42 
++++++ .../transformations/AccordMarkRejoiningTest.java | 56 ++++++++ .../tcm/transformations/AccordMarkStaleTest.java | 56 ++++++++ 28 files changed, 1043 insertions(+), 39 deletions(-) diff --git a/modules/accord b/modules/accord index 449b2b4d0b..81c02769f9 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 449b2b4d0bf4bb44d55a3c57f712a4d5a15e7220 +Subproject commit 81c02769f9ad73ef3aba0675c2217fc74b8a4a4c diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 4ae0413529..a5e2be1e05 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -70,6 +70,7 @@ import org.apache.cassandra.net.StartupClusterConnectivityChecker; import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.security.ThreadAwareSecurityManager; +import org.apache.cassandra.service.accord.AccordOperations; import org.apache.cassandra.service.paxos.PaxosState; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.tcm.CMSOperations; @@ -269,6 +270,7 @@ public class CassandraDaemon Startup.initialize(DatabaseDescriptor.getSeeds()); disableAutoCompaction(Schema.instance.distributedKeyspaces().names()); CMSOperations.initJmx(); + AccordOperations.initJmx(); } catch (InterruptedException | ExecutionException | IOException e) { diff --git a/src/java/org/apache/cassandra/service/accord/AccordOperations.java b/src/java/org/apache/cassandra/service/accord/AccordOperations.java new file mode 100644 index 0000000000..e7820919e1 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordOperations.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.service.accord; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.ClusterMetadataService; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.transformations.AccordMarkStale; +import org.apache.cassandra.tcm.transformations.AccordMarkRejoining; +import org.apache.cassandra.utils.MBeanWrapper; + +public class AccordOperations implements AccordOperationsMBean +{ + public static final String MBEAN_OBJECT_NAME = "org.apache.cassandra.service.accord:type=AccordOperations"; + public static final AccordOperations instance = new AccordOperations(ClusterMetadataService.instance()); + + private final ClusterMetadataService cms; + + public static void initJmx() + { + MBeanWrapper.instance.registerMBean(instance, MBEAN_OBJECT_NAME); + } + + private AccordOperations(ClusterMetadataService cms) + { + this.cms = cms; + } + + @Override + public Map<String, String> describe() + { + Map<String, String> info = new HashMap<>(); + ClusterMetadata metadata = ClusterMetadata.current(); + + info.put("EPOCH", Long.toString(metadata.epoch.getEpoch())); + String 
staleReplicas = metadata.accordStaleReplicas.ids().stream().sorted().map(Object::toString).collect(Collectors.joining(",")); + info.put("STALE_REPLICAS", staleReplicas); + return info; + } + + @Override + public void accordMarkStale(List<String> nodeIdStrings) + { + Set<NodeId> nodeIds = nodeIdStrings.stream().map(NodeId::fromString).collect(Collectors.toSet()); + cms.commit(new AccordMarkStale(nodeIds)); + } + + @Override + public void accordMarkRejoining(List<String> nodeIdStrings) + { + Set<NodeId> nodeIds = nodeIdStrings.stream().map(NodeId::fromString).collect(Collectors.toSet()); + cms.commit(new AccordMarkRejoining(nodeIds)); + } +} diff --git a/src/java/org/apache/cassandra/service/accord/AccordOperationsMBean.java b/src/java/org/apache/cassandra/service/accord/AccordOperationsMBean.java new file mode 100644 index 0000000000..e0b0884733 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordOperationsMBean.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.util.List; +import java.util.Map; + +public interface AccordOperationsMBean +{ + Map<String, String> describe(); + + void accordMarkStale(List<String> nodeIds); + + void accordMarkRejoining(List<String> nodeIds); +} diff --git a/src/java/org/apache/cassandra/service/accord/AccordStaleReplicas.java b/src/java/org/apache/cassandra/service/accord/AccordStaleReplicas.java new file mode 100644 index 0000000000..2502fa1a63 --- /dev/null +++ b/src/java/org/apache/cassandra/service/accord/AccordStaleReplicas.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.io.IOException; +import java.util.Objects; +import java.util.Set; + +import javax.annotation.concurrent.Immutable; + +import com.google.common.collect.ImmutableSet; + +import accord.local.Node; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.accord.serializers.TopologySerializers; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.MetadataValue; +import org.apache.cassandra.tcm.serialization.MetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.CollectionSerializers; + +@Immutable +public class AccordStaleReplicas implements MetadataValue<AccordStaleReplicas> +{ + public static final AccordStaleReplicas EMPTY = new AccordStaleReplicas(ImmutableSet.of(), Epoch.EMPTY); + + private final Set<Node.Id> staleIds; + private final Epoch lastModified; + + AccordStaleReplicas(Set<Node.Id> staleIds, Epoch lastModified) + { + this.staleIds = staleIds; + this.lastModified = lastModified; + } + + @Override + public AccordStaleReplicas withLastModified(Epoch epoch) + { + return new AccordStaleReplicas(staleIds, epoch); + } + + @Override + public Epoch lastModified() + { + return lastModified; + } + + public AccordStaleReplicas withNodeIds(Set<Node.Id> ids) + { + ImmutableSet.Builder<Node.Id> builder = new ImmutableSet.Builder<>(); + Set<Node.Id> newIds = builder.addAll(staleIds).addAll(ids).build(); + return new AccordStaleReplicas(newIds, lastModified); + } + + public AccordStaleReplicas without(Set<Node.Id> ids) + { + ImmutableSet.Builder<Node.Id> builder = new ImmutableSet.Builder<>(); + + for (Node.Id staleId : staleIds) + if (!ids.contains(staleId)) + builder.add(staleId); + + return new AccordStaleReplicas(builder.build(), lastModified); + } + + public boolean contains(Node.Id nodeId) + { + return staleIds.contains(nodeId); + } + + 
public Set<Node.Id> ids() + { + return staleIds; + } + + @Override + public String toString() + { + return "AccordStaleReplicas{staleIds=" + staleIds + ", lastModified=" + lastModified + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AccordStaleReplicas that = (AccordStaleReplicas) o; + return Objects.equals(staleIds, that.staleIds) && Objects.equals(lastModified, that.lastModified); + } + + @Override + public int hashCode() + { + return Objects.hash(staleIds, lastModified); + } + + public static final MetadataSerializer<AccordStaleReplicas> serializer = new MetadataSerializer<>() + { + @Override + public void serialize(AccordStaleReplicas replicas, DataOutputPlus out, Version version) throws IOException + { + CollectionSerializers.serializeCollection(replicas.staleIds, out, version, TopologySerializers.nodeId); + Epoch.serializer.serialize(replicas.lastModified, out, version); + } + + @Override + public AccordStaleReplicas deserialize(DataInputPlus in, Version version) throws IOException + { + return new AccordStaleReplicas(CollectionSerializers.deserializeSet(in, version, TopologySerializers.nodeId), + Epoch.serializer.deserialize(in, version)); + } + + @Override + public long serializedSize(AccordStaleReplicas replicas, Version version) + { + return CollectionSerializers.serializedCollectionSize(replicas.staleIds, version, TopologySerializers.nodeId) + + Epoch.serializer.serializedSize(replicas.lastModified, version); + } + }; +} diff --git a/src/java/org/apache/cassandra/service/accord/AccordTopology.java b/src/java/org/apache/cassandra/service/accord/AccordTopology.java index 7bbfc0c250..45f3b3fd76 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordTopology.java +++ b/src/java/org/apache/cassandra/service/accord/AccordTopology.java @@ -78,7 +78,7 @@ public class AccordTopology } } - static class KeyspaceShard + public static class 
KeyspaceShard { private final KeyspaceMetadata keyspace; private final Range<Token> range; @@ -139,7 +139,7 @@ public class AccordTopology return new KeyspaceShard(keyspace, range, nodes, pending); } - public static List<KeyspaceShard> forKeyspace(KeyspaceMetadata keyspace, DataPlacements placements, Directory directory, ShardLookup lookup) + public static List<KeyspaceShard> forKeyspace(KeyspaceMetadata keyspace, DataPlacements placements, Directory directory) { ReplicationParams replication = keyspace.params.replication; DataPlacement placement = placements.get(replication); @@ -154,6 +154,16 @@ public class AccordTopology } return shards; } + + public List<Node.Id> nodes() + { + return nodes; + } + + public Range<Token> range() + { + return range; + } } static TokenRange minRange(TableId table, Token token) @@ -219,7 +229,9 @@ public class AccordTopology return builder.build(); } - public static Topology createAccordTopology(Epoch epoch, DistributedSchema schema, DataPlacements placements, Directory directory, AccordFastPath accordFastPath, ShardLookup lookup) + public static Topology createAccordTopology(Epoch epoch, DistributedSchema schema, DataPlacements placements, + Directory directory, AccordFastPath accordFastPath, ShardLookup lookup, + AccordStaleReplicas staleReplicas) { List<Shard> shards = new ArrayList<>(); Set<Node.Id> unavailable = accordFastPath.unavailableIds(); @@ -230,17 +242,18 @@ public class AccordTopology List<TableMetadata> tables = keyspace.tables.stream().filter(TableMetadata::requiresAccordSupport).collect(Collectors.toList()); if (tables.isEmpty()) continue; - List<KeyspaceShard> ksShards = KeyspaceShard.forKeyspace(keyspace, placements, directory, lookup); + List<KeyspaceShard> ksShards = KeyspaceShard.forKeyspace(keyspace, placements, directory); tables.forEach(table -> ksShards.forEach(shard -> shards.add(shard.createForTable(table, unavailable, dcMap, lookup)))); } shards.sort((a, b) -> a.range.compare(b.range)); - return new 
Topology(epoch.getEpoch(), shards.toArray(new Shard[0])); + + return new Topology(epoch.getEpoch(), staleReplicas.ids(), shards.toArray(new Shard[0])); } public static Topology createAccordTopology(ClusterMetadata metadata, ShardLookup lookup) { - return createAccordTopology(metadata.epoch, metadata.schema, metadata.placements, metadata.directory, metadata.accordFastPath, lookup); + return createAccordTopology(metadata.epoch, metadata.schema, metadata.placements, metadata.directory, metadata.accordFastPath, lookup, metadata.accordStaleReplicas); } public static Topology createAccordTopology(ClusterMetadata metadata, Topology current) diff --git a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java index 782ecbf5ed..73708c125f 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/TopologySerializers.java @@ -159,6 +159,7 @@ public class TopologySerializers { out.writeLong(topology.epoch()); ArraySerializers.serializeArray(topology.unsafeGetShards(), out, version, shard); + CollectionSerializers.serializeCollection(topology.staleIds(), out, version, TopologySerializers.nodeId); } @Override @@ -166,7 +167,8 @@ public class TopologySerializers { long epoch = in.readLong(); Shard[] shards = ArraySerializers.deserializeArray(in, version, shard, Shard[]::new); - return new Topology(epoch, shards); + Set<Node.Id> staleIds = CollectionSerializers.deserializeSet(in, version, TopologySerializers.nodeId); + return new Topology(epoch, staleIds, shards); } @Override @@ -175,6 +177,7 @@ public class TopologySerializers long size = 0; size += TypeSizes.LONG_SIZE; // epoch size += ArraySerializers.serializedArraySize(topology.unsafeGetShards(), version, shard); + size += CollectionSerializers.serializedCollectionSize(topology.staleIds(), version, 
TopologySerializers.nodeId); return size; } }; diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java index 415fe71456..5bde50f1b3 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadata.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadata.java @@ -21,6 +21,7 @@ package org.apache.cassandra.tcm; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -56,6 +57,8 @@ import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.schema.TableId; +import org.apache.cassandra.service.accord.AccordStaleReplicas; +import org.apache.cassandra.service.accord.AccordTopology; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.service.consensus.migration.TableMigrationState; import org.apache.cassandra.tcm.extensions.ExtensionKey; @@ -98,6 +101,7 @@ public class ClusterMetadata public final InProgressSequences inProgressSequences; public final ConsensusMigrationState consensusMigrationState; public final ImmutableMap<ExtensionKey<?,?>, ExtensionValue<?>> extensions; + public final AccordStaleReplicas accordStaleReplicas; // These two fields are lazy but only for the test purposes, since their computation requires initialization of the log ks private EndpointsForRange fullCMSReplicas; @@ -129,7 +133,8 @@ public class ClusterMetadata LockedRanges.EMPTY, InProgressSequences.EMPTY, ConsensusMigrationState.EMPTY, - ImmutableMap.of()); + ImmutableMap.of(), + AccordStaleReplicas.EMPTY); } public ClusterMetadata(Epoch epoch, @@ -142,7 +147,8 @@ public class ClusterMetadata LockedRanges lockedRanges, InProgressSequences inProgressSequences, ConsensusMigrationState consensusMigrationState, - 
Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions, + AccordStaleReplicas accordStaleReplicas) { this(EMPTY_METADATA_IDENTIFIER, epoch, @@ -155,7 +161,8 @@ public class ClusterMetadata lockedRanges, inProgressSequences, consensusMigrationState, - extensions); + extensions, + accordStaleReplicas); } private ClusterMetadata(int metadataIdentifier, @@ -169,7 +176,8 @@ public class ClusterMetadata LockedRanges lockedRanges, InProgressSequences inProgressSequences, ConsensusMigrationState consensusMigrationState, - Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions) + Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions, + AccordStaleReplicas accordStaleReplicas) { // TODO: token map is a feature of the specific placement strategy, and so may not be a relevant component of // ClusterMetadata in the long term. We need to consider how the actual components of metadata can be evolved @@ -187,17 +195,18 @@ public class ClusterMetadata this.inProgressSequences = inProgressSequences; this.consensusMigrationState = consensusMigrationState; this.extensions = ImmutableMap.copyOf(extensions); + this.accordStaleReplicas = accordStaleReplicas; this.hashCode = computeHashCode(); } public ClusterMetadata withDirectory(Directory directory) { - return new ClusterMetadata(epoch, partitioner, schema, directory, tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, consensusMigrationState, extensions); + return new ClusterMetadata(epoch, partitioner, schema, directory, tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, consensusMigrationState, extensions, accordStaleReplicas); } public ClusterMetadata withPlacements(DataPlacements placements) { - return new ClusterMetadata(epoch, partitioner, schema, directory, tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, consensusMigrationState, extensions); + return new ClusterMetadata(epoch, partitioner, schema, 
directory, tokenMap, placements, accordFastPath, lockedRanges, inProgressSequences, consensusMigrationState, extensions, accordStaleReplicas); } public Set<InetAddressAndPort> fullCMSMembers() @@ -246,7 +255,8 @@ public class ClusterMetadata capLastModified(lockedRanges, epoch), capLastModified(inProgressSequences, epoch), capLastModified(consensusMigrationState, epoch), - capLastModified(extensions, epoch)); + capLastModified(extensions, epoch), + capLastModified(accordStaleReplicas, epoch)); } public ClusterMetadata initializeClusterIdentifier(int clusterIdentifier) @@ -268,7 +278,8 @@ public class ClusterMetadata lockedRanges, inProgressSequences, consensusMigrationState, - extensions); + extensions, + accordStaleReplicas); } private static Map<ExtensionKey<?,?>, ExtensionValue<?>> capLastModified(Map<ExtensionKey<?,?>, ExtensionValue<?>> original, Epoch maxEpoch) @@ -393,6 +404,7 @@ public class ClusterMetadata private ConsensusMigrationState consensusMigrationState; private final Map<ExtensionKey<?, ?>, ExtensionValue<?>> extensions; private final Set<MetadataKey> modifiedKeys; + private AccordStaleReplicas accordStaleReplicas; private Transformer(ClusterMetadata metadata, Epoch epoch) { @@ -409,6 +421,7 @@ public class ClusterMetadata this.consensusMigrationState = metadata.consensusMigrationState; extensions = new HashMap<>(metadata.extensions); modifiedKeys = new HashSet<>(); + accordStaleReplicas = metadata.accordStaleReplicas; } public Epoch epoch() @@ -437,6 +450,11 @@ public class ClusterMetadata public Transformer unregister(NodeId nodeId) { directory = directory.without(nodeId); + + Node.Id accordId = AccordTopology.tcmIdToAccord(nodeId); + if (accordStaleReplicas.contains(accordId)) + accordStaleReplicas = accordStaleReplicas.without(Collections.singleton(accordId)); + return this; } @@ -504,6 +522,11 @@ public class ClusterMetadata directory = directory.without(replaced) .withRackAndDC(replacement) .withNodeState(replacement, NodeState.JOINED); + + 
Node.Id accordId = AccordTopology.tcmIdToAccord(replaced); + if (accordStaleReplicas.contains(accordId)) + accordStaleReplicas = accordStaleReplicas.without(Collections.singleton(accordId)); + return this; } @@ -532,6 +555,18 @@ public class ClusterMetadata accordFastPath = accordFastPath.withNodeStatusSince(node, status, updateTimeMillis, updateDelayMillis); return this; } + + public Transformer markStaleReplicas(Set<Node.Id> ids) + { + accordStaleReplicas = accordStaleReplicas.withNodeIds(ids); + return this; + } + + public Transformer unmarkStaleReplicas(Set<Node.Id> ids) + { + accordStaleReplicas = accordStaleReplicas.without(ids); + return this; + } public Transformer with(LockedRanges lockedRanges) { @@ -658,6 +693,12 @@ public class ClusterMetadata modifiedKeys.add(MetadataKeys.ACCORD_FAST_PATH); accordFastPath = accordFastPath.withLastModified(epoch); } + + if (accordStaleReplicas != base.accordStaleReplicas) + { + modifiedKeys.add(MetadataKeys.ACCORD_STALE_REPLICAS); + accordStaleReplicas = accordStaleReplicas.withLastModified(epoch); + } if (lockedRanges != base.lockedRanges) { @@ -693,7 +734,8 @@ public class ClusterMetadata lockedRanges, inProgressSequences, consensusMigrationState, - extensions), + extensions, + accordStaleReplicas), ImmutableSet.copyOf(modifiedKeys)); } @@ -710,7 +752,8 @@ public class ClusterMetadata lockedRanges, inProgressSequences, consensusMigrationState, - extensions); + extensions, + accordStaleReplicas); } @Override @@ -975,7 +1018,9 @@ public class ClusterMetadata { AccordFastPath.serializer.serialize(metadata.accordFastPath, out, version); ConsensusMigrationState.serializer.serialize(metadata.consensusMigrationState, out, version); + AccordStaleReplicas.serializer.serialize(metadata.accordStaleReplicas, out, version); } + LockedRanges.serializer.serialize(metadata.lockedRanges, out, version); InProgressSequences.serializer.serialize(metadata.inProgressSequences, out, version); out.writeInt(metadata.extensions.size()); @@ 
-1012,18 +1057,24 @@ public class ClusterMetadata Directory dir = Directory.serializer.deserialize(in, version); TokenMap tokenMap = TokenMap.serializer.deserialize(in, version); DataPlacements placements = DataPlacements.serializer.deserialize(in, version); + AccordFastPath accordFastPath; ConsensusMigrationState consensusMigrationState; + AccordStaleReplicas staleReplicas; + if (version.isAtLeast(V2)) { accordFastPath = AccordFastPath.serializer.deserialize(in, version); consensusMigrationState = ConsensusMigrationState.serializer.deserialize(in, version); + staleReplicas = AccordStaleReplicas.serializer.deserialize(in, version); } else { accordFastPath = AccordFastPath.EMPTY; consensusMigrationState = ConsensusMigrationState.EMPTY; + staleReplicas = AccordStaleReplicas.EMPTY; } + LockedRanges lockedRanges = LockedRanges.serializer.deserialize(in, version); InProgressSequences ips = InProgressSequences.serializer.deserialize(in, version); int items = in.readInt(); @@ -1046,7 +1097,8 @@ public class ClusterMetadata lockedRanges, ips, consensusMigrationState, - extensions); + extensions, + staleReplicas); } @Override @@ -1070,7 +1122,8 @@ public class ClusterMetadata if (version.isAtLeast(V2)) { size += AccordFastPath.serializer.serializedSize(metadata.accordFastPath, version) + - ConsensusMigrationState.serializer.serializedSize(metadata.consensusMigrationState, version); + ConsensusMigrationState.serializer.serializedSize(metadata.consensusMigrationState, version) + + AccordStaleReplicas.serializer.serializedSize(metadata.accordStaleReplicas, version); } size += LockedRanges.serializer.serializedSize(metadata.lockedRanges, version) + diff --git a/src/java/org/apache/cassandra/tcm/MetadataKeys.java b/src/java/org/apache/cassandra/tcm/MetadataKeys.java index 59baeab58c..62bd72fe70 100644 --- a/src/java/org/apache/cassandra/tcm/MetadataKeys.java +++ b/src/java/org/apache/cassandra/tcm/MetadataKeys.java @@ -38,6 +38,7 @@ public class MetadataKeys public static final 
MetadataKey TOKEN_MAP = make(CORE_NS, "ownership", "token_map"); public static final MetadataKey DATA_PLACEMENTS = make(CORE_NS, "ownership", "data_placements"); public static final MetadataKey ACCORD_FAST_PATH = make(CORE_NS, "ownership", "accord_fast_path"); + public static final MetadataKey ACCORD_STALE_REPLICAS = make(CORE_NS, "ownership", "accord_stale_replicas"); public static final MetadataKey LOCKED_RANGES = make(CORE_NS, "sequences", "locked_ranges"); public static final MetadataKey IN_PROGRESS_SEQUENCES = make(CORE_NS, "sequences", "in_progress"); public static final MetadataKey CONSENSUS_MIGRATION_STATE = make(CORE_NS, "consensus", "migration_state"); @@ -47,6 +48,7 @@ public class MetadataKeys TOKEN_MAP, DATA_PLACEMENTS, ACCORD_FAST_PATH, + ACCORD_STALE_REPLICAS, LOCKED_RANGES, IN_PROGRESS_SEQUENCES, CONSENSUS_MIGRATION_STATE); diff --git a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java index abb3aa2603..42924d1690 100644 --- a/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/StubClusterMetadataService.java @@ -29,6 +29,7 @@ import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.KeyspaceMetadata; import org.apache.cassandra.schema.Keyspaces; import org.apache.cassandra.service.accord.AccordFastPath; +import org.apache.cassandra.service.accord.AccordStaleReplicas; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.tcm.Commit.Replicator; import org.apache.cassandra.tcm.log.Entry; @@ -185,7 +186,8 @@ public class StubClusterMetadataService extends ClusterMetadataService LockedRanges.EMPTY, InProgressSequences.EMPTY, ConsensusMigrationState.EMPTY, - ImmutableMap.of()); + ImmutableMap.of(), + AccordStaleReplicas.EMPTY); return new StubClusterMetadataService(new UniformRangePlacement(), snapshots != null ? 
snapshots : MetadataSnapshots.NO_OP, LocalLog.logSpec().withInitialState(initial).createLog(), diff --git a/src/java/org/apache/cassandra/tcm/Transformation.java b/src/java/org/apache/cassandra/tcm/Transformation.java index ea160482f3..e41a24159b 100644 --- a/src/java/org/apache/cassandra/tcm/Transformation.java +++ b/src/java/org/apache/cassandra/tcm/Transformation.java @@ -219,6 +219,8 @@ public interface Transformation BEGIN_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(36, () -> BeginConsensusMigrationForTableAndRange.serializer), MAYBE_FINISH_CONSENSUS_MIGRATION_FOR_TABLE_AND_RANGE(37, () -> MaybeFinishConsensusMigrationForTableAndRange.serializer), + ACCORD_MARK_STALE(38, () -> AccordMarkStale.serializer), + ACCORD_MARK_REJOINING(39, () -> AccordMarkRejoining.serializer), ; diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index 746523c46e..b08aebb871 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -52,6 +52,7 @@ import org.apache.cassandra.locator.InetAddressAndPort; import org.apache.cassandra.schema.DistributedSchema; import org.apache.cassandra.schema.SchemaKeyspace; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordStaleReplicas; import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; @@ -286,7 +287,8 @@ public class GossipHelper LockedRanges.EMPTY, InProgressSequences.EMPTY, ConsensusMigrationState.EMPTY, - Collections.emptyMap()); + Collections.emptyMap(), + AccordStaleReplicas.EMPTY); } public static ClusterMetadata fromEndpointStates(DistributedSchema schema, Map<InetAddressAndPort, EndpointState> epStates) @@ -358,7 +360,8 @@ public class GossipHelper LockedRanges.EMPTY, 
InProgressSequences.EMPTY, ConsensusMigrationState.EMPTY, - extensions); + extensions, + AccordStaleReplicas.EMPTY); return new ClusterMetadata(Epoch.UPGRADE_GOSSIP, partitioner, schema, @@ -369,7 +372,8 @@ public class GossipHelper LockedRanges.EMPTY, InProgressSequences.EMPTY, ConsensusMigrationState.EMPTY, - extensions); + extensions, + AccordStaleReplicas.EMPTY); } public static boolean isValidForClusterMetadata(Map<InetAddressAndPort, EndpointState> epstates) diff --git a/src/java/org/apache/cassandra/tcm/serialization/Version.java b/src/java/org/apache/cassandra/tcm/serialization/Version.java index 0fd6b5b1c1..e4021ecea0 100644 --- a/src/java/org/apache/cassandra/tcm/serialization/Version.java +++ b/src/java/org/apache/cassandra/tcm/serialization/Version.java @@ -37,6 +37,7 @@ public enum Version * - Added version to PlacementForRange serializer * - Serialize MemtableParams when serializing TableParams * - Added AccordFastPath + * - Added AccordStaleReplicas */ V2(2), diff --git a/src/java/org/apache/cassandra/tcm/transformations/AccordMarkRejoining.java b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkRejoining.java new file mode 100644 index 0000000000..402d05cf50 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkRejoining.java @@ -0,0 +1,123 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.io.IOException; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.local.Node; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.service.accord.AccordTopology; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.sequences.LockedRanges; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.CollectionSerializers; + +import static org.apache.cassandra.exceptions.ExceptionCode.INVALID; + +public class AccordMarkRejoining implements Transformation +{ + private static final Logger logger = LoggerFactory.getLogger(AccordMarkRejoining.class); + + private final Set<NodeId> ids; + + public AccordMarkRejoining(Set<NodeId> ids) + { + this.ids = ids; + } + + @Override + public Kind kind() + { + return Kind.ACCORD_MARK_REJOINING; + } + + @Override + public Result execute(ClusterMetadata prev) + { + for (NodeId id : ids) + if (!prev.directory.peerIds().contains(id)) + return new Rejected(INVALID, String.format("Can not unmark node %s as it is not present in the directory.", id)); + + Set<Node.Id> accordIds = 
ids.stream().map(AccordTopology::tcmIdToAccord).collect(Collectors.toSet()); + + for (Node.Id id : accordIds) + if (!prev.accordStaleReplicas.contains(id)) + return new Rejected(INVALID, String.format("Can not unmark node %s as it is not stale.", id)); + + logger.info("Unmarking " + ids + ". They will now participate in durability status coordination..."); + ClusterMetadata.Transformer next = prev.transformer().unmarkStaleReplicas(accordIds); + return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY); + } + + @Override + public String toString() + { + return "AccordMarkRejoining{ids=" + ids + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AccordMarkRejoining that = (AccordMarkRejoining) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hash(ids); + } + + public static final AsymmetricMetadataSerializer<Transformation, AccordMarkRejoining> serializer = new AsymmetricMetadataSerializer<>() + { + @Override + public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException + { + assert t instanceof AccordMarkRejoining; + AccordMarkRejoining mark = (AccordMarkRejoining) t; + CollectionSerializers.serializeCollection(mark.ids, out, version, NodeId.serializer); + } + + @Override + public AccordMarkRejoining deserialize(DataInputPlus in, Version version) throws IOException + { + return new AccordMarkRejoining(CollectionSerializers.deserializeSet(in, version, NodeId.serializer)); + } + + @Override + public long serializedSize(Transformation t, Version version) + { + assert t instanceof AccordMarkRejoining; + AccordMarkRejoining mark = (AccordMarkRejoining) t; + return CollectionSerializers.serializedCollectionSize(mark.ids, version, NodeId.serializer); + } + }; +} diff --git a/src/java/org/apache/cassandra/tcm/transformations/AccordMarkStale.java 
b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkStale.java new file mode 100644 index 0000000000..261e0a4eb2 --- /dev/null +++ b/src/java/org/apache/cassandra/tcm/transformations/AccordMarkStale.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.transformations; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import accord.local.Node; +import accord.topology.Shard; +import org.apache.cassandra.io.util.DataInputPlus; +import org.apache.cassandra.io.util.DataOutputPlus; +import org.apache.cassandra.schema.KeyspaceMetadata; +import org.apache.cassandra.schema.SchemaConstants; +import org.apache.cassandra.service.accord.AccordTopology; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.Transformation; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.sequences.LockedRanges; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializer; +import org.apache.cassandra.tcm.serialization.Version; +import org.apache.cassandra.utils.CollectionSerializers; + +import static org.apache.cassandra.exceptions.ExceptionCode.INVALID; + +public class AccordMarkStale implements Transformation +{ + private static final Logger logger = LoggerFactory.getLogger(AccordMarkStale.class); + + private final Set<NodeId> ids; + + public AccordMarkStale(Set<NodeId> ids) + { + this.ids = ids; + } + + @Override + public Kind kind() + { + return Kind.ACCORD_MARK_STALE; + } + + @Override + public Result execute(ClusterMetadata prev) + { + for (NodeId id : ids) + if (!prev.directory.peerIds().contains(id)) + return new Rejected(INVALID, String.format("Can not mark node %s stale as it is not present in the directory.", id)); + + Set<Node.Id> accordIds = ids.stream().map(AccordTopology::tcmIdToAccord).collect(Collectors.toSet()); + + for (Node.Id id : accordIds) + if (prev.accordStaleReplicas.contains(id)) + return new Rejected(INVALID, String.format("Can not mark node %s stale as it already is.", id)); + + for 
(KeyspaceMetadata keyspace : prev.schema.getKeyspaces().without(SchemaConstants.REPLICATED_SYSTEM_KEYSPACE_NAMES)) + { + List<AccordTopology.KeyspaceShard> shards = AccordTopology.KeyspaceShard.forKeyspace(keyspace, prev.placements, prev.directory); + + for (AccordTopology.KeyspaceShard shard : shards) + { + // We're trying to mark a node in this shard stale... + if (!Collections.disjoint(shard.nodes(), accordIds)) + { + int quorumSize = Shard.slowPathQuorumSize(shard.nodes().size()); + Set<Node.Id> nonStaleNodes = new HashSet<>(shard.nodes()); + nonStaleNodes.removeAll(accordIds); + nonStaleNodes.removeAll(prev.accordStaleReplicas.ids()); + + // ...but reject the transformation if this would bring us below quorum. + if (nonStaleNodes.size() < quorumSize) + return new Rejected(INVALID, String.format("Can not mark nodes %s stale as that would leave fewer than a quorum of nodes active for range %s in keyspace '%s'.", + accordIds, shard.range(), keyspace.name)); + } + } + } + + logger.info("Marking " + ids + " stale. 
They will no longer participate in durability status coordination..."); + ClusterMetadata.Transformer next = prev.transformer().markStaleReplicas(accordIds); + return Transformation.success(next, LockedRanges.AffectedRanges.EMPTY); + } + + @Override + public String toString() + { + return "AccordMarkStale{ids=" + ids + '}'; + } + + @Override + public boolean equals(Object o) + { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + AccordMarkStale that = (AccordMarkStale) o; + return Objects.equals(ids, that.ids); + } + + @Override + public int hashCode() + { + return Objects.hash(ids); + } + + public static final AsymmetricMetadataSerializer<Transformation, AccordMarkStale> serializer = new AsymmetricMetadataSerializer<>() + { + @Override + public void serialize(Transformation t, DataOutputPlus out, Version version) throws IOException + { + assert t instanceof AccordMarkStale; + AccordMarkStale mark = (AccordMarkStale) t; + CollectionSerializers.serializeCollection(mark.ids, out, version, NodeId.serializer); + } + + @Override + public AccordMarkStale deserialize(DataInputPlus in, Version version) throws IOException + { + return new AccordMarkStale(CollectionSerializers.deserializeSet(in, version, NodeId.serializer)); + } + + @Override + public long serializedSize(Transformation t, Version version) + { + assert t instanceof AccordMarkStale; + AccordMarkStale mark = (AccordMarkStale) t; + return CollectionSerializers.serializedCollectionSize(mark.ids, version, NodeId.serializer); + } + }; +} diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 6a07af6af1..aaa4b996b0 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -114,6 +114,8 @@ import org.apache.cassandra.service.GCInspectorMXBean; import org.apache.cassandra.service.StorageProxy; import 
org.apache.cassandra.service.StorageProxyMBean; import org.apache.cassandra.service.StorageServiceMBean; +import org.apache.cassandra.service.accord.AccordOperations; +import org.apache.cassandra.service.accord.AccordOperationsMBean; import org.apache.cassandra.streaming.StreamManagerMBean; import org.apache.cassandra.streaming.StreamState; import org.apache.cassandra.streaming.management.StreamStateCompositeData; @@ -148,6 +150,7 @@ public class NodeProbe implements AutoCloseable protected CompactionManagerMBean compactionProxy; protected StorageServiceMBean ssProxy; protected CMSOperationsMBean cmsProxy; + protected AccordOperationsMBean accordProxy; protected GossiperMBean gossProxy; protected MemoryMXBean memProxy; protected GCInspectorMXBean gcProxy; @@ -262,6 +265,8 @@ public class NodeProbe implements AutoCloseable ssProxy = JMX.newMBeanProxy(mbeanServerConn, name, StorageServiceMBean.class); name = new ObjectName(CMSOperations.MBEAN_OBJECT_NAME); cmsProxy = JMX.newMBeanProxy(mbeanServerConn, name, CMSOperationsMBean.class); + name = new ObjectName(AccordOperations.MBEAN_OBJECT_NAME); + accordProxy = JMX.newMBeanProxy(mbeanServerConn, name, AccordOperationsMBean.class); name = new ObjectName(MessagingService.MBEAN_NAME); msProxy = JMX.newMBeanProxy(mbeanServerConn, name, MessagingServiceMBean.class); name = new ObjectName(StreamManagerMBean.OBJECT_NAME); @@ -1238,6 +1243,11 @@ public class NodeProbe implements AutoCloseable return cmsProxy; } + public AccordOperationsMBean getAccordOperationsProxy() + { + return accordProxy; + } + public GossiperMBean getGossProxy() { return gossProxy; diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index dc314b32df..f7e27ed231 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -271,6 +271,13 @@ public class NodeTool .withCommands(ConsensusMigrationAdmin.ListCmd.class) 
.withCommands(ConsensusMigrationAdmin.FinishMigration.class); + builder.withGroup("accord") + .withDescription("Manage the operation of Accord") + .withDefaultCommand(AccordAdmin.Describe.class) + .withCommand(AccordAdmin.Describe.class) + .withCommand(AccordAdmin.MarkStale.class) + .withCommand(AccordAdmin.MarkRejoining.class); + Cli<NodeToolCmdRunnable> parser = builder.build(); int status = 0; diff --git a/src/java/org/apache/cassandra/tools/nodetool/AccordAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/AccordAdmin.java new file mode 100644 index 0000000000..ff7e88ca9f --- /dev/null +++ b/src/java/org/apache/cassandra/tools/nodetool/AccordAdmin.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tools.nodetool; + +import java.util.List; +import java.util.Map; + +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import org.apache.cassandra.tools.NodeProbe; +import org.apache.cassandra.tools.NodeTool; + +public abstract class AccordAdmin extends NodeTool.NodeToolCmd +{ + @Command(name = "describe", description = "Describe current cluster metadata relating to Accord") + public static class Describe extends NodeTool.NodeToolCmd + { + @Override + protected void execute(NodeProbe probe) + { + Map<String, String> info = probe.getAccordOperationsProxy().describe(); + output.out.printf("Accord Service:%n"); + output.out.printf("Epoch: %s%n", info.get("EPOCH")); + output.out.printf("Stale Replicas: %s%n", info.get("STALE_REPLICAS")); + } + } + + @Command(name = "mark_stale", description = "Mark a replica as being stale and no longer able to participate in durability status coordination") + public static class MarkStale extends AccordAdmin + { + @Arguments(required = true, description = "One or more node IDs to mark stale", usage = "<nodeId>+") + public List<String> nodeIds; + + @Override + protected void execute(NodeProbe probe) + { + probe.getAccordOperationsProxy().accordMarkStale(nodeIds); + } + } + + @Command(name = "mark_rejoining", description = "Mark a stale replica as being allowed to participate in durability status coordination again") + public static class MarkRejoining extends AccordAdmin + { + @Arguments(required = true, description = "One or more node IDs to mark no longer stale", usage = "<nodeId>+") + public List<String> nodeIds; + + @Override + protected void execute(NodeProbe probe) + { + probe.getAccordOperationsProxy().accordMarkRejoining(nodeIds); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java index a1a29f09cb..d70938af79 100644 --- 
a/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java +++ b/test/distributed/org/apache/cassandra/distributed/mock/nodetool/InternalNodeProbe.java @@ -44,6 +44,7 @@ import org.apache.cassandra.service.CacheServiceMBean; import org.apache.cassandra.service.GCInspector; import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.service.accord.AccordOperations; import org.apache.cassandra.streaming.StreamManager; import org.apache.cassandra.tcm.CMSOperations; import org.apache.cassandra.tools.NodeProbe; @@ -70,6 +71,7 @@ public class InternalNodeProbe extends NodeProbe ssProxy = StorageService.instance; cmsProxy = CMSOperations.instance; + accordProxy = AccordOperations.instance; msProxy = MessagingService.instance(); streamProxy = StreamManager.instance; compactionProxy = CompactionManager.instance; diff --git a/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolTest.java b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolTest.java new file mode 100644 index 0000000000..9af94b783d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/accord/AccordNodetoolTest.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.test.accord; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import accord.local.Node; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.distributed.Cluster; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.distributed.test.TestBaseImpl; + + +import static org.junit.Assert.assertEquals; +import static org.apache.cassandra.distributed.shared.ClusterUtils.getNodeId; + +public class AccordNodetoolTest extends TestBaseImpl +{ + @Test + public void testMarkSingleNode() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(3).withConfig((config) -> config.with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + cluster.get(1).nodetoolResult("accord", "mark_stale", "1").asserts().success(); + cluster.get(1).runOnInstance(() -> assertEquals(ImmutableSet.of(new Node.Id(1)), ClusterMetadata.current().accordStaleReplicas.ids())); + cluster.get(1).nodetoolResult("accord", "describe").asserts().stdoutContains("Stale Replicas: 1"); + + // Reject the operation if the target node is already stale: + cluster.get(1).nodetoolResult("accord", "mark_stale", "1").asserts().failure().errorContains("it already is"); + + // Reject the operation if marking the node stale brings us below a quorum of non-stale nodes: + cluster.get(1).nodetoolResult("accord", "mark_stale", "2").asserts().failure().errorContains("that would leave fewer than a quorum"); + + // Reject the operation if the target node doesn't exist: + cluster.get(1).nodetoolResult("accord", "mark_stale", "4").asserts().failure().errorContains("not present in the directory"); + + cluster.get(1).nodetoolResult("accord", "mark_rejoining", "1").asserts().success(); + 
cluster.get(1).runOnInstance(() -> assertEquals(Collections.emptySet(), ClusterMetadata.current().accordStaleReplicas.ids())); + + cluster.get(1).nodetoolResult("accord", "mark_rejoining", "1").asserts().failure().errorContains("it is not stale"); + cluster.get(1).nodetoolResult("accord", "mark_rejoining", "4").asserts().failure().errorContains("not present in the directory"); + } + } + + @Test + public void testMarkMultipleNodes() throws Throwable + { + try (Cluster cluster = init(builder().withNodes(5).withConfig((config) -> config.with(Feature.NETWORK, Feature.GOSSIP)).start())) + { + // Reject the operation if marking the node stale brings us below a quorum of non-stale nodes: + cluster.get(1).nodetoolResult("accord", "mark_stale", "1", "2", "3").asserts().failure().errorContains("that would leave fewer than a quorum"); + + cluster.get(1).nodetoolResult("accord", "mark_stale", "1", "2").asserts().success(); + cluster.get(1).runOnInstance(() -> assertEquals(ImmutableSet.of(new Node.Id(1), new Node.Id(2)), ClusterMetadata.current().accordStaleReplicas.ids())); + cluster.get(1).nodetoolResult("accord", "describe").asserts().stdoutContains("Stale Replicas: 1,2"); + + // Reject the operation if a target node is already stale: + cluster.get(1).nodetoolResult("accord", "mark_stale", "1", "2").asserts().failure().errorContains("it already is"); + + // Reject the operation if a target node doesn't exist: + cluster.get(1).nodetoolResult("accord", "mark_stale", "4", "6").asserts().failure().errorContains("not present in the directory"); + + Map<Integer, Integer> nodeIdToNode = new HashMap<>(); + for (int i = 1; i <= 5; i++) + nodeIdToNode.put(getNodeId(cluster.get(i)).id(), i); + + // Remove the second stale node, and ensure the set of stale replicas is updated: + cluster.get(nodeIdToNode.get(2)).shutdown().get(); + cluster.get(1).nodetoolResult("removenode", "2", "--force").asserts().success(); + cluster.get(1).nodetoolResult("cms", "unregister", 
"2").asserts().success(); + cluster.get(1).runOnInstance(() -> assertEquals(ImmutableSet.of(new Node.Id(1)), ClusterMetadata.current().accordStaleReplicas.ids())); + + cluster.get(1).nodetoolResult("accord", "mark_rejoining", "1", "3").asserts().failure().errorContains("it is not stale"); + cluster.get(1).nodetoolResult("accord", "mark_rejoining", "1", "6").asserts().failure().errorContains("not present in the directory"); + cluster.get(1).runOnInstance(() -> assertEquals(ImmutableSet.of(new Node.Id(1)), ClusterMetadata.current().accordStaleReplicas.ids())); + } + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java index b19c2c091e..e52b365d9a 100644 --- a/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/test/log/ClusterMetadataTestHelper.java @@ -58,6 +58,7 @@ import org.apache.cassandra.schema.Schema; import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.schema.SchemaTransformation; import org.apache.cassandra.service.ClientState; +import org.apache.cassandra.service.accord.AccordStaleReplicas; import org.apache.cassandra.tcm.AtomicLongBackedProcessor; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.ClusterMetadataService; @@ -157,7 +158,8 @@ public class ClusterMetadataTestHelper LockedRanges.EMPTY, InProgressSequences.EMPTY, null, - ImmutableMap.of()); + ImmutableMap.of(), + AccordStaleReplicas.EMPTY); } public static ClusterMetadata minimalForTesting(IPartitioner partitioner) @@ -172,7 +174,8 @@ public class ClusterMetadataTestHelper null, null, null, - ImmutableMap.of()); + ImmutableMap.of(), + AccordStaleReplicas.EMPTY); } public static ClusterMetadata minimalForTesting(Keyspaces keyspaces) @@ -187,7 +190,8 @@ public class ClusterMetadataTestHelper null, null, 
null, - ImmutableMap.of()); + ImmutableMap.of(), + AccordStaleReplicas.EMPTY); } public static ClusterMetadataService syncInstanceForTest() diff --git a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java index ce3dfc7b7e..34828dd471 100644 --- a/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java +++ b/test/unit/org/apache/cassandra/locator/MetaStrategyTest.java @@ -26,14 +26,15 @@ import java.util.Map; import java.util.Set; import com.google.common.collect.ImmutableMap; -import org.apache.cassandra.service.accord.AccordFastPath; -import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.junit.Assert; import org.junit.Test; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.schema.DistributedSchema; +import org.apache.cassandra.service.accord.AccordFastPath; +import org.apache.cassandra.service.accord.AccordStaleReplicas; +import org.apache.cassandra.service.consensus.migration.ConsensusMigrationState; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; import org.apache.cassandra.tcm.membership.Directory; @@ -93,7 +94,8 @@ public class MetaStrategyTest LockedRanges.EMPTY, InProgressSequences.EMPTY, ConsensusMigrationState.EMPTY, - ImmutableMap.of()); + ImmutableMap.of(), + AccordStaleReplicas.EMPTY); } @Test @@ -159,4 +161,4 @@ public class MetaStrategyTest { return new Location(dc, rack); } -} \ No newline at end of file +} diff --git a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java index 143f63a26c..4bec25b4d8 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordFastPathCoordinatorTest.java @@ -18,6 +18,15 @@ package 
org.apache.cassandra.service.accord; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.TimeUnit; + +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + import accord.local.Node; import accord.topology.Shard; import accord.topology.Topology; @@ -28,14 +37,6 @@ import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.schema.*; import org.apache.cassandra.service.accord.AccordFastPath.Status; import org.apache.cassandra.tcm.ClusterMetadata; -import org.junit.Assert; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.TimeUnit; import static org.apache.cassandra.service.accord.AccordTestUtils.*; diff --git a/test/unit/org/apache/cassandra/service/accord/AccordStaleReplicasTest.java b/test/unit/org/apache/cassandra/service/accord/AccordStaleReplicasTest.java new file mode 100644 index 0000000000..83eced9b66 --- /dev/null +++ b/test/unit/org/apache/cassandra/service/accord/AccordStaleReplicasTest.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.service.accord; + +import java.util.HashSet; +import java.util.Set; + +import org.junit.Test; + +import accord.local.Node; +import accord.utils.AccordGens; +import accord.utils.Gen; +import accord.utils.Gens; +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.tcm.Epoch; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers; +import org.apache.cassandra.tcm.serialization.Version; + +import static accord.utils.Property.qt; + +public class AccordStaleReplicasTest +{ + @Test + public void serde() + { + try (DataOutputBuffer buffer = new DataOutputBuffer()) + { + Gen<Set<Node.Id>> nodesGen = Gens.lists(AccordGens.nodes()).unique().ofSizeBetween(0, 9).map(nodes -> new HashSet<>(nodes)); + Gen<Epoch> epochGen = AccordGens.epochs().map(Epoch::create); + + qt().check(rs -> { + Epoch epoch = epochGen.next(rs); + Set<Node.Id> nodes = nodesGen.next(rs); + AsymmetricMetadataSerializers.testSerde(buffer, AccordStaleReplicas.serializer, new AccordStaleReplicas(nodes, epoch), Version.V2); + }); + } + } +} diff --git a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java index f5e1ecdfe2..109fefeda2 100644 --- a/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java +++ b/test/unit/org/apache/cassandra/tcm/ClusterMetadataTransformationTest.java @@ -309,6 +309,8 @@ public class ClusterMetadataTransformationTest return metadata.accordFastPath; else if (key == CONSENSUS_MIGRATION_STATE) return metadata.consensusMigrationState; + else if (key == ACCORD_STALE_REPLICAS) + return metadata.accordStaleReplicas; throw new IllegalArgumentException("Unknown metadata key " + key); } diff --git a/test/unit/org/apache/cassandra/tcm/serialization/AsymmetricMetadataSerializers.java b/test/unit/org/apache/cassandra/tcm/serialization/AsymmetricMetadataSerializers.java new file mode 100644 index 
0000000000..bd3cd4547e --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/serialization/AsymmetricMetadataSerializers.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.serialization; + +import java.io.IOException; + +import org.assertj.core.api.Assertions; + +import accord.utils.LazyToString; +import accord.utils.ReflectionUtils; +import org.apache.cassandra.io.util.DataInputBuffer; +import org.apache.cassandra.io.util.DataOutputBuffer; + +public class AsymmetricMetadataSerializers +{ + public static <In, Out> void testSerde(DataOutputBuffer output, AsymmetricMetadataSerializer<In, Out> serializer, In input, Version version) throws IOException + { + output.clear(); + long expectedSize = serializer.serializedSize(input, version); + serializer.serialize(input, output, version); + Assertions.assertThat(output.getLength()).describedAs("The serialized size and bytes written do not match").isEqualTo(expectedSize); + DataInputBuffer in = new DataInputBuffer(output.unsafeGetBufferAndFlip(), false); + Out read = serializer.deserialize(in, version); + Assertions.assertThat(read).describedAs("The deserialized output does not match the serialized input; difference 
%s", new LazyToString(() -> ReflectionUtils.recursiveEquals(read, input).toString())).isEqualTo(input); + } +} diff --git a/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkRejoiningTest.java b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkRejoiningTest.java new file mode 100644 index 0000000000..032c8dd1ec --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkRejoiningTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.cassandra.tcm.transformations; + +import java.io.IOException; +import java.util.Collections; + +import com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers; +import org.apache.cassandra.tcm.serialization.Version; + +/** Serialization round-trip checks for {@code AccordMarkRejoining} at {@code Version.V2}; each test builds an instance and hands it to {@code AsymmetricMetadataSerializers.testSerde} with {@code AccordMarkRejoining.serializer}. */ public class AccordMarkRejoiningTest +{ + /** Round-trips an instance built from {@code Collections.emptySet()}. */ @Test + public void shouldSerializeEmpty() throws IOException + { + DataOutputBuffer buffer = new DataOutputBuffer(); + AsymmetricMetadataSerializers.testSerde(buffer, AccordMarkRejoining.serializer, new AccordMarkRejoining(Collections.emptySet()), Version.V2); + } + + /** Round-trips an instance built from a singleton set holding {@code NodeId.fromString("1")}. */ @Test + public void shouldSerializeSingleton() throws IOException + { + DataOutputBuffer buffer = new DataOutputBuffer(); + /* NOTE(review): this local holds an AccordMarkRejoining but is named markStale - looks copied from AccordMarkStaleTest. */ AccordMarkRejoining markStale = new AccordMarkRejoining(Collections.singleton(NodeId.fromString("1"))); + AsymmetricMetadataSerializers.testSerde(buffer, AccordMarkRejoining.serializer, markStale, Version.V2); + } + + /** Round-trips an instance built from an {@code ImmutableSet} of {@code NodeId.fromString("1")} and {@code NodeId.fromString("2")}. */ @Test + public void shouldSerializeMulti() throws IOException + { + DataOutputBuffer buffer = new DataOutputBuffer(); + /* NOTE(review): same markStale naming as in shouldSerializeSingleton. */ AccordMarkRejoining markStale = new AccordMarkRejoining(ImmutableSet.of(NodeId.fromString("1"), NodeId.fromString("2"))); + AsymmetricMetadataSerializers.testSerde(buffer, AccordMarkRejoining.serializer, markStale, Version.V2); + } +} diff --git a/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkStaleTest.java b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkStaleTest.java new file mode 100644 index 0000000000..d794b3a2a9 --- /dev/null +++ b/test/unit/org/apache/cassandra/tcm/transformations/AccordMarkStaleTest.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.tcm.transformations; + +import java.io.IOException; +import java.util.Collections; + +import com.google.common.collect.ImmutableSet; +import org.junit.Test; + +import org.apache.cassandra.io.util.DataOutputBuffer; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.serialization.AsymmetricMetadataSerializers; +import org.apache.cassandra.tcm.serialization.Version; + +/** Serialization round-trip checks for {@code AccordMarkStale} at {@code Version.V2}; each test builds an instance and hands it to {@code AsymmetricMetadataSerializers.testSerde} with {@code AccordMarkStale.serializer}. */ public class AccordMarkStaleTest +{ + /** Round-trips an instance built from {@code Collections.emptySet()}. */ @Test + public void shouldSerializeEmpty() throws IOException + { + DataOutputBuffer buffer = new DataOutputBuffer(); + AsymmetricMetadataSerializers.testSerde(buffer, AccordMarkStale.serializer, new AccordMarkStale(Collections.emptySet()), Version.V2); + } + + /** Round-trips an instance built from a singleton set holding {@code NodeId.fromString("1")}. */ @Test + public void shouldSerializeSingleton() throws IOException + { + DataOutputBuffer buffer = new DataOutputBuffer(); + AccordMarkStale markStale = new AccordMarkStale(Collections.singleton(NodeId.fromString("1"))); + AsymmetricMetadataSerializers.testSerde(buffer, AccordMarkStale.serializer, markStale, Version.V2); + } + + /** Round-trips an instance built from an {@code ImmutableSet} of {@code NodeId.fromString("1")} and {@code NodeId.fromString("2")}. */ @Test + public void shouldSerializeMulti() throws IOException + { + DataOutputBuffer buffer = new DataOutputBuffer(); + AccordMarkStale markStale = new AccordMarkStale(ImmutableSet.of(NodeId.fromString("1"), 
NodeId.fromString("2"))); + AsymmetricMetadataSerializers.testSerde(buffer, AccordMarkStale.serializer, markStale, Version.V2); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org