This is an automated email from the ASF dual-hosted git repository. dcapwell pushed a commit to branch cassandra-4.1 in repository https://gitbox.apache.org/repos/asf/cassandra.git
commit 005d3c28e554de3b48d0acad1d2dbd5c3a6c86c9 Merge: cf528bc99c c736d22cf8 Author: David Capwell <dcapw...@apache.org> AuthorDate: Thu May 22 14:12:42 2025 -0700 Merge branch 'cassandra-4.0' into cassandra-4.1 CHANGES.txt | 1 + .../org/apache/cassandra/gms/EndpointState.java | 104 ++++++++++++++++----- src/java/org/apache/cassandra/gms/Gossiper.java | 30 +++--- .../org/apache/cassandra/gms/HeartBeatState.java | 20 ++-- .../distributed/impl/UnsafeGossipHelper.java | 2 +- .../org/apache/cassandra/gms/GossiperTest.java | 2 +- .../apache/cassandra/gms/SerializationsTest.java | 2 +- 7 files changed, 105 insertions(+), 56 deletions(-) diff --cc CHANGES.txt index 491ed9b958,676522b9bf..0f381b8125 --- a/CHANGES.txt +++ b/CHANGES.txt @@@ -1,11 -1,5 +1,12 @@@ -4.0.18 +4.1.9 + * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171) + * Fix mixed mode paxos ttl commit hang (CASSANDRA-20514) + * Fix paxos mixed mode infinite loop (CASSANDRA-20493) + * Optionally skip exception logging on invalid legacy protocol magic exception (CASSANDRA-19483) + * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202) + * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking thread is interrupted (CASSANDRA-20084) +Merged from 4.0: + * Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659) * Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119) * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532) * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561) diff --cc src/java/org/apache/cassandra/gms/EndpointState.java index 198946fa51,6cec0cef34..17bed7f4d2 --- a/src/java/org/apache/cassandra/gms/EndpointState.java +++ b/src/java/org/apache/cassandra/gms/EndpointState.java @@@ -47,10 -45,20 +48,21 @@@ public class EndpointStat protected static final Logger logger = LoggerFactory.getLogger(EndpointState.class); public final static IVersionedSerializer<EndpointState> serializer = new EndpointStateSerializer(); + public final static IVersionedSerializer<EndpointState> nullableSerializer = NullableSerializer.wrap(serializer); - private volatile HeartBeatState hbState; - private final AtomicReference<Map<ApplicationState, VersionedValue>> applicationState; + private static class View + { + final HeartBeatState hbState; + final Map<ApplicationState, VersionedValue> applicationState; + + private View(HeartBeatState hbState, Map<ApplicationState, VersionedValue> applicationState) + { + this.hbState = hbState; + this.applicationState = applicationState; + } + } + + private final AtomicReference<View> ref; /* fields below do not get serialized */ private volatile long updateTimestamp; @@@ -63,14 -71,16 +75,16 @@@ public EndpointState(EndpointState other) { - this(new HeartBeatState(other.hbState), new EnumMap<>(other.applicationState.get())); + ref = new AtomicReference<>(other.ref.get()); - updateTimestamp = System.nanoTime(); ++ updateTimestamp = nanoTime(); + isAlive = true; } - EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states) + @VisibleForTesting + public EndpointState(HeartBeatState initialHbState, Map<ApplicationState, VersionedValue> states) { - hbState = initialHbState; - applicationState = new AtomicReference<Map<ApplicationState, VersionedValue>>(new EnumMap<>(states)); + ref = new AtomicReference<>(new View(initialHbState, new EnumMap<>(states))); - updateTimestamp = System.nanoTime(); + updateTimestamp = nanoTime(); isAlive = true; } @@@ -258,22 -311,9 +315,23 @@@ public String toString() { - return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap = " + applicationState.get(); + View view = ref.get(); + return "EndpointState: HeartBeatState = " + view.hbState + ", AppStateMap = " + view.applicationState; } + + public boolean isSupersededBy(EndpointState that) + { + int thisGeneration = this.getHeartBeatState().getGeneration(); + int thatGeneration = that.getHeartBeatState().getGeneration(); + + if (thatGeneration > thisGeneration) + return true; + + if (thisGeneration > thatGeneration) + return false; + + return Gossiper.getMaxEndpointStateVersion(that) > Gossiper.getMaxEndpointStateVersion(this); + } } class EndpointStateSerializer implements IVersionedSerializer<EndpointState> diff --cc src/java/org/apache/cassandra/gms/Gossiper.java index 0175df0bbc,12c532b162..0dc94767a4 --- a/src/java/org/apache/cassandra/gms/Gossiper.java +++ b/src/java/org/apache/cassandra/gms/Gossiper.java @@@ -328,7 -310,7 +328,7 @@@ public class Gossiper implements IFailu taskLock.lock(); /* Update the local heartbeat counter. */ - endpointStateMap.get(getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat(); - endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).updateHeartBeat(); ++ endpointStateMap.get(getBroadcastAddressAndPort()).updateHeartBeat(); if (logger.isTraceEnabled()) logger.trace("My heartbeat is now {}", endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion()); final List<GossipDigest> gDigests = new ArrayList<>(); @@@ -2061,8 -1976,8 +2055,8 @@@ public void forceNewerGeneration() { - EndpointState epstate = endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()); + EndpointState epstate = endpointStateMap.get(getBroadcastAddressAndPort()); - epstate.getHeartBeatState().forceNewerGenerationUnsafe(); + epstate.forceNewerGenerationUnsafe(); } @@@ -2522,101 -2425,4 +2516,101 @@@ return minVersion; } + + public void unsafeSetEnabled() + { + scheduledGossipTask = new NotScheduledFuture<>(); + firstSynSendAt = 1; + } + + public Collection<InetAddressAndPort> unsafeClearRemoteState() + { + List<InetAddressAndPort> removed = new ArrayList<>(); + for (InetAddressAndPort ep : endpointStateMap.keySet()) + { + if (ep.equals(getBroadcastAddressAndPort())) + continue; + + for (IEndpointStateChangeSubscriber subscriber : subscribers) + subscriber.onRemove(ep); + + removed.add(ep); + } + this.endpointStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); + this.endpointShadowStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); + this.expireTimeEndpointMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); + this.justRemovedEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); + this.unreachableEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort())); + return removed; + } + + public void unsafeGossipWith(InetAddressAndPort ep) + { + /* Update the local heartbeat counter. */ + EndpointState epState = endpointStateMap.get(getBroadcastAddressAndPort()); + if (epState != null) + { - epState.getHeartBeatState().updateHeartBeat(); ++ epState.updateHeartBeat(); + if (logger.isTraceEnabled()) + logger.trace("My heartbeat is now {}", epState.getHeartBeatState().getHeartBeatVersion()); + } + + final List<GossipDigest> gDigests = new ArrayList<GossipDigest>(); + Gossiper.instance.makeGossipDigest(gDigests); + + GossipDigestSyn digestSynMessage = new GossipDigestSyn(getClusterName(), + getPartitionerName(), + gDigests); + Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, digestSynMessage); + + MessagingService.instance().send(message, ep); + } + + public void unsafeSendShutdown(InetAddressAndPort to) + { + Message<?> message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload); + MessagingService.instance().send(message, to); + } + + public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep) + { + /* Update the local heartbeat counter. */ + EndpointState epState = endpointStateMap.get(getBroadcastAddressAndPort()); + if (epState == null) + throw new IllegalStateException(); + + GossipDigestAck2 digestAck2Message = new GossipDigestAck2(Collections.singletonMap(getBroadcastAddressAndPort(), epState)); + Message<GossipDigestAck2> message = Message.out(Verb.GOSSIP_DIGEST_ACK2, digestAck2Message); + MessagingService.instance().send(message, ep); + } + + public Map<String,List<String>> compareGossipAndTokenMetadata() + { + // local epstate will be part of endpointStateMap + Map<String,List<String>> mismatches = new HashMap<>(); + for (InetAddressAndPort endpoint : endpointStateMap.keySet()) + { + EndpointState ep = endpointStateMap.get(endpoint); + // check the status only for NORMAL nodes + if (ep.isNormalState()) + { + List<Token> tokensFromMetadata; + try + { + tokensFromMetadata = new ArrayList<>(StorageService.instance.getTokenMetadata().getTokens(endpoint)); + Collections.sort(tokensFromMetadata); + } + catch(AssertionError e) + { + tokensFromMetadata = Collections.EMPTY_LIST; + } + List<Token> tokensFromGossip = new ArrayList<>(StorageService.instance.getTokensFor(endpoint)); + Collections.sort(tokensFromGossip); + + if (!tokensFromMetadata.equals(tokensFromGossip)) + mismatches.put(endpoint.toString(), ImmutableList.of(tokensFromGossip.toString(), tokensFromMetadata.toString())); + } + } + return mismatches; + } } diff --cc test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java index 7cf4c167f7,0000000000..8b5c01b02b mode 100644,000000..100644 --- a/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java +++ b/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java @@@ -1,277 -1,0 +1,277 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.distributed.impl; + +import java.io.Serializable; +import java.net.InetSocketAddress; +import java.util.Collection; +import java.util.UUID; + +import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.dht.IPartitioner; +import org.apache.cassandra.dht.Token; +import org.apache.cassandra.distributed.api.IInstance; +import org.apache.cassandra.distributed.api.IInvokableInstance; +import org.apache.cassandra.distributed.api.IIsolatedExecutor; +import org.apache.cassandra.gms.ApplicationState; +import org.apache.cassandra.gms.EndpointState; +import org.apache.cassandra.gms.Gossiper; +import org.apache.cassandra.gms.VersionedValue; +import org.apache.cassandra.locator.InetAddressAndPort; +import org.apache.cassandra.net.MessagingService; +import org.apache.cassandra.service.PendingRangeCalculatorService; +import org.apache.cassandra.service.StorageService; +import org.apache.cassandra.utils.FBUtilities; + +import static com.google.common.collect.Iterables.getOnlyElement; +import static java.util.Collections.singleton; +import static org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort; +import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress; + +public class UnsafeGossipHelper +{ + public static class HostInfo implements Serializable + { + final InetSocketAddress address; + final UUID hostId; + final String tokenString; + final int messagingVersion; + final boolean isShutdown; + + private HostInfo(InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown) + { + this.address = address; + this.hostId = hostId; + this.tokenString = tokenString; + this.messagingVersion = messagingVersion; + this.isShutdown = isShutdown; + } + + private HostInfo(IInstance instance) + { + this(instance, instance.config().hostId(), instance.config().getString("initial_token")); + } + + private HostInfo(IInstance instance, UUID hostId, String tokenString) + { + this(instance.broadcastAddress(), hostId, tokenString, instance.getMessagingVersion(), instance.isShutdown()); + } + } + + public static IIsolatedExecutor.SerializableRunnable addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory, Collection<Token>, VersionedValue> statusFactory, InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown) + { + return () -> { + try + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + InetAddressAndPort addressAndPort = getByAddress(address); + Token token; + if (FBUtilities.getBroadcastAddressAndPort().equals(addressAndPort)) + { + // try grabbing saved tokens so that - if we're leaving - we get the ones we may have adopted as part of a range movement + // if that fails, grab them from config (as we're probably joining and should just use the default token) + Token.TokenFactory tokenFactory = DatabaseDescriptor.getPartitioner().getTokenFactory(); + if (tokenString == null) + { + Token tmp; + try + { + tmp = getOnlyElement(SystemKeyspace.getSavedTokens()); + } + catch (Throwable t) + { + tmp = tokenFactory.fromString(getOnlyElement(DatabaseDescriptor.getInitialTokens())); + } + token = tmp; + } + else + { + token = tokenFactory.fromString(tokenString); + } + + SystemKeyspace.setLocalHostId(hostId); + SystemKeyspace.updateTokens(singleton(token)); + } + else + { + if (tokenString == null) + throw new IllegalArgumentException(); + + token = DatabaseDescriptor.getPartitioner().getTokenFactory().fromString(tokenString); + } + + Gossiper.runInGossipStageBlocking(() -> { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(addressAndPort); + if (state == null) + { + Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1); + state = Gossiper.instance.getEndpointStateForEndpoint(addressAndPort); + Gossiper.instance.realMarkAlive(addressAndPort, state); + } + + state.addApplicationState(ApplicationState.TOKENS, new VersionedValue.VersionedValueFactory(partitioner).tokens(singleton(token))); + VersionedValue status = statusFactory.apply(new VersionedValue.VersionedValueFactory(partitioner), singleton(token)); + state.addApplicationState(ApplicationState.STATUS_WITH_PORT, status); + StorageService.instance.onChange(addressAndPort, ApplicationState.STATUS_WITH_PORT, status); + }); + + int setMessagingVersion = isShutdown + ? MessagingService.current_version + : Math.min(MessagingService.current_version, messagingVersion); + MessagingService.instance().versions.set(addressAndPort, setMessagingVersion); + + PendingRangeCalculatorService.instance.blockUntilFinished(); + } + catch (Throwable e) // UnknownHostException + { + throw new RuntimeException(e); + } + }; + } + + public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(IInstance peer) + { + return addToRingNormalRunner(new HostInfo(peer)); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(IInstance peer, UUID hostId, String tokenString) + { + return addToRingNormalRunner(new HostInfo(peer, hostId, tokenString)); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(HostInfo info) + { + return addToRingNormalRunner(info.address, info.hostId, info.tokenString, info.messagingVersion, info.isShutdown); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingNormalRunner(InetSocketAddress address, UUID hostId, String tokenString, int messagingVersion, boolean isShutdown) + { + return addToRingRunner(VersionedValue.VersionedValueFactory::normal, address, hostId, tokenString, messagingVersion, isShutdown); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory, Collection<Token>, VersionedValue> statusFactory, HostInfo info) + { + return addToRingRunner(statusFactory, info.address, info.hostId, info.tokenString, info.messagingVersion, info.isShutdown); + } + + // reset gossip state so we know of the node being alive only + public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(IInstance instance) + { + return removeFromRingRunner(new HostInfo(instance)); + } + + // reset gossip state so we know of the node being alive only + public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(HostInfo info) + { + return removeFromRingRunner(info.address, info.hostId, info.tokenString); + } + + public static IIsolatedExecutor.SerializableRunnable removeFromRingRunner(InetSocketAddress address, UUID hostId, String tokenString) + { + return () -> { + + try + { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + Token token = partitioner.getTokenFactory().fromString(tokenString); + InetAddressAndPort addressAndPort = toCassandraInetAddressAndPort(address); + + Gossiper.runInGossipStageBlocking(() -> { + StorageService.instance.onChange(addressAndPort, + ApplicationState.STATUS, + new VersionedValue.VersionedValueFactory(partitioner).left(singleton(token), 0L)); + Gossiper.instance.unsafeAnnulEndpoint(addressAndPort); + Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1); + Gossiper.instance.realMarkAlive(addressAndPort, Gossiper.instance.getEndpointStateForEndpoint(addressAndPort)); + }); + PendingRangeCalculatorService.instance.blockUntilFinished(); + } + catch (Throwable e) // UnknownHostException + { + throw new RuntimeException(e); + } + }; + } + + public static IIsolatedExecutor.SerializableRunnable addToRingBootstrappingRunner(IInstance peer) + { + return addToRingRunner(VersionedValue.VersionedValueFactory::bootstrapping, new HostInfo(peer)); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingBootstrapReplacingRunner(IInstance peer, IInvokableInstance replacing, UUID hostId, String tokenString) + { + return addToRingBootstrapReplacingRunner(peer, replacing.broadcastAddress(), hostId, tokenString); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingBootstrapReplacingRunner(IInstance peer, InetSocketAddress replacingAddress, UUID hostId, String tokenString) + { + return addToRingRunner((factory, ignore) -> factory.bootReplacingWithPort(getByAddress(replacingAddress)), new HostInfo(peer, hostId, tokenString)); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingNormalReplacedRunner(IInstance peer, IInstance replaced) + { + return addToRingNormalReplacedRunner(peer, replaced.broadcastAddress()); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingNormalReplacedRunner(IInstance peer, InetSocketAddress replacedAddress) + { + return addToRingRunner((factory, ignore) -> factory.bootReplacingWithPort(getByAddress(replacedAddress)), new HostInfo(peer, null, null)); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingLeavingRunner(IInstance peer) + { + return addToRingRunner(VersionedValue.VersionedValueFactory::leaving, new HostInfo(peer, null, null)); + } + + public static IIsolatedExecutor.SerializableRunnable addToRingLeftRunner(IInstance peer) + { + return addToRingRunner((factory, tokens) -> factory.left(tokens, Long.MAX_VALUE), new HostInfo(peer, null, null)); + } + + public static void removeFromRing(IInstance peer) + { + removeFromRingRunner(peer).run(); + } + + public static void addToRingNormal(IInstance peer) + { + addToRingNormalRunner(peer).run(); + assert StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress())); + } + + public static void addToRingBootstrapping(IInstance peer) + { + addToRingBootstrappingRunner(peer).run(); + } + + public static IIsolatedExecutor.SerializableRunnable markShutdownRunner(InetSocketAddress address) + { + return () -> { + IPartitioner partitioner = DatabaseDescriptor.getPartitioner(); + Gossiper.runInGossipStageBlocking(() -> { + EndpointState state = Gossiper.instance.getEndpointStateForEndpoint(getByAddress(address)); + VersionedValue status = new VersionedValue.VersionedValueFactory(partitioner).shutdown(true); + state.addApplicationState(ApplicationState.STATUS, status); - state.getHeartBeatState().forceHighestPossibleVersionUnsafe(); ++ state.forceHighestPossibleVersionUnsafe(); + StorageService.instance.onChange(getByAddress(address), ApplicationState.STATUS, status); + }); + }; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org For additional commands, e-mail: commits-h...@cassandra.apache.org