This is an automated email from the ASF dual-hosted git repository.
dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4e17922b61 Gossip stateMapOrdering does not have correct ordering when
both EndpointState are in the bootstrapping set
4e17922b61 is described below
commit 4e17922b61d6e90151c5e165ddbf0731232e566a
Author: David Capwell <[email protected]>
AuthorDate: Wed Mar 1 11:45:55 2023 -0800
Gossip stateMapOrdering does not have correct ordering when both
EndpointState are in the bootstrapping set
patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-18292
---
CHANGES.txt | 1 +
.../org/apache/cassandra/gms/EndpointState.java | 12 +-
src/java/org/apache/cassandra/gms/Gossiper.java | 15 +-
.../org/apache/cassandra/gms/VersionedValue.java | 6 +
.../distributed/test/PaxosRepair2Test.java | 5 +-
.../org/apache/cassandra/gms/GossiperTest.java | 91 +++++++++++
.../cassandra/utils/CassandraGenerators.java | 172 +++++++++++++++++++++
7 files changed, 294 insertions(+), 8 deletions(-)
diff --git a/CHANGES.txt b/CHANGES.txt
index c1d8d00bca..7e3930c375 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
4.2
+ * Gossip stateMapOrdering does not have correct ordering when both
EndpointState are in the bootstrapping set (CASSANDRA-18292)
* Snapshot only sstables containing mismatching ranges on preview repair
mismatch (CASSANDRA-17561)
* More accurate skipping of sstables in read path (CASSANDRA-18134)
* Prepare for JDK17 experimental support (CASSANDRA-18179, CASSANDRA-18258)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 69684e4b67..0886bde156 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -184,17 +184,25 @@ public class EndpointState
updateTimestamp = nanoTime();
}
+ @VisibleForTesting
+ public void unsafeSetUpdateTimestamp(long value)
+ {
+ updateTimestamp = value;
+ }
+
public boolean isAlive()
{
return isAlive;
}
- void markAlive()
+ @VisibleForTesting
+ public void markAlive()
{
isAlive = true;
}
- void markDead()
+ @VisibleForTesting
+ public void markDead()
{
isAlive = false;
}
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java
b/src/java/org/apache/cassandra/gms/Gossiper.java
index a19954adec..0f13f6ae48 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1489,7 +1489,7 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
return isAdministrativelyInactiveState(epState);
}
- private static String getGossipStatus(EndpointState epState)
+ public static String getGossipStatus(EndpointState epState)
{
if (epState == null)
{
@@ -1536,7 +1536,8 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
* In that case above, the {@link Map#entrySet()} ordering can be random,
causing h4 to apply before h2, which will
* be rejected by subscribers (only after updating gossip causing zero
retries).
*/
- private static Comparator<Entry<InetAddressAndPort, EndpointState>>
stateOrderMap()
+ @VisibleForTesting
+ static Comparator<Entry<InetAddressAndPort, EndpointState>> stateOrderMap()
{
// There appear to be some edge cases where the state we are ordering
gets added to the global state, causing
// ordering to change... to avoid that, rely on a cache
@@ -1553,10 +1554,16 @@ public class Gossiper implements
IFailureDetectionEventListener, GossiperMBean
}
Cache cache = new Cache();
return ((Comparator<Entry<InetAddressAndPort, EndpointState>>) (e1,
e2) -> {
+ String e1status = getGossipStatus(cache.get(e1));
+ String e2status = getGossipStatus(cache.get(e2));
+
+ if (Objects.equals(e1status, e2status) ||
(BOOTSTRAPPING_STATUS.contains(e1status) &&
BOOTSTRAPPING_STATUS.contains(e2status)))
+ return 0;
+
// check status first, make sure bootstrap status happens-after
all others
- if (BOOTSTRAPPING_STATUS.contains(getGossipStatus(cache.get(e1))))
+ if (BOOTSTRAPPING_STATUS.contains(e1status))
return 1;
- if (BOOTSTRAPPING_STATUS.contains(getGossipStatus(cache.get(e2))))
+ if (BOOTSTRAPPING_STATUS.contains(e2status))
return -1;
return 0;
})
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 519fffa9b8..d76f3011f4 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -102,6 +102,12 @@ public class VersionedValue implements
Comparable<VersionedValue>
this(value, VersionGenerator.getNextVersion());
}
+ @VisibleForTesting
+ public VersionedValue withVersion(int version)
+ {
+ return new VersionedValue(value, version);
+ }
+
public static VersionedValue unsafeMakeVersionedValue(String value, int
version)
{
return new VersionedValue(value, version);
diff --git
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
index 8371bd4c4e..a66bf1bd32 100644
---
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
+++
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
import org.apache.cassandra.exceptions.CasWriteTimeoutException;
import org.apache.cassandra.gms.FailureDetector;
import org.apache.cassandra.gms.Gossiper;
@@ -255,7 +256,7 @@ public class PaxosRepair2Test extends TestBaseImpl
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + "
(k int primary key, v int)");
- cluster.get(3).shutdown();
+ ClusterUtils.stopUnchecked(cluster.get(3));
InetAddressAndPort node3 =
InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());
for (int i = 0; i < 10; i++)
@@ -356,7 +357,7 @@ public class PaxosRepair2Test extends TestBaseImpl
)
{
cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + "
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
- cluster.get(3).shutdown();
+ ClusterUtils.stopUnchecked(cluster.get(3));
cluster.verbs(Verb.PAXOS_COMMIT_REQ).drop();
try
{
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 68841ea30a..8921b13c07 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -22,12 +22,15 @@ import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.List;
+import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.net.InetAddresses;
import org.junit.After;
import org.junit.AfterClass;
@@ -46,14 +49,19 @@ import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.locator.SeedProvider;
import org.apache.cassandra.locator.TokenMetadata;
import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraGenerators;
import org.apache.cassandra.utils.CassandraVersion;
import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.api.Assertions;
+import org.quicktheories.core.Gen;
+import org.quicktheories.impl.Constraint;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.quicktheories.QuickTheory.qt;
public class GossiperTest
{
@@ -419,6 +427,89 @@ public class GossiperTest
}
}
+ @Test
+ public void orderingComparator()
+ {
+ qt().forAll(epStateMapGen()).checkAssert(map -> {
+ Comparator<Map.Entry<InetAddressAndPort, EndpointState>> comp =
Gossiper.stateOrderMap();
+ List<Map.Entry<InetAddressAndPort, EndpointState>> elements = new
ArrayList<>(map.entrySet());
+ for (int i = 0; i < elements.size(); i++)
+ {
+ for (int j = 0; j < elements.size(); j++)
+ {
+ Map.Entry<InetAddressAndPort, EndpointState> e1 =
elements.get(i);
+ boolean e1Bootstrapping =
VersionedValue.BOOTSTRAPPING_STATUS.contains(Gossiper.getGossipStatus(e1.getValue()));
+ Map.Entry<InetAddressAndPort, EndpointState> e2 =
elements.get(j);
+ boolean e2Bootstrapping =
VersionedValue.BOOTSTRAPPING_STATUS.contains(Gossiper.getGossipStatus(e2.getValue()));
+ Ordering ordering = Ordering.compare(comp, e1, e2);
+
+ if (e1Bootstrapping == e2Bootstrapping)
+ {
+ // check generation
+ Ordering sub =
Ordering.compare(e1.getValue().getHeartBeatState().getGeneration(),
e2.getValue().getHeartBeatState().getGeneration());
+ if (sub == Ordering.EQ)
+ {
+ // check addressWPort
+ sub = Ordering.compare(e1.getKey(), e2.getKey());
+ }
+ Assertions.assertThat(ordering)
+ .describedAs("Both elements bootstrap check
were equal: %s == %s", e1Bootstrapping, e2Bootstrapping)
+ .isEqualTo(sub);
+ }
+ else if (e1Bootstrapping)
+ {
+ Assertions.assertThat(ordering).isEqualTo(Ordering.GT);
+ }
+ else
+ {
+ Assertions.assertThat(ordering).isEqualTo(Ordering.LT);
+ }
+ }
+ }
+ });
+ }
+
+ enum Ordering
+ {
+ LT, EQ, GT;
+
+ static <T> Ordering compare(Comparator<T> comparator, T a, T b)
+ {
+ int rc = comparator.compare(a, b);
+ if (rc < 0) return LT;
+ if (rc == 0) return EQ;
+ return GT;
+ }
+
+ static <T extends Comparable<T>> Ordering compare(T a, T b)
+ {
+ return compare(Comparator.naturalOrder(), a, b);
+ }
+ }
+
+ private static Gen<Map<InetAddressAndPort, EndpointState>> epStateMapGen()
+ {
+ Gen<InetAddressAndPort> addressAndPorts =
CassandraGenerators.INET_ADDRESS_AND_PORT_GEN;
+ Gen<EndpointState> states = CassandraGenerators.endpointStates();
+ Constraint sizeGen = Constraint.between(2, 10);
+ Gen<Map<InetAddressAndPort, EndpointState>> mapGen = rs -> {
+ int size = Math.toIntExact(rs.next(sizeGen));
+ Map<InetAddressAndPort, EndpointState> map =
Maps.newHashMapWithExpectedSize(size);
+ for (int i = 0; i < size; i++)
+ {
+ while (true)
+ {
+ InetAddressAndPort address = addressAndPorts.generate(rs);
+ if (map.containsKey(address)) continue;
+ map.put(address, states.generate(rs));
+ break;
+ }
+ }
+ return map;
+ };
+ return mapGen;
+ }
+
static class SimpleStateChangeListener implements
IEndpointStateChangeSubscriber
{
static class OnChangeParams
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
index ad9cb6b6aa..cf0c0ed286 100644
--- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@ -18,20 +18,26 @@
package org.apache.cassandra.utils;
import java.lang.reflect.Modifier;
+import java.math.BigInteger;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Stream;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
+import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.FieldIdentifier;
import org.apache.cassandra.db.ReadCommand;
@@ -50,6 +56,10 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
import org.apache.cassandra.dht.OrderPreservingPartitioner;
import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.HeartBeatState;
+import org.apache.cassandra.gms.VersionedValue;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ConnectionType;
import org.apache.cassandra.net.Message;
@@ -316,4 +326,166 @@ public final class CassandraGenerators
}
}, true);
}
+
+ public static Gen<Token> murmurToken()
+ {
+ Constraint token = Constraint.between(Long.MIN_VALUE, Long.MAX_VALUE);
+ return rs -> new Murmur3Partitioner.LongToken(rs.next(token));
+ }
+
+ public static Gen<Token> byteOrderToken()
+ {
+ Constraint size = Constraint.between(0, 10);
+ Constraint byteRange = Constraint.between(Byte.MIN_VALUE,
Byte.MAX_VALUE);
+ return rs -> {
+ byte[] token = new byte[Math.toIntExact(rs.next(size))];
+ for (int i = 0; i < token.length; i++)
+ token[i] = (byte) rs.next(byteRange);
+ return new ByteOrderedPartitioner.BytesToken(token);
+ };
+ }
+
+ public static Gen<Token> randomPartitionerToken()
+ {
+ Constraint domain = Constraint.none();
+ return rs -> new
RandomPartitioner.BigIntegerToken(BigInteger.valueOf(rs.next(domain)));
+ }
+
+ public static Gen<Token> localPartitionerToken(LocalPartitioner
partitioner)
+ {
+ Gen<ByteBuffer> bytes =
AbstractTypeGenerators.getTypeSupport(partitioner.getTokenValidator()).bytesGen();
+ return rs -> partitioner.getToken(bytes.generate(rs));
+ }
+
+ public static Gen<Token> orderPreservingToken()
+ {
+ Gen<String> string = Generators.utf8(0, 10);
+ return rs -> new
OrderPreservingPartitioner.StringToken(string.generate(rs));
+ }
+
+ public static Gen<Token> token(IPartitioner partitioner)
+ {
+ if (partitioner instanceof Murmur3Partitioner) return murmurToken();
+ if (partitioner instanceof ByteOrderedPartitioner) return
byteOrderToken();
+ if (partitioner instanceof RandomPartitioner) return
randomPartitionerToken();
+ if (partitioner instanceof LocalPartitioner) return
localPartitionerToken((LocalPartitioner) partitioner);
+ if (partitioner instanceof OrderPreservingPartitioner) return
orderPreservingToken();
+ throw new UnsupportedOperationException("Unsupported partitioner: " +
partitioner.getClass());
+ }
+
+ public static Gen<? extends Collection<Token>> tokens(IPartitioner
partitioner)
+ {
+ Gen<Token> tokenGen = token(partitioner);
+ return SourceDSL.lists().of(tokenGen).ofSizeBetween(1, 16);
+ }
+
+ public static Gen<HeartBeatState> heartBeatStates()
+ {
+ Constraint generationDomain = Constraint.between(0, Integer.MAX_VALUE);
+ Constraint versionDomain = Constraint.between(-1, Integer.MAX_VALUE);
+ return rs -> new
HeartBeatState(Math.toIntExact(rs.next(generationDomain)),
Math.toIntExact(rs.next(versionDomain)));
+ }
+
+ private static Gen<Map<ApplicationState, VersionedValue>>
gossipApplicationStates()
+ {
+ //TODO support all application states...
+ // atm only used by a single test, which only looks at status
+ Gen<Boolean> statusWithPort = SourceDSL.booleans().all();
+ Gen<VersionedValue> statusGen = gossipStatusValue();
+
+ return rs -> {
+ ApplicationState statusState = statusWithPort.generate(rs) ?
ApplicationState.STATUS_WITH_PORT : ApplicationState.STATUS;
+ VersionedValue vv = statusGen.generate(rs);
+ if (vv == null) return ImmutableMap.of();
+ return ImmutableMap.of(statusState, vv);
+ };
+ }
+
+ private static Gen<String> gossipStatus()
+ {
+ return SourceDSL.arbitrary()
+ .pick(VersionedValue.STATUS_NORMAL,
+ VersionedValue.STATUS_BOOTSTRAPPING_REPLACE,
+ VersionedValue.STATUS_BOOTSTRAPPING,
+ VersionedValue.STATUS_MOVING,
+ VersionedValue.STATUS_LEAVING,
+ VersionedValue.STATUS_LEFT,
+
+ //TODO would be good to prefix with STATUS_ like
others
+ VersionedValue.REMOVING_TOKEN,
+ VersionedValue.REMOVED_TOKEN,
+ VersionedValue.HIBERNATE +
VersionedValue.DELIMITER + true,
+ VersionedValue.HIBERNATE +
VersionedValue.DELIMITER + false,
+ VersionedValue.SHUTDOWN +
VersionedValue.DELIMITER + true,
+ VersionedValue.SHUTDOWN +
VersionedValue.DELIMITER + false,
+ ""
+ );
+ }
+
+ private static Gen<VersionedValue> gossipStatusValue()
+ {
+ IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+ Gen<String> statusGen = gossipStatus();
+ Gen<Token> tokenGen = token(partitioner);
+ Gen<? extends Collection<Token>> tokensGen = tokens(partitioner);
+ Gen<InetAddress> addressGen = Generators.INET_ADDRESS_GEN;
+ Gen<InetAddressAndPort> addressAndGenGen = INET_ADDRESS_AND_PORT_GEN;
+ Gen<Boolean> bool = SourceDSL.booleans().all();
+ Constraint millis = Constraint.between(0, Long.MAX_VALUE);
+ Constraint version = Constraint.between(0, Integer.MAX_VALUE);
+ Gen<UUID> hostId = Generators.UUID_RANDOM_GEN;
+ VersionedValue.VersionedValueFactory factory = new
VersionedValue.VersionedValueFactory(partitioner);
+ return rs -> {
+ String status = statusGen.generate(rs);
+ switch (status)
+ {
+ case "":
+ return null;
+ case VersionedValue.STATUS_NORMAL:
+ return
factory.normal(tokensGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.STATUS_BOOTSTRAPPING:
+ return
factory.bootstrapping(tokensGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE:
+ if (bool.generate(rs)) return
factory.bootReplacingWithPort(addressAndGenGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ else return
factory.bootReplacing(addressGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.STATUS_MOVING:
+ return
factory.moving(tokenGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.STATUS_LEAVING:
+ return
factory.leaving(tokensGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.STATUS_LEFT:
+ return factory.left(tokensGen.generate(rs),
rs.next(millis)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.REMOVING_TOKEN:
+ return
factory.removingNonlocal(hostId.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.REMOVED_TOKEN:
+ return factory.removedNonlocal(hostId.generate(rs),
rs.next(millis)).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.HIBERNATE + VersionedValue.DELIMITER +
true:
+ return
factory.hibernate(true).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.HIBERNATE + VersionedValue.DELIMITER +
false:
+ return
factory.hibernate(false).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + true:
+ return
factory.shutdown(true).withVersion(Math.toIntExact(rs.next(version)));
+ case VersionedValue.SHUTDOWN + VersionedValue.DELIMITER +
false:
+ return
factory.shutdown(false).withVersion(Math.toIntExact(rs.next(version)));
+ default:
+ throw new AssertionError("Unexpected status: " + status);
+ }
+ };
+ }
+
+ public static Gen<EndpointState> endpointStates()
+ {
+ Gen<HeartBeatState> hbGen = heartBeatStates();
+ Gen<Map<ApplicationState, VersionedValue>> appStates =
gossipApplicationStates();
+ Gen<Boolean> alive = SourceDSL.booleans().all();
+ Constraint updateTimestamp = Constraint.between(0, Long.MAX_VALUE);
+ return rs -> {
+ EndpointState state = new EndpointState(hbGen.generate(rs));
+ Map<ApplicationState, VersionedValue> map = appStates.generate(rs);
+ if (!map.isEmpty()) state.addApplicationStates(map);
+ if (alive.generate(rs)) state.markAlive();
+ else state.markDead();
+ state.unsafeSetUpdateTimestamp(rs.next(updateTimestamp));
+ return state;
+ };
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]