This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4fb81ea483dc8d95dac11c234ca6cb3aaeac44a8 Author: Marcus Eriksson <marc...@apache.org> AuthorDate: Mon Mar 24 13:27:38 2025 +0100 Add nodetool command to abort failed nodetool cms initialize Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-20482 --- CHANGES.txt | 1 + .../org/apache/cassandra/tcm/CMSOperations.java | 6 +++ .../apache/cassandra/tcm/CMSOperationsMBean.java | 1 + .../apache/cassandra/tcm/migration/Election.java | 44 +++++++++++++---- src/java/org/apache/cassandra/tools/NodeTool.java | 3 +- .../apache/cassandra/tools/nodetool/CMSAdmin.java | 13 +++++ .../ClusterMetadataUpgradeAbortMigrationTest.java | 55 ++++++++++++++++++++++ .../upgrade/ClusterMetadataUpgradeTest.java | 3 +- 8 files changed, 115 insertions(+), 11 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1360ab01bc..864e664620 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Add nodetool command to abort failed nodetool cms initialize (CASSANDRA-20482) * Repair Paxos for the distributed metadata log when CMS membership changes (CASSANDRA-20467) * Reintroduce CASSANDRA-17411 in trunk (CASSANDRA-19346) * Add min/max/mean/percentiles to timer metrics vtable (CASSANDRA-20466) diff --git a/src/java/org/apache/cassandra/tcm/CMSOperations.java b/src/java/org/apache/cassandra/tcm/CMSOperations.java index f4a608b31e..b37da9dd94 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperations.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperations.java @@ -35,6 +35,7 @@ import org.apache.cassandra.schema.ReplicationParams; import org.apache.cassandra.tcm.membership.NodeId; import org.apache.cassandra.tcm.membership.NodeState; import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.migration.Election; import org.apache.cassandra.tcm.sequences.CancelCMSReconfiguration; import org.apache.cassandra.tcm.sequences.InProgressSequences; import org.apache.cassandra.tcm.sequences.ReconfigureCMS; @@ -80,6 +81,11 @@ public class CMSOperations implements CMSOperationsMBean cms.upgradeFromGossip(ignoredEndpoints); } + public void abortInitialization(String initiator) + { + Election.instance.abortInitialization(initiator); + } + @Override public void resumeReconfigureCms() { diff --git a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java index 3b6e4b526b..1e2d9e1473 100644 --- a/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java +++ b/src/java/org/apache/cassandra/tcm/CMSOperationsMBean.java @@ -25,6 +25,7 @@ import java.util.Map; public interface CMSOperationsMBean { public void initializeCMS(List<String> ignore); + public void abortInitialization(String initiator); public void resumeReconfigureCms(); public void reconfigureCMS(int rf); public void reconfigureCMS(Map<String, Integer> rf); diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java index 04e1a8fa4c..69fac6f4ba 100644 --- a/src/java/org/apache/cassandra/tcm/migration/Election.java +++ b/src/java/org/apache/cassandra/tcm/migration/Election.java @@ -56,6 +56,7 @@ import org.apache.cassandra.utils.Pair; public class Election { private static final Logger logger = LoggerFactory.getLogger(Election.class); + private static final CMSInitializationRequest.Initiator MIGRATING = new CMSInitializationRequest.Initiator(null, null); private static final CMSInitializationRequest.Initiator MIGRATED = new CMSInitializationRequest.Initiator(null, null); private final AtomicReference<CMSInitializationRequest.Initiator> initiator = new AtomicReference<>(); @@ -127,14 +128,21 @@ public class Election private void finish(Set<InetAddressAndPort> sendTo) { CMSInitializationRequest.Initiator currentInitiator = initiator.get(); - assert currentInitiator.initiator.equals(FBUtilities.getBroadcastAddressAndPort()); - - Startup.initializeAsFirstCMSNode(); - Register.maybeRegister(); - SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID()); + if (currentInitiator != null && + Objects.equals(currentInitiator.initiator, FBUtilities.getBroadcastAddressAndPort()) && + initiator.compareAndSet(currentInitiator, MIGRATING)) + { + Startup.initializeAsFirstCMSNode(); + Register.maybeRegister(); + SystemKeyspace.setLocalHostId(ClusterMetadata.current().myNodeId().toUUID()); - updateInitiator(currentInitiator, MIGRATED); - MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false)); + updateInitiator(MIGRATING, MIGRATED); + MessageDelivery.fanoutAndWait(messaging, sendTo, Verb.TCM_NOTIFY_REQ, DistributedMetadataLogKeyspace.getLogState(Epoch.EMPTY, false)); + } + else + { + throw new IllegalStateException("Can't finish migration, initiator="+currentInitiator); + } } private void abort(Set<InetAddressAndPort> sendTo) @@ -166,6 +174,25 @@ public class Election return initiator != null && initiator != MIGRATED; } + public void abortInitialization(String initiatorEp) + { + InetAddressAndPort expectedInitiator = InetAddressAndPort.getByNameUnchecked(initiatorEp); + CMSInitializationRequest.Initiator currentInitiator = initiator.get(); + if (currentInitiator != null && Objects.equals(currentInitiator.initiator, expectedInitiator) && initiator.compareAndSet(currentInitiator, null)) + { + for (InetAddressAndPort ep : ClusterMetadata.current().directory.allJoinedEndpoints()) + { + if (!ep.equals(FBUtilities.getBroadcastAddressAndPort())) + messaging.send(Message.out(Verb.TCM_ABORT_MIG, currentInitiator), ep); + } + } + else + { + throw new IllegalStateException("Current initiator [" + currentInitiator +"] does not match provided " + expectedInitiator + + " - run this command on a node where initialization has not yet been cleared, with the correct expected initiator"); + } + } + public class PrepareHandler implements IVerbHandler<CMSInitializationRequest> { @Override @@ -209,7 +236,8 @@ public class Election public void doVerb(Message<CMSInitializationRequest.Initiator> message) throws IOException { logger.info("Received election abort message {} from {}", message.payload, message.from()); - if (!message.from().equals(initiator().initiator) || !updateInitiator(message.payload, null)) + CMSInitializationRequest.Initiator initiator = message.payload; + if (!initiator.initiator.equals(initiator().initiator) || !updateInitiator(message.payload, null)) logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}", initiator(), message.payload); } } diff --git a/src/java/org/apache/cassandra/tools/NodeTool.java b/src/java/org/apache/cassandra/tools/NodeTool.java index 49be441b89..d7bcc25b71 100644 --- a/src/java/org/apache/cassandra/tools/NodeTool.java +++ b/src/java/org/apache/cassandra/tools/NodeTool.java @@ -268,7 +268,8 @@ public class NodeTool .withCommand(CMSAdmin.InitializeCMS.class) .withCommand(CMSAdmin.ReconfigureCMS.class) .withCommand(CMSAdmin.Snapshot.class) - .withCommand(CMSAdmin.Unregister.class); + .withCommand(CMSAdmin.Unregister.class) + .withCommand(CMSAdmin.AbortInitialization.class); Cli<NodeToolCmdRunnable> parser = builder.build(); diff --git a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java index 0ed853ba68..02cc045545 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java +++ b/src/java/org/apache/cassandra/tools/nodetool/CMSAdmin.java @@ -194,4 +194,17 @@ public abstract class CMSAdmin extends NodeTool.NodeToolCmd probe.getCMSOperationsProxy().unregisterLeftNodes(nodeIds); } } + + @Command(name = "abortinitialization", description = "Abort an incomplete initialization") + public static class AbortInitialization extends NodeTool.NodeToolCmd + { + @Option(required = true, name = "--initiator", title = "Initiator", description = "The address of the node where `cms initialize` was run.") + public String initiator; + + @Override + protected void execute(NodeProbe probe) + { + probe.getCMSOperationsProxy().abortInitialization(initiator); + } + } } diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAbortMigrationTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAbortMigrationTest.java new file mode 100644 index 0000000000..bab722346d --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeAbortMigrationTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.upgrade; + +import org.junit.Test; + +import org.apache.cassandra.distributed.Constants; +import org.apache.cassandra.distributed.api.Feature; +import org.apache.cassandra.net.Verb; + +public class ClusterMetadataUpgradeAbortMigrationTest extends UpgradeTestBase +{ + @Test + public void upgradeLostNotifyTest() throws Throwable + { + new UpgradeTestBase.TestCase() + .nodes(3) + .nodesToUpgrade(1, 2, 3) + .withConfig((cfg) -> cfg.with(Feature.NETWORK, Feature.GOSSIP) + .set(Constants.KEY_DTEST_FULL_STARTUP, true)) + .upgradesToCurrentFrom(v50) + .setup((cluster) -> { + cluster.schemaChange(withKeyspace("ALTER KEYSPACE %s WITH replication = {'class': 'SimpleStrategy', 'replication_factor':2}")); + cluster.schemaChange("CREATE TABLE " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))"); + }) + .runAfterClusterUpgrade((cluster) -> { + cluster.filters().verbs(Verb.TCM_INIT_MIG_RSP.id, + Verb.TCM_ABORT_MIG.id).drop(); + cluster.get(1).nodetoolResult("cms", "initialize").asserts().failure(); + cluster.filters().reset(); + // it was already cleared on node1: + cluster.get(1).nodetoolResult("cms", "abortinitialization", "--initiator", "127.0.0.1").asserts().failure(); + // but not on node2 + cluster.get(2).nodetoolResult("cms", "abortinitialization", "--initiator", "127.0.0.1").asserts().success(); + + cluster.get(1).nodetoolResult("cms", "initialize").asserts().success(); + }).run(); + } +} diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java index 930933425c..eb717602f3 100644 --- a/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java +++ b/test/distributed/org/apache/cassandra/distributed/upgrade/ClusterMetadataUpgradeTest.java @@ -113,6 +113,5 @@ public class ClusterMetadataUpgradeTest extends UpgradeTestBase cluster.get(1).nodetoolResult("cms", "initialize", "--ignore", "127.0.0.3").asserts().success(); cluster.schemaChange(withKeyspace("create table %s.tbl2 (id int primary key)")); }).run(); - } - + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org