This is an automated email from the ASF dual-hosted git repository. bereng pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit f7462446a450eb40daeb93c61dd1a419f3dee6b4 Merge: d3658f0491 0f9bb246cd Author: Bereng <[email protected]> AuthorDate: Mon Apr 14 09:23:30 2025 +0200 Merge branch 'cassandra-5.0' into trunk * cassandra-5.0: CASSANDRA-19633 Replaced node is stuck in a loop calculating ranges .../config/CassandraRelevantProperties.java | 7 ++ .../org/apache/cassandra/dht/RangeStreamer.java | 9 +- .../org/apache/cassandra/dht/BootStrapperTest.java | 128 +++++++++++++++------ 3 files changed, 106 insertions(+), 38 deletions(-) diff --cc src/java/org/apache/cassandra/config/CassandraRelevantProperties.java index e83dfaf53a,e1e080a0a4..e14afdcdca --- a/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java +++ b/src/java/org/apache/cassandra/config/CassandraRelevantProperties.java @@@ -492,8 -481,13 +492,15 @@@ public enum CassandraRelevantPropertie SET_SEP_THREAD_NAME("cassandra.set_sep_thread_name", "true"), SHUTDOWN_ANNOUNCE_DELAY_IN_MS("cassandra.shutdown_announce_in_ms", "2000"), SIZE_RECORDER_INTERVAL("cassandra.size_recorder_interval", "300"), + SKIP_AUTH_SETUP("cassandra.skip_auth_setup", "false"), + SKIP_GC_INSPECTOR("cassandra.skip_gc_inspector", "false"), + + /** + * Do not try to calculate optimal streaming candidates. This can take a lot of time in some configs specially + * with vnodes. + */ + SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION("cassandra.skip_optimal_streaming_candidates_calculation", "false"), + SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE("cassandra.skip_paxos_repair_on_topology_change"), /** If necessary for operational purposes, permit certain keyspaces to be ignored for paxos topology repairs. */ SKIP_PAXOS_REPAIR_ON_TOPOLOGY_CHANGE_KEYSPACES("cassandra.skip_paxos_repair_on_topology_change_keyspaces"), diff --cc test/unit/org/apache/cassandra/dht/BootStrapperTest.java index f82a51fc58,8f766015f7..ad5829c7c0 --- a/test/unit/org/apache/cassandra/dht/BootStrapperTest.java +++ b/test/unit/org/apache/cassandra/dht/BootStrapperTest.java @@@ -18,12 -18,10 +18,14 @@@ package org.apache.cassandra.dht; import java.net.UnknownHostException; +import java.util.Collection; import java.util.List; import java.util.Random; +import java.util.Set; +import java.util.stream.Collectors; + import java.util.concurrent.atomic.AtomicBoolean; + + import com.google.common.base.Predicate; import com.google.common.base.Predicates; import com.google.common.collect.Lists; @@@ -31,32 -29,30 +33,37 @@@ import com.google.common.collect.Multim import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; + import org.junit.runner.RunWith; import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.ServerTestUtils; + import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.Keyspace; -import org.apache.cassandra.dht.RangeStreamer.FetchReplica; +import org.apache.cassandra.distributed.test.log.ClusterMetadataTestHelper; import org.apache.cassandra.exceptions.ConfigurationException; import org.apache.cassandra.gms.IFailureDetectionEventListener; import org.apache.cassandra.gms.IFailureDetector; import org.apache.cassandra.locator.InetAddressAndPort; -import org.apache.cassandra.locator.TokenMetadata; import org.apache.cassandra.locator.Replica; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.SchemaConstants; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.streaming.StreamOperation; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.ownership.MovementMap; +import org.apache.cassandra.tcm.sequences.BootstrapAndJoin; +import org.apache.cassandra.utils.Pair; + import org.jboss.byteman.contrib.bmunit.BMRule; + import org.jboss.byteman.contrib.bmunit.BMRules; + import org.jboss.byteman.contrib.bmunit.BMUnitRunner; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; - + @RunWith(BMUnitRunner.class) public class BootStrapperTest { static IPartitioner oldPartitioner; @@@ -96,91 -105,88 +119,124 @@@ } } + @Test + @BMRules(rules = { @BMRule(name = "Make sure the non-optimized path is picked up for some operations", + targetClass = "org.apache.cassandra.dht.RangeStreamer", + targetMethod = "convertPreferredEndpointsToWorkMap(EndpointsByReplica)", + action = "org.apache.cassandra.dht.BootStrapperTest.nonOptimizationHit.set(true)"), + @BMRule(name = "Make sure the optimized path is picked up for some operations", + targetClass = "org.apache.cassandra.dht.RangeStreamer", + targetMethod = "getOptimizedWorkMap(EndpointsByReplica,Collection,String)", + action = "org.apache.cassandra.dht.BootStrapperTest.optimizationHit.set(true)") }) + public void testStreamingCandidatesOptmizationSkip() throws UnknownHostException + { - testSkipStreamingCandidatesOptmizationFeatureFlag(true, true, false); - testSkipStreamingCandidatesOptmizationFeatureFlag(false, true, true); ++ testSkipStreamingCandidatesOptmizationFeatureFlag(true, true, false, getRangeStreamer()); ++ testSkipStreamingCandidatesOptmizationFeatureFlag(false, true, true, getRangeStreamer()); + } + - private void testSkipStreamingCandidatesOptmizationFeatureFlag(boolean disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit) throws UnknownHostException ++ private void testSkipStreamingCandidatesOptmizationFeatureFlag(boolean disableOptimization, boolean nonOptimizedPathHit, boolean optimizedPathHit, RangeStreamer s) throws UnknownHostException + { + try + { + nonOptimizationHit.set(false); + optimizationHit.set(false); + CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.setBoolean(disableOptimization); + - for (String keyspaceName : Schema.instance.getUserKeyspaces()) - { - StorageService ss = StorageService.instance; - TokenMetadata tmd = ss.getTokenMetadata(); - - generateFakeEndpoints(10); - Token myToken = tmd.partitioner.getRandomToken(); - InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); - - assertEquals(10, tmd.sortedTokens().size()); - RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1); - s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); - } ++ for (String keyspaceName : Schema.instance.getUserKeyspaces().names()) ++ s.addKeyspaceToFetch(keyspaceName); + + assertEquals(nonOptimizedPathHit, nonOptimizationHit.get()); - assertEquals(optimizedPathHit, optimizationHit.get()); ++ if (disableOptimization) // The optimized path may or not be hit depending on the code. ++ assertEquals(optimizedPathHit, optimizationHit.get()); + } + finally + { + CassandraRelevantProperties.SKIP_OPTIMAL_STREAMING_CANDIDATES_CALCULATION.reset(); + } + } + private RangeStreamer testSourceTargetComputation(String keyspaceName, int numOldNodes, int replicationFactor) throws UnknownHostException { - StorageService ss = StorageService.instance; - TokenMetadata tmd = ss.getTokenMetadata(); - + ServerTestUtils.resetCMS(); generateFakeEndpoints(numOldNodes); - Token myToken = tmd.partitioner.getRandomToken(); - InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); + ClusterMetadata metadata = ClusterMetadata.current(); - + assertEquals(numOldNodes, metadata.tokenMap.tokens().size()); - IFailureDetector mockFailureDetector = new IFailureDetector() - { - public boolean isAlive(InetAddressAndPort ep) - { - return true; - } - - public void interpret(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - public void report(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - public void registerFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } - public void unregisterFailureDetectionEventListener(IFailureDetectionEventListener listener) { throw new UnsupportedOperationException(); } - public void remove(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - public void forceConviction(InetAddressAndPort ep) { throw new UnsupportedOperationException(); } - }; - - Token myToken = metadata.partitioner.getRandomToken(); - InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); - NodeId newNode = ClusterMetadataTestHelper.register(myEndpoint); - ClusterMetadataTestHelper.JoinProcess join = ClusterMetadataTestHelper.lazyJoin(myEndpoint, myToken); - join.prepareJoin(); - metadata = ClusterMetadata.current(); - BootstrapAndJoin joiningPlan = (BootstrapAndJoin) metadata.inProgressSequences.get(newNode); - Pair<MovementMap, MovementMap> movements = joiningPlan.getMovementMaps(metadata); - RangeStreamer s = new RangeStreamer(metadata, - StreamOperation.BOOTSTRAP, - true, - DatabaseDescriptor.getNodeProximity(), - new StreamStateStore(), - mockFailureDetector, - false, - 1, - movements.left, - movements.right); ++ RangeStreamer s = getRangeStreamer(); - assertEquals(numOldNodes, tmd.sortedTokens().size()); - RangeStreamer s = new RangeStreamer(tmd, null, myEndpoint, StreamOperation.BOOTSTRAP, true, DatabaseDescriptor.getEndpointSnitch(), new StreamStateStore(), mockFailureDetector, false, 1); assertNotNull(Keyspace.open(keyspaceName)); - s.addRanges(keyspaceName, Keyspace.open(keyspaceName).getReplicationStrategy().getPendingAddressRanges(tmd, myToken, myEndpoint)); - + s.addKeyspaceToFetch(keyspaceName); + Multimap<InetAddressAndPort, RangeStreamer.FetchReplica> toFetch = s.toFetch().get(keyspaceName); - Multimap<InetAddressAndPort, FetchReplica> toFetch = s.toFetch().get(keyspaceName); - - // Check we get get RF new ranges in total - assertEquals(replicationFactor, toFetch.size()); + // Pre-TCM this test would always run with RangeStreamer::useStrictSourcesForRanges returning false because + // the RangeStreamer instance was constructed with a null Collection<Token>, relying on pending ranges being + // calculated in the test code, and then cloning TokenMetadata with pending tokens added to pass to + // calculateRangesToFetchWithPreferredEndpoints. Post-TCM, we calculate both relaxed and strict movements + // together and TokenMetadata is no more, so the equivalent operation now ends up using strict movements. For + // that reason, when RF includes transient replicas, toFetch will include both a transient and full source, + // hence we dedupe the ranges here. + Set<Range<Token>> fetchRanges = toFetch.values() + .stream() + .map(fr -> fr.remote.range()) + .collect(Collectors.toSet()); + // Post CEP-21 wrapping ranges are also unwrapped at this point, so account for that when setting expectation + int expectedRangeCount = replicationFactor + (includesWraparound(fetchRanges) ? 1 : 0); + assertEquals(expectedRangeCount, fetchRanges.size()); // there isn't any point in testing the size of these collections for any specific size. When a random partitioner // is used, they will vary. assert toFetch.values().size() > 0; -- assert toFetch.keys().stream().noneMatch(myEndpoint::equals); ++ ++ assert toFetch.keys().stream().noneMatch(InetAddressAndPort.getByName("127.0.0.1")::equals); return s; } ++ private RangeStreamer getRangeStreamer() throws UnknownHostException ++ { ++ ClusterMetadata metadata = ClusterMetadata.current(); ++ Pair<MovementMap, MovementMap> movements = Pair.create(MovementMap.empty(), MovementMap.empty()); ++ ++ if (metadata.myNodeId() == null) ++ { ++ Token myToken = metadata.partitioner.getRandomToken(); ++ InetAddressAndPort myEndpoint = InetAddressAndPort.getByName("127.0.0.1"); ++ NodeId newNode = ClusterMetadataTestHelper.register(myEndpoint); ++ ClusterMetadataTestHelper.JoinProcess join = ClusterMetadataTestHelper.lazyJoin(myEndpoint, myToken); ++ join.prepareJoin(); ++ metadata = ClusterMetadata.current(); ++ BootstrapAndJoin joiningPlan = (BootstrapAndJoin) metadata.inProgressSequences.get(newNode); ++ movements = joiningPlan.getMovementMaps(metadata); ++ } ++ ++ return new RangeStreamer(metadata, ++ StreamOperation.BOOTSTRAP, ++ true, ++ DatabaseDescriptor.getNodeProximity(), ++ new StreamStateStore(), ++ mockFailureDetector, ++ false, ++ 1, ++ movements.left, ++ movements.right); ++ } ++ + private boolean includesWraparound(Collection<Range<Token>> toFetch) + { + long minTokenCount = toFetch.stream() + .filter(r -> r.left.isMinimum() || r.right.isMinimum()) + .count(); + assertTrue("Ranges to fetch should either include both or neither parts of normalised wrapping range", + minTokenCount % 2 == 0); + return minTokenCount > 0; + } + private void generateFakeEndpoints(int numOldNodes) throws UnknownHostException { - generateFakeEndpoints(StorageService.instance.getTokenMetadata(), numOldNodes, 1); + generateFakeEndpoints(numOldNodes, 1); } - private void generateFakeEndpoints(TokenMetadata tmd, int numOldNodes, int numVNodes) throws UnknownHostException + private void generateFakeEndpoints(int numOldNodes, int numVNodes) throws UnknownHostException { - tmd.clearUnsafe(); - generateFakeEndpoints(tmd, numOldNodes, numVNodes, "0", "0"); + generateFakeEndpoints(numOldNodes, numVNodes, "0", "0"); } Random rand = new Random(1); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
