This is an automated email from the ASF dual-hosted git repository.

hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 43b567c  sync cluster info to nodes concurrently (#320)
43b567c is described below

commit 43b567cdc24782762227953e51cb52bc941ed8bf
Author: Raphael <[email protected]>
AuthorDate: Sun Jun 29 20:55:55 2025 +0800

    sync cluster info to nodes concurrently (#320)
    
    Co-authored-by: hulk <[email protected]>
---
 controller/cluster.go | 19 +++++++++++++++----
 store/store.go        |  3 +++
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/controller/cluster.go b/controller/cluster.go
index 2cb08b5..15c9184 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -182,12 +182,23 @@ func (c *ClusterChecker) syncClusterToNodes(ctx 
context.Context) error {
        if err != nil {
                return err
        }
+       version := clusterInfo.Version.Load()
        for _, shard := range clusterInfo.Shards {
                for _, node := range shard.Nodes {
-                       // sync the clusterName to the latest version
-                       if err := node.SyncClusterInfo(ctx, clusterInfo); err 
!= nil {
-                               return err
-                       }
+                       go func(n store.Node) {
+                               log := logger.Get().With(
+                                       zap.String("namespace", c.namespace),
+                                       zap.String("cluster", c.clusterName),
+                                       zap.Int64("version", version),
+                                       zap.String("node_id", n.ID()),
+                                       zap.String("addr", n.Addr()))
+                               // sync the clusterName to the latest version
+                               if err := n.SyncClusterInfo(ctx, clusterInfo); 
err != nil {
+                                       log.Error("Failed to sync the cluster 
topology to the node", zap.Error(err))
+                               } else {
+                                       log.Info("Succeed to sync the cluster 
topology to the node")
+                               }
+                       }(node)
                }
        }
        return nil
diff --git a/store/store.go b/store/store.go
index bb0edfc..fc319fc 100644
--- a/store/store.go
+++ b/store/store.go
@@ -24,6 +24,8 @@ import (
        "context"
        "encoding/json"
        "fmt"
+       "github.com/apache/kvrocks-controller/logger"
+       "go.uber.org/zap"
        "sync"
 
        "github.com/apache/kvrocks-controller/consts"
@@ -192,6 +194,7 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, 
ns string, clusterInfo
        if err := s.e.Set(ctx, buildClusterKey(ns, clusterInfo.Name), 
clusterBytes); err != nil {
                return err
        }
+       logger.Get().With(zap.String("cluster_info", 
string(clusterBytes))).Info("Updated the cluster version")
 
        s.EmitEvent(EventPayload{
                Namespace: ns,

Reply via email to