This is an automated email from the ASF dual-hosted git repository.
hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new 43b567c sync cluster info to nodes concurrently (#320)
43b567c is described below
commit 43b567cdc24782762227953e51cb52bc941ed8bf
Author: Raphael <[email protected]>
AuthorDate: Sun Jun 29 20:55:55 2025 +0800
sync cluster info to nodes concurrently (#320)
Co-authored-by: hulk <[email protected]>
---
controller/cluster.go | 19 +++++++++++++++----
store/store.go | 3 +++
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/controller/cluster.go b/controller/cluster.go
index 2cb08b5..15c9184 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -182,12 +182,23 @@ func (c *ClusterChecker) syncClusterToNodes(ctx context.Context) error {
if err != nil {
return err
}
+ version := clusterInfo.Version.Load()
for _, shard := range clusterInfo.Shards {
for _, node := range shard.Nodes {
- // sync the clusterName to the latest version
- if err := node.SyncClusterInfo(ctx, clusterInfo); err != nil {
- return err
- }
+ go func(n store.Node) {
+ log := logger.Get().With(
+ zap.String("namespace", c.namespace),
+ zap.String("cluster", c.clusterName),
+ zap.Int64("version", version),
+ zap.String("node_id", n.ID()),
+ zap.String("addr", n.Addr()))
+ // sync the clusterName to the latest version
+ if err := n.SyncClusterInfo(ctx, clusterInfo); err != nil {
+ log.Error("Failed to sync the cluster topology to the node", zap.Error(err))
+ } else {
+ log.Info("Succeed to sync the cluster topology to the node")
+ }
+ }(node)
}
}
return nil
diff --git a/store/store.go b/store/store.go
index bb0edfc..fc319fc 100644
--- a/store/store.go
+++ b/store/store.go
@@ -24,6 +24,8 @@ import (
"context"
"encoding/json"
"fmt"
+ "github.com/apache/kvrocks-controller/logger"
+ "go.uber.org/zap"
"sync"
"github.com/apache/kvrocks-controller/consts"
@@ -192,6 +194,7 @@ func (s *ClusterStore) UpdateCluster(ctx context.Context, ns string, clusterInfo
if err := s.e.Set(ctx, buildClusterKey(ns, clusterInfo.Name), clusterBytes); err != nil {
return err
}
+ logger.Get().With(zap.String("cluster_info", string(clusterBytes))).Info("Updated the cluster version")
s.EmitEvent(EventPayload{
Namespace: ns,