This is an automated email from the ASF dual-hosted git repository.

bereng pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit f7462446a450eb40daeb93c61dd1a419f3dee6b4
Merge: d3658f0491 0f9bb246cd
Author: Bereng <[email protected]>
AuthorDate: Mon Apr 14 09:23:30 2025 +0200

    Merge branch 'cassandra-5.0' into trunk
    
    * cassandra-5.0:
      CASSANDRA-19633 Replaced node is stuck in a loop calculating ranges

 .../config/CassandraRelevantProperties.java        |   7 ++
 .../org/apache/cassandra/dht/RangeStreamer.java    |   9 +-
 .../org/apache/cassandra/dht/BootStrapperTest.java | 128 +++++++++++++++------
 3 files changed, 106 insertions(+), 38 deletions(-)

diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
index e83dfaf53a,e1e080a0a4..e14afdcdca
--- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
+++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java
@@@ -492,8 -481,13 +492,15 @@@ public enum CassandraRelevantPropertie
      SET_SEP_THREAD_NAME("cassandra.set_sep_thread_name", "true"),
      SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"),
      SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"),
 +    SKIP_AUTH_SETUP("cassandra.skip_auth_setup", "false"),
 +    SKIP_GC_INSPECTOR("cassandra.skip_gc_inspector", "false"),
+ 
+     /**
+      * Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs, especially
+      * with vnodes.
+      */
+     SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"),
+ 
      SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE("cassandra.skip_paxos_repair_on_topology_change"),
      /** If necessary for operational purposes, permit certain keyspaces to be ignored for paxos topology repairs. */
      SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES("cassandra.skip_paxos_repair_on_topology_change_keyspaces"),
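
Note: the RangeStreamer.java hunk itself is not reproduced in this merge diff (it appears
only in the diffstat above). Judging from the Byteman target methods in the new test below,
the flag plausibly selects between the two work-map paths along these lines -- a sketch only,
where toWorkMap is a hypothetical wrapper and not a method name from the patch:

    // Sketch (assumed wiring, not the committed code): the flag short-circuits the
    // expensive optimal-candidate calculation and falls back to the direct conversion.
    private Multimap<InetAddressAndPort, FetchReplica> toWorkMap(EndpointsByReplica preferred,
                                                                 Collection<Range<Token>> ranges,
                                                                 String keyspace)
    {
        if (CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.getBoolean())
            return convertPreferredEndpointsToWorkMap(preferred); // cheap path, skips optimization
        return getOptimizedWorkMap(preferred, ranges, keyspace);  // can be slow with many vnodes
    }
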
diff --cc test/unit/org/apache/cassandra/dht/BootStrapperTest.java
index f82a51fc58,8f766015f7..ad5829c7c0
--- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
+++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java
@@@ -18,12 -18,10 +18,14 @@@
  package org.apache.cassandra.dht;
  
  import java.net.UnknownHostException;
 +import java.util.Collection;
  import java.util.List;
  import java.util.Random;
 +import java.util.Set;
 +import java.util.stream.Collectors;
+ import java.util.concurrent.atomic.AtomicBoolean;
+ 
 +
  import com.google.common.base.Predicate;
  import com.google.common.base.Predicates;
  import com.google.common.collect.Lists;
@@@ -31,32 -29,30 +33,37 @@@ import com.google.common.collect.Multim
  import org.junit.AfterClass;
  import org.junit.BeforeClass;
  import org.junit.Test;
+ import org.junit.runner.RunWith;
  
  import org.apache.cassandra.SchemaLoader;
 +import org.apache.cassandra.ServerTestUtils;
+ import org.apache.cassandra.config.CassandraRelevantProperties;
  import org.apache.cassandra.config.DatabaseDescriptor;
  import org.apache.cassandra.db.Keyspace;
 -import org.apache.cassandra.dht.RangeStreamer.FetchReplica;
 +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper;
  import org.apache.cassandra.exceptions.ConfigurationException;
  import org.apache.cassandra.gms.IFailureDetectionEventListener;
  import org.apache.cassandra.gms.IFailureDetector;
  import org.apache.cassandra.locator.InetAddressAndPort;
 -import org.apache.cassandra.locator.TokenMetadata;
  import org.apache.cassandra.locator.Replica;
  import org.apache.cassandra.schema.Schema;
 +import org.apache.cassandra.schema.SchemaConstants;
  import org.apache.cassandra.service.StorageService;
  import org.apache.cassandra.streaming.StreamOperation;
 +import org.apache.cassandra.tcm.ClusterMetadata;
 +import org.apache.cassandra.tcm.membership.NodeId;
 +import org.apache.cassandra.tcm.ownership.MovementMap;
 +import org.apache.cassandra.tcm.sequences.BootstrapAndJoin;
 +import org.apache.cassandra.utils.Pair;
+ import org.jboss.byteman.contrib.bmunit.BMRule;
+ import org.jboss.byteman.contrib.bmunit.BMRules;
+ import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
  
  import static org.junit.Assert.assertEquals;
  import static org.junit.Assert.assertNotNull;
 +import static org.junit.Assert.assertTrue;
  
- 
+ @RunWith(BMUnitRunner.class)
  public class BootStrapperTest
  {
      static IPartitioner oldPartitioner;
@@@ -96,91 -105,88 +119,124 @@@
          }
      }
  
+     @Test
+     @BMRules(rules = { @BMRule(name = "Make sure the non-optimized path is picked up for some operations",
+                                targetClass = "org.apache.cassandra.dht.RangeStreamer",
+                                targetMethod = "convertPreferredEndpointsToWorkMap(EndpointsByReplica)",
+                                action = "org.apache.cassandra.dht.BootStrapperTest.nonOptimizationHit.set(true)"),
+                        @BMRule(name = "Make sure the optimized path is picked up for some operations",
+                                targetClass = "org.apache.cassandra.dht.RangeStreamer",
+                                targetMethod = "getOptimizedWorkMap(EndpointsByReplica,Collection,String)",
+                                action = "org.apache.cassandra.dht.BootStrapperTest.optimizationHit.set(true)") })
+     public void testStreamingCandidatesOptimizationSkip() throws UnknownHostException
+     {
 -        testSkipStreamingCandidatesOptimizationFeatureFlag(true, true, false);
 -        testSkipStreamingCandidatesOptimizationFeatureFlag(false, true, true);
++        testSkipStreamingCandidatesOptimizationFeatureFlag(true, true, false, getRangeStreamer());
++        testSkipStreamingCandidatesOptimizationFeatureFlag(false, true, true, getRangeStreamer());
+     }
+ 
 -    private void testSkipStreamingCandidatesOptimizationFeatureFlag(boolean disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit) throws UnknownHostException
++    private void testSkipStreamingCandidatesOptimizationFeatureFlag(boolean disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit, RangeStreamer s) throws UnknownHostException
+     {
+         try
+         {
+             nonOptimizationHit.set(false);
+             optimizationHit.set(false);
+             CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.setBoolean(disableOptimization);
+ 
 -            for (String keyspaceName : Schema.instance.getUserKeyspaces())
 -            {
 -                StorageService ss = StorageService.instance;
 -                TokenMetadata tmd = ss.getTokenMetadata();
 -
 -                generateFakeEndpoints(10);
 -                Token myToken = tmd.partitioner.getRandomToken();
 -                InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
 -
 -                assertEquals(10, tmd.sortedTokens().size());
 -                RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1);
 -                s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
 -            }
++            for (String keyspaceName : Schema.instance.getUserKeyspaces().names())
++                s.addKeyspaceToFetch(keyspaceName);
+ 
+             assertEquals(nonOptimizedPathHit, nonOptimizationHit.get());
 -            assertEquals(optimizedPathHit, optimizationHit.get());
++            if (disableOptimization) // The optimized path may or may not be hit, depending on the code path taken.
++                assertEquals(optimizedPathHit, optimizationHit.get());
+         }
+         finally
+         {
+             CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.reset();
+         }
+     }
+ 
      private RangeStreamer testSourceTargetComputation(String keyspaceName, int numOldNodes, int replicationFactor) throws UnknownHostException
      {
 -        StorageService ss = StorageService.instance;
 -        TokenMetadata tmd = ss.getTokenMetadata();
 -
 +        ServerTestUtils.resetCMS();
          generateFakeEndpoints(numOldNodes);
 -        Token myToken = tmd.partitioner.getRandomToken();
 -        InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
 +        ClusterMetadata metadata = ClusterMetadata.current();
- 
 +        assertEquals(numOldNodes, metadata.tokenMap.tokens().size());
-         IFailureDetector mockFailureDetector = new IFailureDetector()
-         {
-             public boolean isAlive(InetAddressAndPort ep)
-             {
-                 return true;
-             }
- 
-             public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
-             public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
-             public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
-             public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); }
-             public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
-             public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); }
-         };
- 
-         Token myToken = metadata.partitioner.getRandomToken();
-         InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
-         NodeId newNode = ClusterMetadataTestHelper.register(myEndpoint);
-         ClusterMetadataTestHelper.JoinProcess join = ClusterMetadataTestHelper.lazyJoin(myEndpoint, myToken);
-         join.prepareJoin();
-         metadata = ClusterMetadata.current();
-         BootstrapAndJoin joiningPlan = (BootstrapAndJoin) metadata.inProgressSequences.get(newNode);
-         Pair<MovementMap, MovementMap> movements = joiningPlan.getMovementMaps(metadata);
-         RangeStreamer s = new RangeStreamer(metadata,
-                                             StreamOperation.BOOTSTRAP,
-                                             true,
-                                             DatabaseDescriptor.getNodeProximity(),
-                                             new StreamStateStore(),
-                                             mockFailureDetector,
-                                             false,
-                                             1,
-                                             movements.left,
-                                             movements.right);
++        RangeStreamer s = getRangeStreamer();
  
 -        assertEquals(numOldNodes, tmd.sortedTokens().size());
 -        RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1);
          assertNotNull(Keyspace.open(keyspaceName));
 -        s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint));
 -
 +        s.addKeyspaceToFetch(keyspaceName);
 +        Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> toFetch = s.toFetch().get(keyspaceName);
  
 -        Multimap<InetAddressAndPort, FetchReplica> toFetch = s.toFetch().get(keyspaceName);
 -
 -        // Check we get RF new ranges in total
 -        assertEquals(replicationFactor, toFetch.size());
 +        // Pre-TCM this test would always run with RangeStreamer::useStrictSourcesForRanges returning false because
 +        // the RangeStreamer instance was constructed with a null Collection<Token>, relying on pending ranges being
 +        // calculated in the test code, and then cloning TokenMetadata with pending tokens added to pass to
 +        // calculateRangesToFetchWithPreferredEndpoints. Post-TCM, we calculate both relaxed and strict movements
 +        // together and TokenMetadata is no more, so the equivalent operation now ends up using strict movements. For
 +        // that reason, when RF includes transient replicas, toFetch will include both a transient and a full source,
 +        // hence we dedupe the ranges here.
 +        Set<Range<Token>> fetchRanges = toFetch.values()
 +                                               .stream()
 +                                               .map(fr -> fr.remote.range())
 +                                               .collect(Collectors.toSet());
 +        // Post CEP-21, wrapping ranges are also unwrapped at this point, so account for that when setting expectations
 +        int expectedRangeCount = replicationFactor + (includesWraparound(fetchRanges) ? 1 : 0);
 +        assertEquals(expectedRangeCount, fetchRanges.size());
  
          // there isn't any point in testing the size of these collections for any specific size.  When a random partitioner
          // is used, they will vary.
          assert toFetch.values().size() > 0;
--        assert toFetch.keys().stream().noneMatch(myEndpoint::equals);
++
++        assert toFetch.keys().stream().noneMatch(InetAddressAndPort.getByName("127.0.0.1")::equals);
          return s;
      }
  
++    private RangeStreamer getRangeStreamer() throws UnknownHostException
++    {
++        ClusterMetadata metadata = ClusterMetadata.current();
++        Pair<MovementMap, MovementMap> movements = Pair.create(MovementMap.empty(), MovementMap.empty());
++
++        if (metadata.myNodeId() == null)
++        {
++            Token myToken = metadata.partitioner.getRandomToken();
++            InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1");
++            NodeId newNode = ClusterMetadataTestHelper.register(myEndpoint);
++            ClusterMetadataTestHelper.JoinProcess join = ClusterMetadataTestHelper.lazyJoin(myEndpoint, myToken);
++            join.prepareJoin();
++            metadata = ClusterMetadata.current();
++            BootstrapAndJoin joiningPlan = (BootstrapAndJoin) metadata.inProgressSequences.get(newNode);
++            movements = joiningPlan.getMovementMaps(metadata);
++        }
++
++        return new RangeStreamer(metadata,
++               StreamOperation.BOOTSTRAP,
++               true,
++               DatabaseDescriptor.getNodeProximity(),
++               new StreamStateStore(),
++               mockFailureDetector,
++               false,
++               1,
++               movements.left,
++               movements.right);
++    }
++
 +    private boolean includesWraparound(Collection<Range<Token>> toFetch)
 +    {
 +        long minTokenCount = toFetch.stream()
 +                                    .filter(r -> r.left.isMinimum() || r.right.isMinimum())
 +                                    .count();
 +        assertTrue("Ranges to fetch should either include both or neither parts of a normalised wrapping range",
 +                   minTokenCount % 2 == 0);
 +        return minTokenCount > 0;
 +    }
 +
      private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException
      {
 -        generateFakeEndpoints(StorageService.instance.getTokenMetadata(), numOldNodes, 1);
 +        generateFakeEndpoints(numOldNodes, 1);
      }
  
 -    private void generateFakeEndpoints(TokenMetadata tmd, int numOldNodes, int numVNodes) throws UnknownHostException
 +    private void generateFakeEndpoints(int numOldNodes, int numVNodes) throws UnknownHostException
      {
 -        tmd.clearUnsafe();
 -        generateFakeEndpoints(tmd, numOldNodes, numVNodes, "0", "0");
 +        generateFakeEndpoints(numOldNodes, numVNodes, "0", "0");
      }
  
      Random rand = new Random(1);
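
Note: the AtomicBoolean flags referenced by the BMRule actions
(BootStrapperTest.nonOptimizationHit / optimizationHit) do not appear in the hunks above.
For the Byteman actions to resolve, they would need to be declared on BootStrapperTest
roughly as follows (assumed declarations, inferred from the actions and from the
java.util.concurrent.atomic.AtomicBoolean import added in this diff):

    // Assumed field declarations, not shown in this diff: flipped to true by the
    // Byteman rules when the instrumented RangeStreamer methods are entered, and
    // cleared by the test before each run.
    public static final AtomicBoolean nonOptimizationHit = new AtomicBoolean(false);
    public static final AtomicBoolean optimizationHit = new AtomicBoolean(false);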


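Note: for the wraparound accounting in includesWraparound, recall that a wrapping range is
normalised into two ranges that each touch the minimum token, which is why expectedRangeCount
is bumped by one when a wraparound is present. An illustration, assuming the usual behaviour
of org.apache.cassandra.dht.Range (tokenA and tokenB are placeholders with tokenA > tokenB):

    // Illustration only, not part of the patch: unwrapping a wrapping range yields
    // two ranges, each with one end at the minimum token, so the parity check
    // minTokenCount % 2 == 0 in includesWraparound holds.
    Range<Token> wrapping = new Range<>(tokenA, tokenB); // wraps because tokenA > tokenB
    List<Range<Token>> unwrapped = wrapping.unwrap();    // [(tokenA, min], (min, tokenB]]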