This is an automated email from the ASF dual-hosted git repository.

dcapwell pushed a commit to branch cassandra-4.1
in repository https://gitbox.apache.org/repos/asf/cassandra.git

commit 005d3c28e554de3b48d0acad1d2dbd5c3a6c86c9
Merge: cf528bc99c c736d22cf8
Author: David Capwell <dcapw...@apache.org>
AuthorDate: Thu May 22 14:12:42 2025 -0700

    Merge branch 'cassandra-4.0' into cassandra-4.1

 CHANGES.txt                                        |   1 +
 .../org/apache/cassandra/gms/EndpointState.java    | 104 ++++++++++++++++-----
 src/java/org/apache/cassandra/gms/Gossiper.java    |  30 +++---
 .../org/apache/cassandra/gms/HeartBeatState.java   |  20 ++--
 .../distributed/impl/UnsafeGossipHelper.java       |   2 +-
 .../org/apache/cassandra/gms/GossiperTest.java     |   2 +-
 .../apache/cassandra/gms/SerializationsTest.java   |   2 +-
 7 files changed, 105 insertions(+), 56 deletions(-)

diff --cc CHANGES.txt
index 491ed9b958,676522b9bf..0f381b8125
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@@ -1,11 -1,5 +1,12 @@@
 -4.0.18
 +4.1.9
 + * Grant permission on keyspaces system_views and system_virtual_schema not possible (CASSANDRA-20171)
 + * Fix mixed mode paxos ttl commit hang (CASSANDRA-20514)
 + * Fix paxos mixed mode infinite loop (CASSANDRA-20493)
 + * Optionally skip exception logging on invalid legacy protocol magic exception (CASSANDRA-19483)
 + * Fix SimpleClient ability to release acquired capacity (CASSANDRA-20202)
 + * Fix WaitQueue.Signal.awaitUninterruptibly may block forever if invoking thread is interrupted (CASSANDRA-20084)
 +Merged from 4.0:
+  * Gossip doesn't converge due to race condition when updating EndpointStates multiple fields (CASSANDRA-20659)
   * Handle sstable metadata stats file getting a new mtime after compaction has finished (CASSANDRA-18119)
   * Honor MAX_PARALLEL_TRANSFERS correctly (CASSANDRA-20532)
   * Updating a column with a new TTL but same expiration time is non-deterministic and causes repair mismatches. (CASSANDRA-20561)
diff --cc src/java/org/apache/cassandra/gms/EndpointState.java
index 198946fa51,6cec0cef34..17bed7f4d2
--- a/src/java/org/apache/cassandra/gms/EndpointState.java
+++ b/src/java/org/apache/cassandra/gms/EndpointState.java
@@@ -47,10 -45,20 +48,21 @@@ public class EndpointStat
      protected static final Logger logger = 
LoggerFactory.getLogger(EndpointState.class);
  
      public final static IVersionedSerializer<EndpointState> serializer = new 
EndpointStateSerializer();
 +    public final static IVersionedSerializer<EndpointState> 
nullableSerializer = NullableSerializer.wrap(serializer);
  
-     private volatile HeartBeatState hbState;
-     private final AtomicReference<Map<ApplicationState, VersionedValue>> 
applicationState;
+     private static class View
+     {
+         final HeartBeatState hbState;
+         final Map<ApplicationState, VersionedValue> applicationState;
+ 
+         private View(HeartBeatState hbState, Map<ApplicationState, 
VersionedValue> applicationState)
+         {
+             this.hbState = hbState;
+             this.applicationState = applicationState;
+         }
+     }
+ 
+     private final AtomicReference<View> ref;
  
      /* fields below do not get serialized */
      private volatile long updateTimestamp;
@@@ -63,14 -71,16 +75,16 @@@
  
      public EndpointState(EndpointState other)
      {
-         this(new HeartBeatState(other.hbState), new 
EnumMap<>(other.applicationState.get()));
+         ref = new AtomicReference<>(other.ref.get());
 -        updateTimestamp = System.nanoTime();
++        updateTimestamp = nanoTime();
+         isAlive = true;
      }
  
-     EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
+     @VisibleForTesting
+     public EndpointState(HeartBeatState initialHbState, Map<ApplicationState, 
VersionedValue> states)
      {
-         hbState = initialHbState;
-         applicationState = new AtomicReference<Map<ApplicationState, 
VersionedValue>>(new EnumMap<>(states));
+         ref = new AtomicReference<>(new View(initialHbState, new 
EnumMap<>(states)));
 -        updateTimestamp = System.nanoTime();
 +        updateTimestamp = nanoTime();
          isAlive = true;
      }
  
@@@ -258,22 -311,9 +315,23 @@@
  
      public String toString()
      {
-         return "EndpointState: HeartBeatState = " + hbState + ", AppStateMap 
= " + applicationState.get();
+         View view = ref.get();
+         return "EndpointState: HeartBeatState = " + view.hbState + ", 
AppStateMap = " + view.applicationState;
      }
 +
 +    public boolean isSupersededBy(EndpointState that)
 +    {
 +        int thisGeneration = this.getHeartBeatState().getGeneration();
 +        int thatGeneration = that.getHeartBeatState().getGeneration();
 +
 +        if (thatGeneration > thisGeneration)
 +            return true;
 +
 +        if (thisGeneration > thatGeneration)
 +            return false;
 +
 +        return Gossiper.getMaxEndpointStateVersion(that) > 
Gossiper.getMaxEndpointStateVersion(this);
 +    }
  }
  
  class EndpointStateSerializer implements IVersionedSerializer<EndpointState>
diff --cc src/java/org/apache/cassandra/gms/Gossiper.java
index 0175df0bbc,12c532b162..0dc94767a4
--- a/src/java/org/apache/cassandra/gms/Gossiper.java
+++ b/src/java/org/apache/cassandra/gms/Gossiper.java
@@@ -328,7 -310,7 +328,7 @@@ public class Gossiper implements IFailu
                  taskLock.lock();
  
                  /* Update the local heartbeat counter. */
-                 
endpointStateMap.get(getBroadcastAddressAndPort()).getHeartBeatState().updateHeartBeat();
 -                
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).updateHeartBeat();
++                
endpointStateMap.get(getBroadcastAddressAndPort()).updateHeartBeat();
                  if (logger.isTraceEnabled())
                      logger.trace("My heartbeat is now {}", 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort()).getHeartBeatState().getHeartBeatVersion());
                  final List<GossipDigest> gDigests = new ArrayList<>();
@@@ -2061,8 -1976,8 +2055,8 @@@
  
      public void forceNewerGeneration()
      {
 -        EndpointState epstate = 
endpointStateMap.get(FBUtilities.getBroadcastAddressAndPort());
 +        EndpointState epstate = 
endpointStateMap.get(getBroadcastAddressAndPort());
-         epstate.getHeartBeatState().forceNewerGenerationUnsafe();
+         epstate.forceNewerGenerationUnsafe();
      }
  
  
@@@ -2522,101 -2425,4 +2516,101 @@@
  
          return minVersion;
      }
 +
 +    public void unsafeSetEnabled()
 +    {
 +        scheduledGossipTask = new NotScheduledFuture<>();
 +        firstSynSendAt = 1;
 +    }
 +
 +    public Collection<InetAddressAndPort> unsafeClearRemoteState()
 +    {
 +        List<InetAddressAndPort> removed = new ArrayList<>();
 +        for (InetAddressAndPort ep : endpointStateMap.keySet())
 +        {
 +            if (ep.equals(getBroadcastAddressAndPort()))
 +                continue;
 +
 +            for (IEndpointStateChangeSubscriber subscriber : subscribers)
 +                subscriber.onRemove(ep);
 +
 +            removed.add(ep);
 +        }
 +        
this.endpointStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
 +        
this.endpointShadowStateMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
 +        
this.expireTimeEndpointMap.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
 +        
this.justRemovedEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
 +        
this.unreachableEndpoints.keySet().retainAll(Collections.singleton(getBroadcastAddressAndPort()));
 +        return removed;
 +    }
 +
 +    public void unsafeGossipWith(InetAddressAndPort ep)
 +    {
 +        /* Update the local heartbeat counter. */
 +        EndpointState epState = 
endpointStateMap.get(getBroadcastAddressAndPort());
 +        if (epState != null)
 +        {
-             epState.getHeartBeatState().updateHeartBeat();
++            epState.updateHeartBeat();
 +            if (logger.isTraceEnabled())
 +                logger.trace("My heartbeat is now {}", 
epState.getHeartBeatState().getHeartBeatVersion());
 +        }
 +
 +        final List<GossipDigest> gDigests = new ArrayList<GossipDigest>();
 +        Gossiper.instance.makeGossipDigest(gDigests);
 +
 +        GossipDigestSyn digestSynMessage = new 
GossipDigestSyn(getClusterName(),
 +                getPartitionerName(),
 +                gDigests);
 +        Message<GossipDigestSyn> message = Message.out(GOSSIP_DIGEST_SYN, 
digestSynMessage);
 +
 +        MessagingService.instance().send(message, ep);
 +    }
 +
 +    public void unsafeSendShutdown(InetAddressAndPort to)
 +    {
 +        Message<?> message = Message.out(Verb.GOSSIP_SHUTDOWN, noPayload);
 +        MessagingService.instance().send(message, to);
 +    }
 +
 +    public void unsafeSendLocalEndpointStateTo(InetAddressAndPort ep)
 +    {
 +        /* Update the local heartbeat counter. */
 +        EndpointState epState = 
endpointStateMap.get(getBroadcastAddressAndPort());
 +        if (epState == null)
 +            throw new IllegalStateException();
 +
 +        GossipDigestAck2 digestAck2Message = new 
GossipDigestAck2(Collections.singletonMap(getBroadcastAddressAndPort(), 
epState));
 +        Message<GossipDigestAck2> message = 
Message.out(Verb.GOSSIP_DIGEST_ACK2, digestAck2Message);
 +        MessagingService.instance().send(message, ep);
 +    }
 +
 +    public Map<String,List<String>> compareGossipAndTokenMetadata()
 +    {
 +        // local epstate will be part of endpointStateMap
 +        Map<String,List<String>> mismatches = new HashMap<>();
 +        for (InetAddressAndPort endpoint : endpointStateMap.keySet())
 +        {
 +            EndpointState ep = endpointStateMap.get(endpoint);
 +            // check the status only for NORMAL nodes
 +            if (ep.isNormalState())
 +            {
 +                List<Token> tokensFromMetadata;
 +                try
 +                {
 +                    tokensFromMetadata = new 
ArrayList<>(StorageService.instance.getTokenMetadata().getTokens(endpoint));
 +                    Collections.sort(tokensFromMetadata);
 +                }
 +                catch(AssertionError e)
 +                {
 +                    tokensFromMetadata = Collections.EMPTY_LIST;
 +                }
 +                List<Token> tokensFromGossip = new 
ArrayList<>(StorageService.instance.getTokensFor(endpoint));
 +                Collections.sort(tokensFromGossip);
 +
 +                if (!tokensFromMetadata.equals(tokensFromGossip))
 +                    mismatches.put(endpoint.toString(), 
ImmutableList.of(tokensFromGossip.toString(), tokensFromMetadata.toString()));
 +            }
 +        }
 +        return mismatches;
 +    }
  }
diff --cc 
test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java
index 7cf4c167f7,0000000000..8b5c01b02b
mode 100644,000000..100644
--- 
a/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java
+++ 
b/test/distributed/org/apache/cassandra/distributed/impl/UnsafeGossipHelper.java
@@@ -1,277 -1,0 +1,277 @@@
 +/*
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.cassandra.distributed.impl;
 +
 +import java.io.Serializable;
 +import java.net.InetSocketAddress;
 +import java.util.Collection;
 +import java.util.UUID;
 +
 +import org.apache.cassandra.config.DatabaseDescriptor;
 +import org.apache.cassandra.db.SystemKeyspace;
 +import org.apache.cassandra.dht.IPartitioner;
 +import org.apache.cassandra.dht.Token;
 +import org.apache.cassandra.distributed.api.IInstance;
 +import org.apache.cassandra.distributed.api.IInvokableInstance;
 +import org.apache.cassandra.distributed.api.IIsolatedExecutor;
 +import org.apache.cassandra.gms.ApplicationState;
 +import org.apache.cassandra.gms.EndpointState;
 +import org.apache.cassandra.gms.Gossiper;
 +import org.apache.cassandra.gms.VersionedValue;
 +import org.apache.cassandra.locator.InetAddressAndPort;
 +import org.apache.cassandra.net.MessagingService;
 +import org.apache.cassandra.service.PendingRangeCalculatorService;
 +import org.apache.cassandra.service.StorageService;
 +import org.apache.cassandra.utils.FBUtilities;
 +
 +import static com.google.common.collect.Iterables.getOnlyElement;
 +import static java.util.Collections.singleton;
 +import static 
org.apache.cassandra.distributed.impl.DistributedTestSnitch.toCassandraInetAddressAndPort;
 +import static org.apache.cassandra.locator.InetAddressAndPort.getByAddress;
 +
 +public class UnsafeGossipHelper
 +{
 +    public static class HostInfo implements Serializable
 +    {
 +        final InetSocketAddress address;
 +        final UUID hostId;
 +        final String tokenString;
 +        final int messagingVersion;
 +        final boolean isShutdown;
 +
 +        private HostInfo(InetSocketAddress address, UUID hostId, String 
tokenString, int messagingVersion, boolean isShutdown)
 +        {
 +            this.address = address;
 +            this.hostId = hostId;
 +            this.tokenString = tokenString;
 +            this.messagingVersion = messagingVersion;
 +            this.isShutdown = isShutdown;
 +        }
 +
 +        private HostInfo(IInstance instance)
 +        {
 +            this(instance, instance.config().hostId(), 
instance.config().getString("initial_token"));
 +        }
 +
 +        private HostInfo(IInstance instance, UUID hostId, String tokenString)
 +        {
 +            this(instance.broadcastAddress(), hostId, tokenString, 
instance.getMessagingVersion(), instance.isShutdown());
 +        }
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory,
 Collection<Token>, VersionedValue> statusFactory, InetSocketAddress address, 
UUID hostId, String tokenString, int messagingVersion, boolean isShutdown)
 +    {
 +        return () -> {
 +            try
 +            {
 +                IPartitioner partitioner = 
DatabaseDescriptor.getPartitioner();
 +                InetAddressAndPort addressAndPort = getByAddress(address);
 +                Token token;
 +                if 
(FBUtilities.getBroadcastAddressAndPort().equals(addressAndPort))
 +                {
 +                    // try grabbing saved tokens so that - if we're leaving - 
we get the ones we may have adopted as part of a range movement
 +                    // if that fails, grab them from config (as we're 
probably joining and should just use the default token)
 +                    Token.TokenFactory tokenFactory = 
DatabaseDescriptor.getPartitioner().getTokenFactory();
 +                    if (tokenString == null)
 +                    {
 +                        Token tmp;
 +                        try
 +                        {
 +                             tmp = 
getOnlyElement(SystemKeyspace.getSavedTokens());
 +                        }
 +                        catch (Throwable t)
 +                        {
 +                            tmp = 
tokenFactory.fromString(getOnlyElement(DatabaseDescriptor.getInitialTokens()));
 +                        }
 +                        token = tmp;
 +                    }
 +                    else
 +                    {
 +                        token = tokenFactory.fromString(tokenString);
 +                    }
 +
 +                    SystemKeyspace.setLocalHostId(hostId);
 +                    SystemKeyspace.updateTokens(singleton(token));
 +                }
 +                else
 +                {
 +                    if (tokenString == null)
 +                        throw new IllegalArgumentException();
 +
 +                    token = 
DatabaseDescriptor.getPartitioner().getTokenFactory().fromString(tokenString);
 +                }
 +
 +                Gossiper.runInGossipStageBlocking(() -> {
 +                    EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(addressAndPort);
 +                    if (state == null)
 +                    {
 +                        
Gossiper.instance.initializeNodeUnsafe(addressAndPort, hostId, 1);
 +                        state = 
Gossiper.instance.getEndpointStateForEndpoint(addressAndPort);
 +                        Gossiper.instance.realMarkAlive(addressAndPort, 
state);
 +                    }
 +
 +                    state.addApplicationState(ApplicationState.TOKENS, new 
VersionedValue.VersionedValueFactory(partitioner).tokens(singleton(token)));
 +                    VersionedValue status = statusFactory.apply(new 
VersionedValue.VersionedValueFactory(partitioner), singleton(token));
 +                    
state.addApplicationState(ApplicationState.STATUS_WITH_PORT, status);
 +                    StorageService.instance.onChange(addressAndPort, 
ApplicationState.STATUS_WITH_PORT, status);
 +                });
 +
 +                int setMessagingVersion = isShutdown
 +                                          ? MessagingService.current_version
 +                                          : 
Math.min(MessagingService.current_version, messagingVersion);
 +                MessagingService.instance().versions.set(addressAndPort, 
setMessagingVersion);
 +
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +            }
 +            catch (Throwable e) // UnknownHostException
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        };
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingNormalRunner(IInstance peer)
 +    {
 +        return addToRingNormalRunner(new HostInfo(peer));
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingNormalRunner(IInstance peer, UUID hostId, String tokenString)
 +    {
 +        return addToRingNormalRunner(new HostInfo(peer, hostId, tokenString));
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingNormalRunner(HostInfo info)
 +    {
 +        return addToRingNormalRunner(info.address, info.hostId, 
info.tokenString, info.messagingVersion, info.isShutdown);
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingNormalRunner(InetSocketAddress address, UUID hostId, String 
tokenString, int messagingVersion, boolean isShutdown)
 +    {
 +        return addToRingRunner(VersionedValue.VersionedValueFactory::normal, 
address, hostId, tokenString, messagingVersion, isShutdown);
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingRunner(IIsolatedExecutor.SerializableBiFunction<VersionedValue.VersionedValueFactory,
 Collection<Token>, VersionedValue> statusFactory, HostInfo info)
 +    {
 +        return addToRingRunner(statusFactory, info.address, info.hostId, 
info.tokenString, info.messagingVersion, info.isShutdown);
 +    }
 +
 +    // reset gossip state so we know of the node being alive only
 +    public static IIsolatedExecutor.SerializableRunnable 
removeFromRingRunner(IInstance instance)
 +    {
 +        return removeFromRingRunner(new HostInfo(instance));
 +    }
 +
 +    // reset gossip state so we know of the node being alive only
 +    public static IIsolatedExecutor.SerializableRunnable 
removeFromRingRunner(HostInfo info)
 +    {
 +        return removeFromRingRunner(info.address, info.hostId, 
info.tokenString);
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
removeFromRingRunner(InetSocketAddress address, UUID hostId, String tokenString)
 +    {
 +        return () -> {
 +
 +            try
 +            {
 +                IPartitioner partitioner = 
DatabaseDescriptor.getPartitioner();
 +                Token token = 
partitioner.getTokenFactory().fromString(tokenString);
 +                InetAddressAndPort addressAndPort = 
toCassandraInetAddressAndPort(address);
 +
 +                Gossiper.runInGossipStageBlocking(() -> {
 +                    StorageService.instance.onChange(addressAndPort,
 +                                                     ApplicationState.STATUS,
 +                                                     new 
VersionedValue.VersionedValueFactory(partitioner).left(singleton(token), 0L));
 +                    Gossiper.instance.unsafeAnnulEndpoint(addressAndPort);
 +                    Gossiper.instance.initializeNodeUnsafe(addressAndPort, 
hostId, 1);
 +                    Gossiper.instance.realMarkAlive(addressAndPort, 
Gossiper.instance.getEndpointStateForEndpoint(addressAndPort));
 +                });
 +                PendingRangeCalculatorService.instance.blockUntilFinished();
 +            }
 +            catch (Throwable e) // UnknownHostException
 +            {
 +                throw new RuntimeException(e);
 +            }
 +        };
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingBootstrappingRunner(IInstance peer)
 +    {
 +        return 
addToRingRunner(VersionedValue.VersionedValueFactory::bootstrapping, new 
HostInfo(peer));
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingBootstrapReplacingRunner(IInstance peer, IInvokableInstance replacing, 
UUID hostId, String tokenString)
 +    {
 +        return addToRingBootstrapReplacingRunner(peer, 
replacing.broadcastAddress(), hostId, tokenString);
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingBootstrapReplacingRunner(IInstance peer, InetSocketAddress 
replacingAddress, UUID hostId, String tokenString)
 +    {
 +        return addToRingRunner((factory, ignore) -> 
factory.bootReplacingWithPort(getByAddress(replacingAddress)), new 
HostInfo(peer, hostId, tokenString));
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingNormalReplacedRunner(IInstance peer, IInstance replaced)
 +    {
 +        return addToRingNormalReplacedRunner(peer, 
replaced.broadcastAddress());
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingNormalReplacedRunner(IInstance peer, InetSocketAddress replacedAddress)
 +    {
 +        return addToRingRunner((factory, ignore) -> 
factory.bootReplacingWithPort(getByAddress(replacedAddress)), new 
HostInfo(peer, null, null));
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingLeavingRunner(IInstance peer)
 +    {
 +        return addToRingRunner(VersionedValue.VersionedValueFactory::leaving, 
new HostInfo(peer, null, null));
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
addToRingLeftRunner(IInstance peer)
 +    {
 +        return addToRingRunner((factory, tokens) -> factory.left(tokens, 
Long.MAX_VALUE), new HostInfo(peer, null, null));
 +    }
 +
 +    public static void removeFromRing(IInstance peer)
 +    {
 +        removeFromRingRunner(peer).run();
 +    }
 +
 +    public static void addToRingNormal(IInstance peer)
 +    {
 +        addToRingNormalRunner(peer).run();
 +        assert 
StorageService.instance.getTokenMetadata().isMember(toCassandraInetAddressAndPort(peer.broadcastAddress()));
 +    }
 +
 +    public static void addToRingBootstrapping(IInstance peer)
 +    {
 +        addToRingBootstrappingRunner(peer).run();
 +    }
 +
 +    public static IIsolatedExecutor.SerializableRunnable 
markShutdownRunner(InetSocketAddress address)
 +    {
 +        return () -> {
 +            IPartitioner partitioner = DatabaseDescriptor.getPartitioner();
 +            Gossiper.runInGossipStageBlocking(() -> {
 +                EndpointState state = 
Gossiper.instance.getEndpointStateForEndpoint(getByAddress(address));
 +                VersionedValue status = new 
VersionedValue.VersionedValueFactory(partitioner).shutdown(true);
 +                state.addApplicationState(ApplicationState.STATUS, status);
-                 state.getHeartBeatState().forceHighestPossibleVersionUnsafe();
++                state.forceHighestPossibleVersionUnsafe();
 +                StorageService.instance.onChange(getByAddress(address), 
ApplicationState.STATUS, status);
 +            });
 +        };
 +    }
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@cassandra.apache.org
For additional commands, e-mail: commits-h...@cassandra.apache.org

Reply via email to