This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4e17922b61 Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set
4e17922b61 is described below

commit 4e17922b61d6e90151c5e165ddbf0731232e566a
Author: David Capwell <[email protected]>
AuthorDate: Wed Mar 1 11:45:55 2023 -0800

    Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set
    
    patch by David Capwell; reviewed by Marcus Eriksson for CASSANDRA-18292
---
 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/gms/EndpointState.java    |  12 +-
 src/java/org/apache/cassandra/gms/Gossiper.java    |  15 +-
 .../org/apache/cassandra/gms/VersionedValue.java   |   6 +
 .../distributed/test/PaxosRepair2Test.java         |   5 +-
 .../org/apache/cassandra/gms/GossiperTest.java     |  91 +++++++++++
 .../cassandra/utils/CassandraGenerators.java       | 172 +++++++++++++++++++++
 7 files changed, 294 insertions(+), 8 deletions(-)

diff --git a/CHANGES.txt b/CHANGES.txt
index c1d8d00bca..7e3930c375 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
 4.2
+ * Gossip stateMapOrdering does not have correct ordering when both EndpointState are in the bootstrapping set (CASSANDRA-18292)
  * Snapshot only sstables containing mismatching ranges on preview repair mismatch (CASSANDRA-17561)
  * More accurate skipping of sstables in read path (CASSANDRA-18134)
  * Prepare for JDK17 experimental support (CASSANDRA-18179, CASSANDRA-18258)
diff --git a/src/java/org/apache/cassandra/gms/EndpointState.java 
b/src/java/org/apache/cassandra/gms/EndpointState.java
index 69684e4b67..0886bde156 100644
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@ -184,17 +184,25 @@ public class EndpointState
         updateTimestamp = nanoTime();
     }
 
+    @VisibleForTesting
+    public void unsafeSetUpdateTimestamp(long value)
+    {
+        updateTimestamp = value;
+    }
+
     public boolean isAlive()
     {
         return isAlive;
     }
 
-    void markAlive()
+    @VisibleForTesting
+    public void markAlive()
     {
         isAlive = true;
     }
 
-    void markDead()
+    @VisibleForTesting
+    public void markDead()
     {
         isAlive = false;
     }
diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java 
b/src/java/org/apache/cassandra/gms/Gossiper.java
index a19954adec..0f13f6ae48 100644
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@ -1489,7 +1489,7 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         return isAdministrativelyInactiveState(epState);
     }
 
-    private static String getGossipStatus(EndpointState epState)
+    public static String getGossipStatus(EndpointState epState)
     {
         if (epState == null)
         {
@@ -1536,7 +1536,8 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
      * In that case above, the {@link Map#entrySet()} ordering can be random, causing h4 to apply before h2, which will
      * be rejected by subscripers (only after updating gossip causing zero retries).
      */
-    private static Comparator<Entry<InetAddressAndPort, EndpointState>> 
stateOrderMap()
+    @VisibleForTesting
+    static Comparator<Entry<InetAddressAndPort, EndpointState>> stateOrderMap()
     {
         // There apears to be some edge cases where the state we are ordering get added to the global state causing
         // ordering to change... to avoid that rely on a cache
@@ -1553,10 +1554,16 @@ public class Gossiper implements 
IFailureDetectionEventListener, GossiperMBean
         }
         Cache cache = new Cache();
         return ((Comparator<Entry<InetAddressAndPort, EndpointState>>) (e1, 
e2) -> {
+            String e1status = getGossipStatus(cache.get(e1));
+            String e2status = getGossipStatus(cache.get(e2));
+
+            if (Objects.equals(e1status, e2status) || 
(BOOTSTRAPPING_STATUS.contains(e1status) && 
BOOTSTRAPPING_STATUS.contains(e2status)))
+                return 0;
+
             // check status first, make sure bootstrap status happens-after 
all others
-            if (BOOTSTRAPPING_STATUS.contains(getGossipStatus(cache.get(e1))))
+            if (BOOTSTRAPPING_STATUS.contains(e1status))
                 return 1;
-            if (BOOTSTRAPPING_STATUS.contains(getGossipStatus(cache.get(e2))))
+            if (BOOTSTRAPPING_STATUS.contains(e2status))
                 return -1;
             return 0;
         })
diff --git a/src/java/org/apache/cassandra/gms/VersionedValue.java 
b/src/java/org/apache/cassandra/gms/VersionedValue.java
index 519fffa9b8..d76f3011f4 100644
--- a/src/java/org/apache/cassandra/gms/VersionedValue.java
+++ b/src/java/org/apache/cassandra/gms/VersionedValue.java
@@ -102,6 +102,12 @@ public class VersionedValue implements 
Comparable<VersionedValue>
         this(value, VersionGenerator.getNextVersion());
     }
 
+    @VisibleForTesting
+    public VersionedValue withVersion(int version)
+    {
+        return new VersionedValue(value, version);
+    }
+
     public static VersionedValue unsafeMakeVersionedValue(String value, int 
version)
     {
         return new VersionedValue(value, version);
diff --git 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
index 8371bd4c4e..a66bf1bd32 100644
--- 
a/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/test/PaxosRepair2Test.java
@@ -66,6 +66,7 @@ import org.apache.cassandra.distributed.Cluster;
 import org.apache.cassandra.distributed.api.ConsistencyLevel;
 import org.apache.cassandra.distributed.api.Feature;
 import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.distributed.shared.ClusterUtils;
 import org.apache.cassandra.exceptions.CasWriteTimeoutException;
 import org.apache.cassandra.gms.FailureDetector;
 import org.apache.cassandra.gms.Gossiper;
@@ -255,7 +256,7 @@ public class PaxosRepair2Test extends TestBaseImpl
         )
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " 
(k int primary key, v int)");
-            cluster.get(3).shutdown();
+            ClusterUtils.stopUnchecked(cluster.get(3));
             InetAddressAndPort node3 = 
InetAddressAndPort.getByAddress(cluster.get(3).broadcastAddress());
 
             for (int i = 0; i < 10; i++)
@@ -356,7 +357,7 @@ public class PaxosRepair2Test extends TestBaseImpl
         )
         {
             cluster.schemaChange("CREATE TABLE " + KEYSPACE + '.' + TABLE + " 
(pk int, ck int, v int, PRIMARY KEY (pk, ck))");
-            cluster.get(3).shutdown();
+            ClusterUtils.stopUnchecked(cluster.get(3));
             cluster.verbs(Verb.PAXOS_COMMIT_REQ).drop();
             try
             {
diff --git a/test/unit/org/apache/cassandra/gms/GossiperTest.java 
b/test/unit/org/apache/cassandra/gms/GossiperTest.java
index 68841ea30a..8921b13c07 100644
--- a/test/unit/org/apache/cassandra/gms/GossiperTest.java
+++ b/test/unit/org/apache/cassandra/gms/GossiperTest.java
@@ -22,12 +22,15 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
+import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
 import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
 import com.google.common.net.InetAddresses;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -46,14 +49,19 @@ import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.locator.SeedProvider;
 import org.apache.cassandra.locator.TokenMetadata;
 import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.utils.CassandraGenerators;
 import org.apache.cassandra.utils.CassandraVersion;
 import org.apache.cassandra.utils.FBUtilities;
+import org.assertj.core.api.Assertions;
+import org.quicktheories.core.Gen;
+import org.quicktheories.impl.Constraint;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.quicktheories.QuickTheory.qt;
 
 public class GossiperTest
 {
@@ -419,6 +427,89 @@ public class GossiperTest
         }
     }
 
+    @Test
+    public void orderingComparator()
+    {
+        qt().forAll(epStateMapGen()).checkAssert(map -> {
+            Comparator<Map.Entry<InetAddressAndPort, EndpointState>> comp = 
Gossiper.stateOrderMap();
+            List<Map.Entry<InetAddressAndPort, EndpointState>> elements = new 
ArrayList<>(map.entrySet());
+            for (int i = 0; i < elements.size(); i++)
+            {
+                for (int j = 0; j < elements.size(); j++)
+                {
+                    Map.Entry<InetAddressAndPort, EndpointState> e1 = 
elements.get(i);
+                    boolean e1Bootstrapping = 
VersionedValue.BOOTSTRAPPING_STATUS.contains(Gossiper.getGossipStatus(e1.getValue()));
+                    Map.Entry<InetAddressAndPort, EndpointState> e2 = 
elements.get(j);
+                    boolean e2Bootstrapping = 
VersionedValue.BOOTSTRAPPING_STATUS.contains(Gossiper.getGossipStatus(e2.getValue()));
+                    Ordering ordering = Ordering.compare(comp, e1, e2);
+
+                    if (e1Bootstrapping == e2Bootstrapping)
+                    {
+                        // check generation
+                        Ordering sub = 
Ordering.compare(e1.getValue().getHeartBeatState().getGeneration(), 
e2.getValue().getHeartBeatState().getGeneration());
+                        if (sub == Ordering.EQ)
+                        {
+                            // check addressWPort
+                            sub = Ordering.compare(e1.getKey(), e2.getKey());
+                        }
+                        Assertions.assertThat(ordering)
+                                  .describedAs("Both elements bootstrap check 
were equal: %s == %s", e1Bootstrapping, e2Bootstrapping)
+                                  .isEqualTo(sub);
+                    }
+                    else if (e1Bootstrapping)
+                    {
+                        Assertions.assertThat(ordering).isEqualTo(Ordering.GT);
+                    }
+                    else
+                    {
+                        Assertions.assertThat(ordering).isEqualTo(Ordering.LT);
+                    }
+                }
+            }
+        });
+    }
+
+    enum Ordering
+    {
+        LT, EQ, GT;
+
+        static <T> Ordering compare(Comparator<T> comparator, T a, T b)
+        {
+            int rc = comparator.compare(a, b);
+            if (rc < 0) return LT;
+            if (rc == 0) return EQ;
+            return GT;
+        }
+
+        static <T extends Comparable<T>> Ordering compare(T a, T b)
+        {
+            return compare(Comparator.naturalOrder(), a, b);
+        }
+    }
+
+    private static Gen<Map<InetAddressAndPort, EndpointState>> epStateMapGen()
+    {
+        Gen<InetAddressAndPort> addressAndPorts = 
CassandraGenerators.INET_ADDRESS_AND_PORT_GEN;
+        Gen<EndpointState> states = CassandraGenerators.endpointStates();
+        Constraint sizeGen = Constraint.between(2, 10);
+        Gen<Map<InetAddressAndPort, EndpointState>> mapGen = rs -> {
+            int size = Math.toIntExact(rs.next(sizeGen));
+            Map<InetAddressAndPort, EndpointState> map = 
Maps.newHashMapWithExpectedSize(size);
+            for (int i = 0; i < size; i++)
+            {
+                while (true)
+                {
+                    InetAddressAndPort address = addressAndPorts.generate(rs);
+                    if (map.containsKey(address)) continue;
+                    map.put(address, states.generate(rs));
+                    break;
+                }
+            }
+            return map;
+        };
+        return mapGen;
+    }
+
     static class SimpleStateChangeListener implements 
IEndpointStateChangeSubscriber
     {
         static class OnChangeParams
diff --git a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java 
b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
index ad9cb6b6aa..cf0c0ed286 100644
--- a/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
+++ b/test/unit/org/apache/cassandra/utils/CassandraGenerators.java
@@ -18,20 +18,26 @@
 package org.apache.cassandra.utils;
 
 import java.lang.reflect.Modifier;
+import java.math.BigInteger;
 import java.net.InetAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Stream;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import org.apache.commons.lang3.builder.MultilineRecursiveToStringStyle;
 import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
 
+import org.apache.cassandra.config.DatabaseDescriptor;
 import org.apache.cassandra.cql3.ColumnIdentifier;
 import org.apache.cassandra.cql3.FieldIdentifier;
 import org.apache.cassandra.db.ReadCommand;
@@ -50,6 +56,10 @@ import org.apache.cassandra.dht.Murmur3Partitioner;
 import org.apache.cassandra.dht.OrderPreservingPartitioner;
 import org.apache.cassandra.dht.RandomPartitioner;
 import org.apache.cassandra.dht.Token;
+import org.apache.cassandra.gms.ApplicationState;
+import org.apache.cassandra.gms.EndpointState;
+import org.apache.cassandra.gms.HeartBeatState;
+import org.apache.cassandra.gms.VersionedValue;
 import org.apache.cassandra.locator.InetAddressAndPort;
 import org.apache.cassandra.net.ConnectionType;
 import org.apache.cassandra.net.Message;
@@ -316,4 +326,166 @@ public final class CassandraGenerators
             }
         }, true);
     }
+
+    public static Gen<Token> murmurToken()
+    {
+        Constraint token = Constraint.between(Long.MIN_VALUE, Long.MAX_VALUE);
+        return rs -> new Murmur3Partitioner.LongToken(rs.next(token));
+    }
+
+    public static Gen<Token> byteOrderToken()
+    {
+        Constraint size = Constraint.between(0, 10);
+        Constraint byteRange = Constraint.between(Byte.MIN_VALUE, 
Byte.MAX_VALUE);
+        return rs -> {
+            byte[] token = new byte[Math.toIntExact(rs.next(size))];
+            for (int i = 0; i < token.length; i++)
+                token[i] = (byte) rs.next(byteRange);
+            return new ByteOrderedPartitioner.BytesToken(token);
+        };
+    }
+
+    public static Gen<Token> randomPartitionerToken()
+    {
+        Constraint domain = Constraint.none();
+        return rs -> new 
RandomPartitioner.BigIntegerToken(BigInteger.valueOf(rs.next(domain)));
+    }
+
+    public static Gen<Token> localPartitionerToken(LocalPartitioner 
partitioner)
+    {
+        Gen<ByteBuffer> bytes = 
AbstractTypeGenerators.getTypeSupport(partitioner.getTokenValidator()).bytesGen();
+        return rs -> partitioner.getToken(bytes.generate(rs));
+    }
+
+    public static Gen<Token> orderPreservingToken()
+    {
+        Gen<String> string = Generators.utf8(0, 10);
+        return rs -> new 
OrderPreservingPartitioner.StringToken(string.generate(rs));
+    }
+
+    public static Gen<Token> token(IPartitioner partitioner)
+    {
+        if (partitioner instanceof Murmur3Partitioner) return murmurToken();
+        if (partitioner instanceof ByteOrderedPartitioner) return 
byteOrderToken();
+        if (partitioner instanceof RandomPartitioner) return 
randomPartitionerToken();
+        if (partitioner instanceof LocalPartitioner) return 
localPartitionerToken((LocalPartitioner) partitioner);
+        if (partitioner instanceof OrderPreservingPartitioner) return 
orderPreservingToken();
+        throw new UnsupportedOperationException("Unsupported partitioner: " + 
partitioner.getClass());
+    }
+
+    public static Gen<? extends Collection<Token>> tokens(IPartitioner 
partitioner)
+    {
+        Gen<Token> tokenGen = token(partitioner);
+        return SourceDSL.lists().of(tokenGen).ofSizeBetween(1, 16);
+    }
+
+    public static Gen<HeartBeatState> heartBeatStates()
+    {
+        Constraint generationDomain = Constraint.between(0, Integer.MAX_VALUE);
+        Constraint versionDomain = Constraint.between(-1, Integer.MAX_VALUE);
+        return rs -> new 
HeartBeatState(Math.toIntExact(rs.next(generationDomain)), 
Math.toIntExact(rs.next(versionDomain)));
+    }
+
+    private static Gen<Map<ApplicationState, VersionedValue>> 
gossipApplicationStates()
+    {
+        //TODO support all application states...
+        // atm only used by a single test, which only looks at status
+        Gen<Boolean> statusWithPort = SourceDSL.booleans().all();
+        Gen<VersionedValue> statusGen = gossipStatusValue();
+
+        return rs -> {
+            ApplicationState statusState = statusWithPort.generate(rs) ? 
ApplicationState.STATUS_WITH_PORT : ApplicationState.STATUS;
+            VersionedValue vv = statusGen.generate(rs);
+            if (vv == null) return ImmutableMap.of();
+            return ImmutableMap.of(statusState, vv);
+        };
+    }
+
+    private static Gen<String> gossipStatus()
+    {
+        return SourceDSL.arbitrary()
+                        .pick(VersionedValue.STATUS_NORMAL,
+                              VersionedValue.STATUS_BOOTSTRAPPING_REPLACE,
+                              VersionedValue.STATUS_BOOTSTRAPPING,
+                              VersionedValue.STATUS_MOVING,
+                              VersionedValue.STATUS_LEAVING,
+                              VersionedValue.STATUS_LEFT,
+
+                              //TODO would be good to prefix with STATUS_ like 
others
+                              VersionedValue.REMOVING_TOKEN,
+                              VersionedValue.REMOVED_TOKEN,
+                              VersionedValue.HIBERNATE + 
VersionedValue.DELIMITER + true,
+                              VersionedValue.HIBERNATE + 
VersionedValue.DELIMITER + false,
+                              VersionedValue.SHUTDOWN + 
VersionedValue.DELIMITER + true,
+                              VersionedValue.SHUTDOWN + 
VersionedValue.DELIMITER + false,
+                              ""
+                        );
+    }
+
+    private static Gen<VersionedValue> gossipStatusValue()
+    {
+        IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
+        Gen<String> statusGen = gossipStatus();
+        Gen<Token> tokenGen = token(partitioner);
+        Gen<? extends Collection<Token>> tokensGen = tokens(partitioner);
+        Gen<InetAddress> addressGen = Generators.INET_ADDRESS_GEN;
+        Gen<InetAddressAndPort> addressAndGenGen = INET_ADDRESS_AND_PORT_GEN;
+        Gen<Boolean> bool = SourceDSL.booleans().all();
+        Constraint millis = Constraint.between(0, Long.MAX_VALUE);
+        Constraint version = Constraint.between(0, Integer.MAX_VALUE);
+        Gen<UUID> hostId = Generators.UUID_RANDOM_GEN;
+        VersionedValue.VersionedValueFactory factory = new 
VersionedValue.VersionedValueFactory(partitioner);
+        return rs -> {
+            String status = statusGen.generate(rs);
+            switch (status)
+            {
+                case "":
+                    return null;
+                case VersionedValue.STATUS_NORMAL:
+                    return 
factory.normal(tokensGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.STATUS_BOOTSTRAPPING:
+                    return 
factory.bootstrapping(tokensGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.STATUS_BOOTSTRAPPING_REPLACE:
+                    if (bool.generate(rs)) return 
factory.bootReplacingWithPort(addressAndGenGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                    else return 
factory.bootReplacing(addressGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.STATUS_MOVING:
+                    return 
factory.moving(tokenGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.STATUS_LEAVING:
+                    return 
factory.leaving(tokensGen.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.STATUS_LEFT:
+                    return factory.left(tokensGen.generate(rs), 
rs.next(millis)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.REMOVING_TOKEN:
+                    return 
factory.removingNonlocal(hostId.generate(rs)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.REMOVED_TOKEN:
+                    return factory.removedNonlocal(hostId.generate(rs), 
rs.next(millis)).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.HIBERNATE + VersionedValue.DELIMITER + 
true:
+                    return 
factory.hibernate(true).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.HIBERNATE + VersionedValue.DELIMITER + 
false:
+                    return 
factory.hibernate(false).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + true:
+                    return 
factory.shutdown(true).withVersion(Math.toIntExact(rs.next(version)));
+                case VersionedValue.SHUTDOWN + VersionedValue.DELIMITER + 
false:
+                    return 
factory.shutdown(false).withVersion(Math.toIntExact(rs.next(version)));
+                default:
+                    throw new AssertionError("Unexpected status: " + status);
+            }
+        };
+    }
+
+    public static Gen<EndpointState> endpointStates()
+    {
+        Gen<HeartBeatState> hbGen = heartBeatStates();
+        Gen<Map<ApplicationState, VersionedValue>> appStates = 
gossipApplicationStates();
+        Gen<Boolean> alive = SourceDSL.booleans().all();
+        Constraint updateTimestamp = Constraint.between(0, Long.MAX_VALUE);
+        return rs -> {
+            EndpointState state = new EndpointState(hbGen.generate(rs));
+            Map<ApplicationState, VersionedValue> map = appStates.generate(rs);
+            if (!map.isEmpty()) state.addApplicationStates(map);
+            if (alive.generate(rs)) state.markAlive();
+            else state.markDead();
+            state.unsafeSetUpdateTimestamp(rs.next(updateTimestamp));
+            return state;
+        };
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to