This is an automated email from the ASF dual-hosted git repository.

marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

The following commit(s) were added to refs/heads/trunk by this push:
     new 95aca49915 Avoid NPE during cms initialization abort
95aca49915 is described below

commit 95aca49915fc0dab09129bcc662449cef75dceab
Author: Marcus Eriksson <marc...@apache.org>
AuthorDate: Thu Apr 3 15:01:55 2025 +0200

    Avoid NPE during cms initialization abort

    Patch by marcuse; reviewed by David Capwell and Caleb Rackliffe for CASSANDRA-20527
---
 CHANGES.txt                                                |  1 +
 src/java/org/apache/cassandra/tcm/Startup.java             |  2 +-
 .../cassandra/tcm/migration/CMSInitializationRequest.java  | 14 +++++++-------
 src/java/org/apache/cassandra/tcm/migration/Election.java  | 12 +++++++-----
 4 files changed, 16 insertions(+), 13 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index eee55c65e6..cd2619e4cb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 5.1
+ * Avoid NPE during cms initialization abort (CASSANDRA-20527)
  * Avoid failing queries when epoch changes and replica goes up/down (CASSANDRA-20489)
  * Split out truncation record lock (CASSANDRA-20480)
  * Throw new IndexBuildInProgressException when queries fail during index build, instead of IndexNotAvailableException (CASSANDRA-20402)
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java b/src/java/org/apache/cassandra/tcm/Startup.java
index d17b687698..6d3fe6b2af 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -246,7 +246,7 @@ import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
             else
             {
                 CMSInitializationRequest.Initiator initiator = Election.instance.initiator();
-                candidates = Discovery.instance.discoverOnce(initiator == null ? null : initiator.initiator);
+                candidates = Discovery.instance.discoverOnce(initiator == null ? 
null : initiator.endpoint);
             }
             Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
         }
diff --git a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
index dac50e5edb..599bfca0da 100644
--- a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
+++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
@@ -109,12 +109,12 @@ public class CMSInitializationRequest
     public static class Initiator
     {
         public static final Serializer serializer = new Serializer();
-        public final InetAddressAndPort initiator;
+        public final InetAddressAndPort endpoint;
         public final UUID initToken;
 
         public Initiator(InetAddressAndPort initiator, UUID initToken)
         {
-            this.initiator = initiator;
+            this.endpoint = initiator;
             this.initToken = initToken;
         }
 
@@ -124,20 +124,20 @@ public class CMSInitializationRequest
             if (this == o) return true;
             if (!(o instanceof Initiator)) return false;
             Initiator other = (Initiator) o;
-            return Objects.equals(initiator, other.initiator) && Objects.equals(initToken, other.initToken);
+            return Objects.equals(endpoint, other.endpoint) && Objects.equals(initToken, other.initToken);
         }
 
         @Override
         public int hashCode()
         {
-            return Objects.hash(initiator, initToken);
+            return Objects.hash(endpoint, initToken);
         }
 
         @Override
         public String toString()
         {
             return "Initiator{" +
-                   "initiator=" + initiator +
+                   "initiator=" + endpoint +
                    ", initToken=" + initToken +
                    '}';
         }
@@ -147,7 +147,7 @@ public class CMSInitializationRequest
             @Override
             public void serialize(Initiator t, DataOutputPlus out, int version) throws IOException
             {
-                InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator, out, version);
+                InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.endpoint, out, version);
                 UUIDSerializer.serializer.serialize(t.initToken, out, version);
             }
 
@@ -161,7 +161,7 @@ public class CMSInitializationRequest
             @Override
             public long serializedSize(Initiator t, int version)
             {
-                return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator, version) +
+                return InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.endpoint, version) +
                        UUIDSerializer.serializer.serializedSize(t.initToken, version);
             }
         }
     }
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 94f5dc4a06..507a55d31c 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -134,7 +134,7 @@ public class Election
     {
         CMSInitializationRequest.Initiator currentInitiator = initiator.get();
         if (currentInitiator != null &&
-            Objects.equals(currentInitiator.initiator, FBUtilities.getBroadcastAddressAndPort()) &&
+            Objects.equals(currentInitiator.endpoint, FBUtilities.getBroadcastAddressAndPort()) &&
             initiator.compareAndSet(currentInitiator, MIGRATING))
         {
             Startup.initializeAsFirstCMSNode();
@@ -183,7 +183,7 @@ public class Election
     {
         InetAddressAndPort expectedInitiator = InetAddressAndPort.getByNameUnchecked(initiatorEp);
         CMSInitializationRequest.Initiator currentInitiator = initiator.get();
-        if (currentInitiator != null && Objects.equals(currentInitiator.initiator, expectedInitiator) && initiator.compareAndSet(currentInitiator, null))
+        if (currentInitiator != null && Objects.equals(currentInitiator.endpoint, expectedInitiator) && initiator.compareAndSet(currentInitiator, null))
         {
             ClusterMetadata metadata = ClusterMetadata.current();
             for (Map.Entry<NodeId, NodeState> entry : metadata.directory.states.entrySet())
@@ -243,9 +243,11 @@ public class Election
         public void doVerb(Message<CMSInitializationRequest.Initiator> message) throws IOException
         {
             logger.info("Received election abort message {} from {}", message.payload, message.from());
-            CMSInitializationRequest.Initiator initiator = message.payload;
-            if 
(!initiator.initiator.equals(initiator().initiator) || !updateInitiator(message.payload, null))
-                logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}", initiator(), message.payload);
+            CMSInitializationRequest.Initiator remoteInitiator = message.payload;
+            if (initiator() == null)
+                logger.info("Initiator already cleared, ignoring abort message from {}: {}", message.from(), remoteInitiator);
+            else if (!remoteInitiator.endpoint.equals(initiator().endpoint) || !updateInitiator(remoteInitiator, null))
+                logger.error("Could not clear initiator - initiator is set to {}, abort message received from {}: {}", initiator(), message.from(), remoteInitiator);
         }
     }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org