This is an automated email from the ASF dual-hosted git repository. marcuse pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 4ed54ecb2adefe679dc74e74f90e61b5fcefd8b0 Author: Marcus Eriksson <[email protected]> AuthorDate: Fri Nov 21 16:10:55 2025 +0100 Improve isGossipOnlyMember and location lookup performance Patch by marcuse; reviewed by Sam Tunnicliffe for CASSANDRA-21039 --- CHANGES.txt | 1 + .../cassandra/config/DatabaseDescriptor.java | 12 +- src/java/org/apache/cassandra/gms/Gossiper.java | 13 +- src/java/org/apache/cassandra/locator/Locator.java | 38 +++-- .../cassandra/tcm/ClusterMetadataService.java | 1 + .../cassandra/tcm/compatibility/GossipHelper.java | 5 +- .../apache/cassandra/tcm/membership/NodeId.java | 5 +- .../cassandra/utils/btree/AbstractBTreeMap.java | 1 - .../apache/cassandra/utils/btree/BTreeBiMap.java | 6 +- .../cassandra/utils/btree/BTreeMultimap.java | 21 +++ .../cassandra/test/microbench/LocatorBench.java | 158 +++++++++++++++++++++ 11 files changed, 234 insertions(+), 27 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 9e31f5ea19..f31d7c39f7 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.1 + * Improve isGossipOnlyMember and location lookup performance (CASSANDRA-21039) * Improve debug around paused and disabled compaction (CASSANDRA-20131,CASSANDRA-19728) * DiskUsageBroadcaster does not update usageInfo on node replacement (CASSANDRA-21033) * Reject PrepareJoin if tokens are already assigned (CASSANDRA-21006) diff --git a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java index bf0beadd0e..23e0aa8341 100644 --- a/src/java/org/apache/cassandra/config/DatabaseDescriptor.java +++ b/src/java/org/apache/cassandra/config/DatabaseDescriptor.java @@ -1692,9 +1692,7 @@ public class DatabaseDescriptor // responsible for querying the cloud metadata service to get the public IP used for // broadcast_address and we only want to instantiate the snitch here. addressConfig.configureAddresses(); - initializationLocator = new Locator(RegistrationStatus.instance, - FBUtilities.getBroadcastAddressAndPort(), - initialLocationProvider); + applyLocator(); nodeProximity = conf.dynamic_snitch ? new DynamicEndpointSnitch(proximity) : proximity; localAddressReconnector = addressConfig.preferLocalConnections() ? new ReconnectableSnitchHelper(initializationLocator, true) @@ -1709,6 +1707,14 @@ public class DatabaseDescriptor newFailureDetector = () -> createFailureDetector(conf.failure_detector); } + @VisibleForTesting + public static void applyLocator() + { + initializationLocator = new Locator(RegistrationStatus.instance, + FBUtilities.getBroadcastAddressAndPort(), + initialLocationProvider); + } + // definitely not safe for tools + clients - implicitly instantiates schema public static void applyPartitioner() { diff --git a/src/java/org/apache/cassandra/gms/Gossiper.java b/src/java/org/apache/cassandra/gms/Gossiper.java index 386bb0e13a..83bf5169af 100644 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@ -106,6 +106,7 @@ import static org.apache.cassandra.config.DatabaseDescriptor.getPartitionerName; import static org.apache.cassandra.gms.Gossiper.GossipedWith.CMS; import static org.apache.cassandra.gms.Gossiper.GossipedWith.SEED; import static org.apache.cassandra.gms.VersionedValue.BOOTSTRAPPING_STATUS; +import static org.apache.cassandra.gms.VersionedValue.HIBERNATE; import static org.apache.cassandra.gms.VersionedValue.unsafeMakeVersionedValue; import static org.apache.cassandra.net.NoPayload.noPayload; import static org.apache.cassandra.net.Verb.ECHO_REQ; @@ -881,11 +882,15 @@ public class Gossiper implements IFailureDetectionEventListener, GossiperMBean, public boolean isGossipOnlyMember(InetAddressAndPort endpoint) { EndpointState epState = endpointStateMap.get(endpoint); - if (epState == null) - { + // gossip status only used for checking transient state HIBERNATE + if (epState == null || Gossiper.getGossipStatus(epState).equals(HIBERNATE)) return false; - } - return !isDeadState(epState) && !ClusterMetadata.current().directory.allJoinedEndpoints().contains(endpoint); + + ClusterMetadata metadata = ClusterMetadata.current(); + NodeId nodeId = metadata.directory.peerId(endpoint); + if (nodeId == null) + return false; + return NodeState.isPreJoin(metadata.directory.states.get(nodeId)); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/locator/Locator.java b/src/java/org/apache/cassandra/locator/Locator.java index 20ad604692..10f3047c29 100644 --- a/src/java/org/apache/cassandra/locator/Locator.java +++ b/src/java/org/apache/cassandra/locator/Locator.java @@ -18,6 +18,9 @@ package org.apache.cassandra.locator; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.tcm.ClusterMetadata; import org.apache.cassandra.tcm.Epoch; @@ -65,6 +68,10 @@ public class Locator // is always taken from ClusterMetadata. private volatile VersionedLocation local; + // Convenience to avoid making too many Directory lookups, which can be relatively expensive due to + // being backed by BTreeMap/BTreeBiMap. + private final Map<InetAddressAndPort, VersionedLocation> locationCache = new ConcurrentHashMap<>(); + private static class VersionedLocation { final Epoch epoch; @@ -167,23 +174,28 @@ public class Locator return new VersionedLocation(Epoch.EMPTY, Location.UNKNOWN); source = metadata.directory; } - NodeId nodeId = source.peerId(endpoint); - Location location = nodeId != null ? source.location(nodeId) : Location.UNKNOWN; - return new VersionedLocation(source.lastModified(), location); + + VersionedLocation versionedLocation = locationCache.get(endpoint); + if (versionedLocation == null || versionedLocation.epoch.isBefore(source.lastModified())) + { + NodeId nodeId = source.peerId(endpoint); + if (nodeId != null) + { + Location location = source.location(nodeId); + versionedLocation = new VersionedLocation(source.lastModified(), location); + locationCache.put(endpoint, versionedLocation); + } + else + { + versionedLocation = new VersionedLocation(source.lastModified(), Location.UNKNOWN); + } + } + return versionedLocation; } private Location fromDirectory(InetAddressAndPort endpoint) { - Directory source = directory; - if (source == null) - { - ClusterMetadata metadata = ClusterMetadata.currentNullable(); - if (metadata == null) - return Location.UNKNOWN; - source = metadata.directory; - } - NodeId nodeId = source.peerId(endpoint); - return nodeId != null ? source.location(nodeId) : Location.UNKNOWN; + return versionedFromDirectory(endpoint).location; } private Location initialLocation() diff --git a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java index 23ba4385e4..74062e91c1 100644 --- a/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java +++ b/src/java/org/apache/cassandra/tcm/ClusterMetadataService.java @@ -113,6 +113,7 @@ public class ClusterMetadataService if (newInstance.metadata().myNodeId() != null) RegistrationStatus.instance.onRegistration(); trace = new RuntimeException("Previously initialized trace"); + DatabaseDescriptor.applyLocator(); } @VisibleForTesting diff --git a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java index f569a9d1cb..2e24120873 100644 --- a/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java +++ b/src/java/org/apache/cassandra/tcm/compatibility/GossipHelper.java @@ -433,7 +433,10 @@ public class GossipHelper Set<String> hostIds = new HashSet<>(); for (EndpointState epstate : epstates.values()) { - String hostIdString = epstate.getApplicationState(HOST_ID).value; + VersionedValue vv = epstate.getApplicationState(HOST_ID); + if (vv == null) + continue; + String hostIdString = vv.value; if (hostIds.contains(hostIdString)) return true; hostIds.add(hostIdString); diff --git a/src/java/org/apache/cassandra/tcm/membership/NodeId.java b/src/java/org/apache/cassandra/tcm/membership/NodeId.java index f011314815..b73c4f00c7 100644 --- a/src/java/org/apache/cassandra/tcm/membership/NodeId.java +++ b/src/java/org/apache/cassandra/tcm/membership/NodeId.java @@ -19,7 +19,6 @@ package org.apache.cassandra.tcm.membership; import java.io.IOException; -import java.util.Objects; import java.util.UUID; import com.google.common.primitives.Ints; @@ -84,13 +83,13 @@ public class NodeId implements Comparable<NodeId>, MultiStepOperation.SequenceKe if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; NodeId nodeId = (NodeId) o; - return Objects.equals(id, nodeId.id); + return id == nodeId.id; } @Override public int hashCode() { - return Objects.hash(id); + return Long.hashCode(id); } @Override diff --git a/src/java/org/apache/cassandra/utils/btree/AbstractBTreeMap.java b/src/java/org/apache/cassandra/utils/btree/AbstractBTreeMap.java index 7ef33b417a..a515b2584b 100644 --- a/src/java/org/apache/cassandra/utils/btree/AbstractBTreeMap.java +++ b/src/java/org/apache/cassandra/utils/btree/AbstractBTreeMap.java @@ -146,7 +146,6 @@ public abstract class AbstractBTreeMap<K, V> extends AbstractMap<K, V> public void clear() { throw new UnsupportedOperationException(); } public Map.Entry<K, V> pollFirstEntry() { throw new UnsupportedOperationException(); } public Map.Entry<K, V> pollLastEntry() { throw new UnsupportedOperationException(); } - protected static class KeyComparator<K, V> implements Comparator<Map.Entry<K, V>> { protected final Comparator<K> keyComparator; diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java b/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java index 3480b8dbaa..675a4876db 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java +++ b/src/java/org/apache/cassandra/utils/btree/BTreeBiMap.java @@ -68,10 +68,12 @@ public class BTreeBiMap<K, V> extends AbstractBTreeMap<K, V> implements BiMap<K, return BTreeBiMap.<K, V>empty(naturalOrder(), naturalOrder()); } + private transient BTreeBiMap<V, K> inverseMap = null; @Override public BiMap<V, K> inverse() { - return new BTreeBiMap<>(inverse, tree, valueComparator, asymmetricValueComparator, comparator, asymmetricComparator); + BiMap<V, K> res = inverseMap; + return res == null ? inverseMap = new BTreeBiMap<>(inverse, tree, valueComparator, asymmetricValueComparator, comparator, asymmetricComparator) : res; } @Override @@ -87,7 +89,7 @@ public class BTreeBiMap<K, V> extends AbstractBTreeMap<K, V> implements BiMap<K, throw new IllegalArgumentException("Value already exists in map: " + value); return new BTreeBiMap<>(BTree.update(tree, new Object[]{ entry }, comparator, UpdateFunction.noOp()), - BTree.update(inverse, new Object[] { new AbstractBTreeMap.Entry<>(value, key) }, valueComparator, UpdateFunction.noOp()), + BTree.update(inverse, new Object[] { inverseEntry }, valueComparator, UpdateFunction.noOp()), comparator, asymmetricComparator, valueComparator, asymmetricValueComparator); } diff --git a/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java b/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java index 3f5a9438e6..0d44fa4afe 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java +++ b/src/java/org/apache/cassandra/utils/btree/BTreeMultimap.java @@ -151,16 +151,30 @@ public class BTreeMultimap<K, V> implements Multimap<K, V> return map.keySet(); } + private transient Multiset<K> keys = null; @Override public Multiset<K> keys() + { + Multiset<K> res = keys; + return res == null ? keys = createKeys() : res; + } + + private Multiset<K> createKeys() { ImmutableMultiset.Builder<K> keys = ImmutableMultiset.builder(); keys.addAll(map.keySet()); return keys.build(); } + private transient Collection<V> values = null; @Override public Collection<V> values() + { + Collection<V> res = values; + return res == null ? values = createValues() : res; + } + + private Collection<V> createValues() { ImmutableList.Builder<V> builder = ImmutableList.builder(); for (Map.Entry<K, Collection<V>> entry : map.entrySet()) @@ -168,8 +182,15 @@ public class BTreeMultimap<K, V> implements Multimap<K, V> return builder.build(); } + private transient Collection<Map.Entry<K,V>> entries = null; @Override public Collection<Map.Entry<K, V>> entries() + { + Collection<Map.Entry<K, V>> res = entries; + return res == null ? entries = createEntries() : res; + } + + public Collection<Map.Entry<K, V>> createEntries() { Set<Map.Entry<K, V>> entries = new HashSet<>(); for (Map.Entry<K, Collection<V>> entry : map.entrySet()) diff --git a/test/microbench/org/apache/cassandra/test/microbench/LocatorBench.java b/test/microbench/org/apache/cassandra/test/microbench/LocatorBench.java new file mode 100644 index 0000000000..28bf600b8d --- /dev/null +++ b/test/microbench/org/apache/cassandra/test/microbench/LocatorBench.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.test.microbench; + +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; +import org.apache.cassandra.distributed.api.TokenSupplier; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.tcm.ClusterMetadata; +import org.apache.cassandra.tcm.RegistrationStatus; +import org.apache.cassandra.tcm.membership.Location; +import org.apache.cassandra.tcm.membership.NodeAddresses; +import org.apache.cassandra.tcm.membership.NodeId; +import org.apache.cassandra.tcm.membership.NodeVersion; +import org.apache.cassandra.tcm.ownership.PlacementProvider; +import org.apache.cassandra.tcm.ownership.UniformRangePlacement; +import org.apache.cassandra.tcm.transformations.UnsafeJoin; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.RunnerException; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +@State(Scope.Benchmark) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1) +@Warmup(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000) +@Measurement(iterations = 5, timeUnit = TimeUnit.MILLISECONDS, time = 5000) +public class LocatorBench +{ + static Random random = new Random(1); + static ClusterMetadata metadata; + static InetAddressAndPort[] queryEndpoints = new InetAddressAndPort[20000]; + @Setup(Level.Trial) + public void setup() throws UnknownHostException + { + DatabaseDescriptor.daemonInitialization(); + int nodecount = 1000; + metadata = fakeMetadata(nodecount, 3, 3); + RegistrationStatus.instance.onRegistration(); + List<InetAddressAndPort> allEndpoints = new ArrayList<>(metadata.directory.allJoinedEndpoints()); + for (int i = 0; i < 20000; i++) + queryEndpoints[i] = allEndpoints.get(random.nextInt(allEndpoints.size())); + } + + @Benchmark + public void bench() + { + for (InetAddressAndPort ep : queryEndpoints) + metadata.locator.location(ep); + } + + public static ClusterMetadata fakeMetadata(int nodeCount, int dcCount, int rackCount) throws UnknownHostException + { + ClusterMetadata metadata = new ClusterMetadata(Murmur3Partitioner.instance); + TokenSupplier tokensupplier = TokenSupplier.evenlyDistributedTokens(nodeCount); + PlacementProvider placementProvider = new UniformRangePlacement(); + for (int i = 1; i < nodeCount; i++) + { + ClusterMetadata.Transformer transformer = metadata.transformer(); + UUID uuid = UUID.randomUUID(); + NodeAddresses addresses = addresses(uuid, i); + metadata = transformer.register(addresses, new Location("dc" + random.nextInt(dcCount), "rack"+random.nextInt(rackCount)), NodeVersion.CURRENT).build().metadata; + NodeId nodeId = metadata.directory.peerId(addresses.broadcastAddress); + metadata = new UnsafeJoin(nodeId, Collections.singleton(new Murmur3Partitioner.LongToken(tokensupplier.token(i))), placementProvider).execute(metadata).success().metadata; + } + + return metadata; + } + + static NodeAddresses addresses(UUID uuid, int idx) throws UnknownHostException + { + byte [] address = new byte [] {127, 0, + (byte) (((idx + 1) & 0x0000ff00) >> 8), + (byte) ((idx + 1) & 0x000000ff)}; + + InetAddressAndPort host = InetAddressAndPort.getByAddress(address); + return new NodeAddresses(uuid, host, host, host); + } + + public static void main(String[] args) throws RunnerException, UnknownHostException { + Options options = new OptionsBuilder() + .include(LocatorBench.class.getSimpleName()) + .build(); + new Runner(options).run(); + } +/* +$ ant microbench -Dbenchmark.name=LocatorBench + +Approaches tried - switching out the peers + locations data structures in Directory: + Original BTreeMap: + [java] Iteration 1: 4,406 ms/op + [java] Iteration 2: 4,463 ms/op + [java] Iteration 3: 4,527 ms/op + [java] Iteration 4: 4,527 ms/op + [java] Iteration 5: 4,491 ms/op + ImmutableMap: + [java] Iteration 1: 1,822 ms/op + [java] Iteration 2: 1,820 ms/op + [java] Iteration 3: 1,822 ms/op + [java] Iteration 4: 1,800 ms/op + [java] Iteration 5: 1,793 ms/op + ImmutableMap with Long.hashCode and id == nodeId.id instead of Objects.equals; + [java] Iteration 1: 1,389 ms/op + [java] Iteration 2: 1,370 ms/op + [java] Iteration 3: 1,381 ms/op + [java] Iteration 4: 1,405 ms/op + [java] Iteration 5: 1,408 ms/op + HashMap: + [java] Iteration 1: 0,938 ms/op + [java] Iteration 2: 0,936 ms/op + [java] Iteration 3: 0,916 ms/op + [java] Iteration 4: 0,931 ms/op + [java] Iteration 5: 0,941 ms/op + Caching locations: + [java] Iteration 1: 0,189 ms/op + [java] Iteration 2: 0,188 ms/op + [java] Iteration 3: 0,189 ms/op + [java] Iteration 4: 0,192 ms/op + [java] Iteration 5: 0,191 ms/op + */ + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
