beobal commented on code in PR #4688:
URL: https://github.com/apache/cassandra/pull/4688#discussion_r3111294190


##########
src/java/org/apache/cassandra/tcm/listeners/LegacyStateListener.java:
##########
@@ -75,52 +77,96 @@ public void notifyPostCommit(ClusterMetadata prev, 
ClusterMetadata next, boolean
                 changed.add(node);
         }
 
-        for (InetAddressAndPort remove : removedAddr)
+        // next.myNodeId() can be null during replay (before we have 
registered) but if it is present and
+        // there is a relevant change to the state of the local node, process 
that synchronously.
+        if (next.myNodeId() != null && changed.contains(next.myNodeId()))
         {
-            GossipHelper.removeFromGossip(remove);
-            GossipHelper.evictFromMembership(remove);
-            PeersTable.removeFromSystemPeersTables(remove);
+            // Default is to process updates for the local node synchronously, 
overridable via config/hotprop
+            if (DatabaseDescriptor.getLegacyStateListenerSyncLocalUpdates())
+                processChangesToLocalState(prev, next, next.myNodeId());
+            else
+                ScheduledExecutors.optionalTasks.submit(() -> 
processChangesToLocalState(prev, next, next.myNodeId()));
+
+            changed.remove(next.myNodeId());
         }
 
-        for (NodeId change : changed)
+        // Schedule async processing of changes to peers and removing 
unregistered nodes (potentially including the
+        // local node).
+        ScheduledExecutors.optionalTasks.submit(() -> {
+            processRemovedNodes(removedAddr);
+            processChangesToRemotePeers(prev, next, changed);
+        });
+    }
+
+    private void processChangesToLocalState(ClusterMetadata prev, 
ClusterMetadata next, NodeId localId)
+    {
+        logger.info("Processing changes to local node state {} for epoch 
{}->{}", localId, prev.epoch.getEpoch(), next.epoch.getEpoch());
+        Collection<Token> tokensForGossip = next.tokenMap.tokens(localId);
+        NodeState state = next.directory.peerState(localId);
+        switch (state)
         {
-            // next.myNodeId() can be null during replay (before we have 
registered)
-            if (next.myNodeId() != null && next.myNodeId().equals(change))
-            {
-                switch (next.directory.peerState(change))
+            case BOOTSTRAPPING:
+            case BOOT_REPLACING:
+                // For compatibility with clients, ensure we set TOKENS for 
bootstrapping nodes in gossip.
+                // As these are not yet added to the token map they must be 
extracted from the in progress sequence.
+                tokensForGossip = GossipHelper.getTokensFromOperation(localId, 
next);
+                if (state == BOOTSTRAPPING && 
prev.directory.peerState(localId) != BOOTSTRAPPING)
                 {
-                    case BOOTSTRAPPING:
-                        if (prev.directory.peerState(change) != BOOTSTRAPPING)
-                        {
-                            // legacy log messages for tests
-                            logger.info("JOINING: Starting to bootstrap");
-                            logger.info("JOINING: calculation complete, ready 
to bootstrap");
-                        }
-                        break;
-                    case BOOT_REPLACING:
-                    case REGISTERED:
-                        break;
-                    case JOINED:
-                        
SystemKeyspace.updateTokens(next.directory.endpoint(change), 
next.tokenMap.tokens(change));
-                        // needed if we miss the REGISTERED above; Does 
nothing if we are already in epStateMap:
-                        
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
-                        
StreamSupport.stream(ColumnFamilyStore.all().spliterator(), false)
-                                     .filter(cfs -> 
Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))
-                                     .forEach(cfs -> 
cfs.indexManager.executePreJoinTasksBlocking(true));
-                        if (prev.directory.peerState(change) == MOVING)
-                            logger.info("Node {} state jump to NORMAL", 
next.directory.endpoint(change));
-                        break;
+                    // legacy log messages for tests
+                    logger.info("JOINING: Starting to bootstrap");
+                    logger.info("JOINING: calculation complete, ready to 
bootstrap");
                 }
-                // Maybe intitialise local epstate whatever the node state 
because we could be processing after a
-                // replay and so may have not seen any previous local states, 
making this the first mutation of gossip
-                // state for the local node.
-                
Gossiper.instance.maybeInitializeLocalState(SystemKeyspace.incrementAndGetGeneration());
-                Gossiper.instance.addLocalApplicationState(SCHEMA, 
StorageService.instance.valueFactory.schema(next.schema.getVersion()));
-                // if the local node's location has changed, update 
system.local.
-                if 
(!next.directory.location(change).equals(prev.directory.location(change)))
-                    
SystemKeyspace.updateLocation(next.directory.location(change));
-            }
+                break;
+            case JOINED:
+                tokensForGossip = next.tokenMap.tokens(localId);
+                SystemKeyspace.updateTokens(next.directory.endpoint(localId), 
tokensForGossip);
+                StreamSupport.stream(ColumnFamilyStore.all().spliterator(), 
false)
+                             .filter(cfs -> 
Schema.instance.getUserKeyspaces().names().contains(cfs.keyspace.getName()))

Review Comment:
   It does, but that also offloads to a separate async executor. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to