This is an automated email from the ASF dual-hosted git repository.
marcuse pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95aca49915 Avoid NPE during cms initialization abort
95aca49915 is described below
commit 95aca49915fc0dab09129bcc662449cef75dceab
Author: Marcus Eriksson <[email protected]>
AuthorDate: Thu Apr 3 15:01:55 2025 +0200
Avoid NPE during cms initialization abort
Patch by marcuse; reviewed by David Capwell and Caleb Rackliffe for
CASSANDRA-20527
---
CHANGES.txt | 1 +
src/java/org/apache/cassandra/tcm/Startup.java | 2 +-
.../cassandra/tcm/migration/CMSInitializationRequest.java | 14 +++++++-------
src/java/org/apache/cassandra/tcm/migration/Election.java | 12 +++++++-----
4 files changed, 16 insertions(+), 13 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index eee55c65e6..cd2619e4cb 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.1
+ * Avoid NPE during cms initialization abort (CASSANDRA-20527)
* Avoid failing queries when epoch changes and replica goes up/down
(CASSANDRA-20489)
* Split out truncation record lock (CASSANDRA-20480)
* Throw new IndexBuildInProgressException when queries fail during index
build, instead of IndexNotAvailableException (CASSANDRA-20402)
diff --git a/src/java/org/apache/cassandra/tcm/Startup.java
b/src/java/org/apache/cassandra/tcm/Startup.java
index d17b687698..6d3fe6b2af 100644
--- a/src/java/org/apache/cassandra/tcm/Startup.java
+++ b/src/java/org/apache/cassandra/tcm/Startup.java
@@ -246,7 +246,7 @@ import static
org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
else
{
CMSInitializationRequest.Initiator initiator =
Election.instance.initiator();
- candidates = Discovery.instance.discoverOnce(initiator == null
? null : initiator.initiator);
+ candidates = Discovery.instance.discoverOnce(initiator == null
? null : initiator.endpoint);
}
Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
}
diff --git
a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
index dac50e5edb..599bfca0da 100644
--- a/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
+++ b/src/java/org/apache/cassandra/tcm/migration/CMSInitializationRequest.java
@@ -109,12 +109,12 @@ public class CMSInitializationRequest
public static class Initiator
{
public static final Serializer serializer = new Serializer();
- public final InetAddressAndPort initiator;
+ public final InetAddressAndPort endpoint;
public final UUID initToken;
public Initiator(InetAddressAndPort initiator, UUID initToken)
{
- this.initiator = initiator;
+ this.endpoint = initiator;
this.initToken = initToken;
}
@@ -124,20 +124,20 @@ public class CMSInitializationRequest
if (this == o) return true;
if (!(o instanceof Initiator)) return false;
Initiator other = (Initiator) o;
- return Objects.equals(initiator, other.initiator) &&
Objects.equals(initToken, other.initToken);
+ return Objects.equals(endpoint, other.endpoint) &&
Objects.equals(initToken, other.initToken);
}
@Override
public int hashCode()
{
- return Objects.hash(initiator, initToken);
+ return Objects.hash(endpoint, initToken);
}
@Override
public String toString()
{
return "Initiator{" +
- "initiator=" + initiator +
+ "initiator=" + endpoint +
", initToken=" + initToken +
'}';
}
@@ -147,7 +147,7 @@ public class CMSInitializationRequest
@Override
public void serialize(Initiator t, DataOutputPlus out, int
version) throws IOException
{
-
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.initiator,
out, version);
+
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serialize(t.endpoint,
out, version);
UUIDSerializer.serializer.serialize(t.initToken, out, version);
}
@@ -161,7 +161,7 @@ public class CMSInitializationRequest
@Override
public long serializedSize(Initiator t, int version)
{
- return
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.initiator,
version) +
+ return
InetAddressAndPort.Serializer.inetAddressAndPortSerializer.serializedSize(t.endpoint,
version) +
UUIDSerializer.serializer.serializedSize(t.initToken,
version);
}
}
diff --git a/src/java/org/apache/cassandra/tcm/migration/Election.java
b/src/java/org/apache/cassandra/tcm/migration/Election.java
index 94f5dc4a06..507a55d31c 100644
--- a/src/java/org/apache/cassandra/tcm/migration/Election.java
+++ b/src/java/org/apache/cassandra/tcm/migration/Election.java
@@ -134,7 +134,7 @@ public class Election
{
CMSInitializationRequest.Initiator currentInitiator = initiator.get();
if (currentInitiator != null &&
- Objects.equals(currentInitiator.initiator,
FBUtilities.getBroadcastAddressAndPort()) &&
+ Objects.equals(currentInitiator.endpoint,
FBUtilities.getBroadcastAddressAndPort()) &&
initiator.compareAndSet(currentInitiator, MIGRATING))
{
Startup.initializeAsFirstCMSNode();
@@ -183,7 +183,7 @@ public class Election
{
InetAddressAndPort expectedInitiator =
InetAddressAndPort.getByNameUnchecked(initiatorEp);
CMSInitializationRequest.Initiator currentInitiator = initiator.get();
- if (currentInitiator != null &&
Objects.equals(currentInitiator.initiator, expectedInitiator) &&
initiator.compareAndSet(currentInitiator, null))
+ if (currentInitiator != null &&
Objects.equals(currentInitiator.endpoint, expectedInitiator) &&
initiator.compareAndSet(currentInitiator, null))
{
ClusterMetadata metadata = ClusterMetadata.current();
for (Map.Entry<NodeId, NodeState> entry :
metadata.directory.states.entrySet())
@@ -243,9 +243,11 @@ public class Election
public void doVerb(Message<CMSInitializationRequest.Initiator>
message) throws IOException
{
logger.info("Received election abort message {} from {}",
message.payload, message.from());
- CMSInitializationRequest.Initiator initiator = message.payload;
- if (!initiator.initiator.equals(initiator().initiator) ||
!updateInitiator(message.payload, null))
- logger.error("Could not clear initiator - initiator is set to
{}, abort message received from {}", initiator(), message.payload);
+ CMSInitializationRequest.Initiator remoteInitiator =
message.payload;
+ if (initiator() == null)
+ logger.info("Initiator already cleared, ignoring abort message
from {}: {}", message.from(), remoteInitiator);
+ else if (!remoteInitiator.endpoint.equals(initiator().endpoint) ||
!updateInitiator(remoteInitiator, null))
+ logger.error("Could not clear initiator - initiator is set to
{}, abort message received from {}: {}", initiator(), message.from(),
remoteInitiator);
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]