This is an automated email from the ASF dual-hosted git repository.

git-hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git


The following commit(s) were added to refs/heads/unstable by this push:
     new ae16132  Implement node status synchronization in Controller, including downtime notifications and online/offline management (#388)
ae16132 is described below

commit ae16132f082ab34a50ac02bddb73611751ac0112
Author: Ruifeng Guo <[email protected]>
AuthorDate: Fri Apr 24 16:42:22 2026 +0800

    Implement node status synchronization in Controller, including downtime notifications and online/offline management (#388)
---
 config/config.go            |  4 +++
 consts/errors.go            |  1 +
 controller/cluster.go       | 45 ++++++++++++++++++-----
 controller/cluster_test.go  | 78 ++++++++++++++++++++++++++++++++++++++--
 controller/controller.go    |  3 +-
 server/api/node.go          | 37 +++++++++++++++++++
 server/api/node_test.go     | 49 +++++++++++++++++++++++++
 server/helper/helper.go     |  2 ++
 server/route.go             |  1 +
 store/cluster.go            | 75 +++++++++++++++++++++++++++++++++++---
 store/cluster_node.go       | 60 ++++++++++++++++++++++++++++---
 store/cluster_shard.go      |  6 +++-
 store/cluster_shard_test.go | 36 +++++++++++++++++++
 store/cluster_test.go       | 87 +++++++++++++++++++++++++++++++++++++++++++++
 14 files changed, 462 insertions(+), 22 deletions(-)

diff --git a/config/config.go b/config/config.go
index 69558fb..d88862a 100644
--- a/config/config.go
+++ b/config/config.go
@@ -43,6 +43,10 @@ type AdminConfig struct {
 type FailOverConfig struct {
        PingIntervalSeconds int   `yaml:"ping_interval_seconds"`
        MaxPingCount        int64 `yaml:"max_ping_count"`
+       // EnableSlaveHAUpdate controls whether HA logic marks failed slave nodes and
+       // propagates the updated topology. Requires kvrocks to support node status
+       // modification (new versions only). Defaults to false for backward compatibility.
+       EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"`
 }
 
 type ControllerConfig struct {
diff --git a/consts/errors.go b/consts/errors.go
index bf7e51d..4d3f1bc 100644
--- a/consts/errors.go
+++ b/consts/errors.go
@@ -38,5 +38,6 @@ var (
        ErrShardIsServicing                 = errors.New("shard is servicing")
        ErrShardSlotIsMigrating             = errors.New("shard slot is 
migrating")
        ErrShardNoMatchNewMaster            = errors.New("no match new master 
in shard")
+       ErrCannotOfflineMaster              = errors.New("cannot take master 
node offline, failover first")
        ErrSlotStartAndStopEqual            = errors.New("start and stop of a 
range cannot be equal")
 )
diff --git a/controller/cluster.go b/controller/cluster.go
index 8d4e70d..22242a9 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -38,8 +38,9 @@ var (
 )
 
 type ClusterCheckOptions struct {
-       pingInterval    time.Duration
-       maxFailureCount int64
+       pingInterval        time.Duration
+       maxFailureCount     int64
+       enableSlaveHAUpdate bool
 }
 
 type ClusterChecker struct {
@@ -104,6 +105,11 @@ func (c *ClusterChecker) WithMaxFailureCount(count int64) 
*ClusterChecker {
        return c
 }
 
+func (c *ClusterChecker) WithSlaveHAUpdate(enable bool) *ClusterChecker {
+       c.options.enableSlaveHAUpdate = enable
+       return c
+}
+
 func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node) 
(int64, error) {
        clusterInfo, err := node.GetClusterInfo(ctx)
        if err != nil {
@@ -132,17 +138,37 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex 
int, node store.Node) i
        count := c.failureCounts[id]
        c.failureMu.Unlock()
 
-       // don't add the node into the failover candidates if it's not a master 
node
        if !node.IsMaster() {
+               if c.options.enableSlaveHAUpdate && count >= 
c.options.maxFailureCount && !node.Failed() {
+                       log := logger.Get().With(
+                               zap.String("cluster_name", c.clusterName),
+                               zap.String("id", node.ID()),
+                               zap.String("addr", node.Addr()),
+                               zap.Int64("failure_count", count))
+                       cluster, err := c.clusterStore.GetCluster(c.ctx, 
c.namespace, c.clusterName)
+                       if err != nil {
+                               log.Error("Failed to get the cluster info", 
zap.Error(err))
+                               return count
+                       }
+                       if err := cluster.SetNodeStatusByID(node.ID(), 
store.NodeStatusFailed); err != nil {
+                               log.Error("Failed to set slave node as failed", 
zap.Error(err))
+                               return count
+                       }
+                       if err := c.clusterStore.UpdateCluster(c.ctx, 
c.namespace, cluster); err != nil {
+                               log.Error("Failed to update the cluster", 
zap.Error(err))
+                               return count
+                       }
+                       log.Info("Marked slave node as failed due to probe 
failures")
+               }
                return count
        }
 
-       log := logger.Get().With(
-               zap.String("cluster_name", c.clusterName),
-               zap.String("id", node.ID()),
-               zap.Bool("is_master", node.IsMaster()),
-               zap.String("addr", node.Addr()))
        if count%c.options.maxFailureCount == 0 || count > 
c.options.maxFailureCount {
+               log := logger.Get().With(
+                       zap.String("cluster_name", c.clusterName),
+                       zap.String("id", node.ID()),
+                       zap.Bool("is_master", node.IsMaster()),
+                       zap.String("addr", node.Addr()))
                cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace, 
c.clusterName)
                if err != nil {
                        log.Error("Failed to get the cluster info", 
zap.Error(err))
@@ -188,6 +214,9 @@ func (c *ClusterChecker) syncClusterToNodes(ctx 
context.Context) error {
        version := clusterInfo.Version.Load()
        for _, shard := range clusterInfo.Shards {
                for _, node := range shard.Nodes {
+                       if node.Failed() {
+                               continue
+                       }
                        go func(n store.Node) {
                                log := logger.Get().With(
                                        zap.String("namespace", c.namespace),
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index a1a720a..a377444 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -122,8 +122,9 @@ func TestCluster_FailureCount(t *testing.T) {
                namespace:    ns,
                clusterName:  clusterName,
                options: ClusterCheckOptions{
-                       pingInterval:    time.Second,
-                       maxFailureCount: 3,
+                       pingInterval:        time.Second,
+                       maxFailureCount:     3,
+                       enableSlaveHAUpdate: true,
                },
                failureCounts: make(map[string]int64),
                syncCh:        make(chan struct{}, 1),
@@ -144,14 +145,85 @@ func TestCluster_FailureCount(t *testing.T) {
        require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()])
        require.True(t, mockNode2.IsMaster())
 
-       // it will be always increase the failure count until the node is back 
again.
+       // Slave failure count keeps increasing; at threshold the slave is 
auto-marked as failed.
        for i := int64(0); i < cluster.options.maxFailureCount*2; i++ {
                require.EqualValues(t, i+1, cluster.increaseFailureCount(0, 
mockNode3))
        }
+       require.True(t, mockNode3.Failed())
+       require.EqualValues(t, 3, clusterInfo.Version.Load())
        cluster.resetFailureCount(mockNode3.ID())
        require.EqualValues(t, 0, cluster.failureCounts[mockNode3.ID()])
 }
 
+func TestCluster_SlaveFailureAutoOffline(t *testing.T) {
+       ctx := context.Background()
+       ns := "test-ns"
+       clusterName := "test-slave-offline"
+
+       s := NewMockClusterStore()
+       mockMaster := store.NewClusterMockNode()
+       mockMaster.SetRole(store.RoleMaster)
+       mockMaster.Sequence = 100
+
+       mockSlave1 := store.NewClusterMockNode()
+       mockSlave1.SetRole(store.RoleSlave)
+       mockSlave1.Sequence = 90
+
+       mockSlave2 := store.NewClusterMockNode()
+       mockSlave2.SetRole(store.RoleSlave)
+       mockSlave2.Sequence = 80
+
+       clusterInfo := &store.Cluster{
+               Name: clusterName,
+               Shards: []*store.Shard{{
+                       Nodes:            []store.Node{mockMaster, mockSlave1, 
mockSlave2},
+                       SlotRanges:       []store.SlotRange{{Start: 0, Stop: 
16383}},
+                       MigratingSlot:    &store.MigratingSlot{IsMigrating: 
false},
+                       TargetShardIndex: -1,
+               }},
+       }
+       clusterInfo.Version.Store(1)
+       require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))
+
+       checker := &ClusterChecker{
+               clusterStore: s,
+               namespace:    ns,
+               clusterName:  clusterName,
+               options: ClusterCheckOptions{
+                       pingInterval:        time.Second,
+                       maxFailureCount:     3,
+                       enableSlaveHAUpdate: true,
+               },
+               failureCounts: make(map[string]int64),
+               syncCh:        make(chan struct{}, 1),
+       }
+
+       // Slave should not be marked as failed before reaching threshold
+       require.False(t, mockSlave1.Failed())
+       for i := int64(0); i < checker.options.maxFailureCount-1; i++ {
+               checker.increaseFailureCount(0, mockSlave1)
+       }
+       require.False(t, mockSlave1.Failed())
+       require.EqualValues(t, 1, clusterInfo.Version.Load())
+
+       // Slave should be marked as failed when reaching threshold
+       checker.increaseFailureCount(0, mockSlave1)
+       require.True(t, mockSlave1.Failed())
+       require.EqualValues(t, 2, clusterInfo.Version.Load())
+
+       // Subsequent failures should not trigger another update (already failed)
+       checker.increaseFailureCount(0, mockSlave1)
+       require.True(t, mockSlave1.Failed())
+       require.EqualValues(t, 2, clusterInfo.Version.Load())
+
+       // Other slaves are not affected
+       require.False(t, mockSlave2.Failed())
+
+       // Master should not be affected by slave offline logic
+       require.True(t, mockMaster.IsMaster())
+       require.False(t, mockMaster.Failed())
+}
+
 func TestCluster_LoadAndProbe(t *testing.T) {
        ctx := context.Background()
        ns := "test-ns"
diff --git a/controller/controller.go b/controller/controller.go
index 806c34a..5d2c446 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -189,7 +189,8 @@ func (c *Controller) addCluster(namespace, clusterName 
string) {
 
        cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
                
WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) * 
time.Second).
-               WithMaxFailureCount(c.config.FailOver.MaxPingCount)
+               WithMaxFailureCount(c.config.FailOver.MaxPingCount).
+               WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate)
        cluster.Start()
 
        c.mu.Lock()
diff --git a/server/api/node.go b/server/api/node.go
index 4407108..cf60ec4 100644
--- a/server/api/node.go
+++ b/server/api/node.go
@@ -23,7 +23,10 @@ package api
 import (
        "strconv"
 
+       "go.uber.org/zap"
+
        "github.com/apache/kvrocks-controller/consts"
+       "github.com/apache/kvrocks-controller/logger"
        "github.com/apache/kvrocks-controller/server/helper"
        "github.com/gin-gonic/gin"
 
@@ -82,3 +85,37 @@ func (handler *NodeHandler) Remove(c *gin.Context) {
        }
        helper.ResponseNoContent(c)
 }
+
+func (handler *NodeHandler) SetStatus(c *gin.Context) {
+       ns := c.Param("namespace")
+       cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+
+       var req struct {
+               Addrs  []string `json:"addrs" binding:"required"`
+               Online bool     `json:"online"`
+       }
+       if err := c.ShouldBindJSON(&req); err != nil {
+               helper.ResponseBadRequest(c, err)
+               return
+       }
+
+       var err error
+       if req.Online {
+               err = cluster.SetNodesOnline(req.Addrs)
+       } else {
+               err = cluster.SetNodesOffline(req.Addrs)
+       }
+       if err != nil {
+               helper.ResponseError(c, err)
+               return
+       }
+
+       if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
+               helper.ResponseError(c, err)
+               return
+       }
+       if err := cluster.SyncToNodes(c); err != nil {
+               logger.Get().With(zap.Error(err)).Warn("Failed to sync cluster 
info to nodes after status change")
+       }
+       helper.ResponseOK(c, nil)
+}
diff --git a/server/api/node_test.go b/server/api/node_test.go
index 8629185..a93e0ca 100644
--- a/server/api/node_test.go
+++ b/server/api/node_test.go
@@ -124,3 +124,52 @@ func TestNodeBasics(t *testing.T) {
                runRemove(t, cluster.Shards[0].Nodes[1].ID(), 
http.StatusNoContent)
        })
 }
+
+func TestNodeSetStatus(t *testing.T) {
+       ns := "test-ns"
+       cluster, err := store.NewCluster("test-cluster", 
[]string{"127.0.0.1:1234", "127.0.0.1:1235"}, 2)
+       require.NoError(t, err)
+
+       handler := &NodeHandler{s: store.NewClusterStore(engine.NewMock())}
+       require.NoError(t, handler.s.CreateCluster(context.Background(), ns, 
cluster))
+
+       slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+       masterAddr := cluster.Shards[0].Nodes[0].Addr()
+
+       runSetStatus := func(t *testing.T, addrs []string, online bool, 
expectedCode int) {
+               var req struct {
+                       Addrs  []string `json:"addrs"`
+                       Online bool     `json:"online"`
+               }
+               req.Addrs = addrs
+               req.Online = online
+
+               recorder := httptest.NewRecorder()
+               ctx := GetTestContext(recorder)
+               body, err := json.Marshal(req)
+               require.NoError(t, err)
+
+               ctx.Set(consts.ContextKeyStore, handler.s)
+               ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
+               ctx.Params = []gin.Param{
+                       {Key: "namespace", Value: ns},
+                       {Key: "cluster", Value: cluster.Name},
+               }
+               middleware.RequiredCluster(ctx)
+               require.Equal(t, http.StatusOK, recorder.Code)
+               handler.SetStatus(ctx)
+               require.Equal(t, expectedCode, recorder.Code)
+       }
+
+       t.Run("offline slave", func(t *testing.T) {
+               runSetStatus(t, []string{slaveAddr}, false, http.StatusOK)
+       })
+
+       t.Run("online slave", func(t *testing.T) {
+               runSetStatus(t, []string{slaveAddr}, true, http.StatusOK)
+       })
+
+       t.Run("offline master rejected", func(t *testing.T) {
+               runSetStatus(t, []string{masterAddr}, false, 
http.StatusBadRequest)
+       })
+}
diff --git a/server/helper/helper.go b/server/helper/helper.go
index 896a2a4..88550c0 100644
--- a/server/helper/helper.go
+++ b/server/helper/helper.go
@@ -77,6 +77,8 @@ func ResponseError(c *gin.Context, err error) {
                code = http.StatusForbidden
        } else if errors.Is(err, consts.ErrInvalidArgument) {
                code = http.StatusBadRequest
+       } else if errors.Is(err, consts.ErrCannotOfflineMaster) {
+               code = http.StatusBadRequest
        }
        c.JSON(code, Response{
                Error: &Error{Message: err.Error()},
diff --git a/server/route.go b/server/route.go
index 109f8dd..b5eb94a 100644
--- a/server/route.go
+++ b/server/route.go
@@ -70,6 +70,7 @@ func (srv *Server) initHandlers() {
                        clusters.GET("/:cluster", middleware.RequiredCluster, 
handler.Cluster.Get)
                        clusters.DELETE("/:cluster", 
middleware.RequiredCluster, handler.Cluster.Remove)
                        clusters.POST("/:cluster/migrate", 
handler.Cluster.MigrateSlot)
+                       clusters.POST("/:cluster/nodes/status", 
middleware.RequiredCluster, handler.Node.SetStatus)
                }
 
                shards := clusters.Group("/:cluster/shards")
diff --git a/store/cluster.go b/store/cluster.go
index e33e41a..00c0ee3 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -157,6 +157,64 @@ func (cluster *Cluster) SyncToNodes(ctx context.Context) 
error {
        return nil
 }
 
+func (cluster *Cluster) findNodeByAddr(addr string) Node {
+       for _, shard := range cluster.Shards {
+               for _, node := range shard.Nodes {
+                       if node.Addr() == addr {
+                               return node
+                       }
+               }
+       }
+       return nil
+}
+
+func (cluster *Cluster) SetNodeStatusByID(nodeID string, status NodeStatus) 
error {
+       for _, shard := range cluster.Shards {
+               for _, node := range shard.Nodes {
+                       if node.ID() == nodeID {
+                               node.SetStatus(status)
+                               return nil
+                       }
+               }
+       }
+       return fmt.Errorf("node %s: %w", nodeID, consts.ErrNotFound)
+}
+
+// setNodesStatus finds nodes by address, applies an optional per-node validation,
+// and only updates status when all nodes pass — ensuring all-or-nothing semantics.
+func (cluster *Cluster) setNodesStatus(addrs []string, status NodeStatus, 
validate func(Node) error) error {
+       nodes := make([]Node, 0, len(addrs))
+       for _, addr := range addrs {
+               node := cluster.findNodeByAddr(addr)
+               if node == nil {
+                       return fmt.Errorf("node %s: %w", addr, 
consts.ErrNotFound)
+               }
+               if validate != nil {
+                       if err := validate(node); err != nil {
+                               return err
+                       }
+               }
+               nodes = append(nodes, node)
+       }
+       for _, node := range nodes {
+               node.SetStatus(status)
+       }
+       return nil
+}
+
+func (cluster *Cluster) SetNodesOffline(addrs []string) error {
+       return cluster.setNodesStatus(addrs, NodeStatusFailed, func(node Node) 
error {
+               if node.IsMaster() {
+                       return fmt.Errorf("node %s: %w", node.Addr(), 
consts.ErrCannotOfflineMaster)
+               }
+               return nil
+       })
+}
+
+func (cluster *Cluster) SetNodesOnline(addrs []string) error {
+       return cluster.setNodesStatus(addrs, NodeStatusNormal, nil)
+}
+
 func (cluster *Cluster) GetNodes() []Node {
        nodes := make([]Node, 0)
        for i := 0; i < len(cluster.Shards); i++ {
@@ -273,10 +331,19 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
                        addr: strings.Split(fields[1], "@")[0],
                }
 
-               if strings.Contains(fields[2], ",") {
-                       node.role = strings.Split(fields[2], ",")[1]
-               } else {
-                       node.role = fields[2]
+               // Parse comma-separated flags (e.g. "slave,fail", "myself,master")
+               // to extract role and failed state.
+               roleFlags := strings.Split(fields[2], ",")
+               for _, flag := range roleFlags {
+                       switch flag {
+                       case RoleMaster:
+                               node.role = RoleMaster
+                       case RoleSlave:
+                               node.role = RoleSlave
+                       case "fail":
+                               node.status = NodeStatusFailed
+                       }
+                       // ignore: myself, pfail, handshake, noaddr, nofailover, noflags
                }
 
                var err error
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 8fcd106..d0b2523 100755
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -42,6 +42,21 @@ const (
        NodeIDLen = 40
 )
 
+type NodeStatus string
+
+const (
+       NodeStatusNormal NodeStatus = "normal"
+       NodeStatusFailed NodeStatus = "failed"
+)
+
+func (s NodeStatus) IsValid() bool {
+       switch s {
+       case NodeStatusNormal, NodeStatusFailed:
+               return true
+       }
+       return false
+}
+
 const (
        dialTimeout  = 3200 * time.Millisecond
        readTimeout  = 3 * time.Second
@@ -59,9 +74,12 @@ type Node interface {
        Password() string
        Addr() string
        IsMaster() bool
+       Status() NodeStatus
+       Failed() bool
 
        SetRole(string)
        SetPassword(string)
+       SetStatus(NodeStatus)
 
        Reset(ctx context.Context) error
        GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)
@@ -82,6 +100,7 @@ type ClusterNode struct {
        role      string
        password  string
        createdAt int64
+       status    NodeStatus
 }
 
 type ClusterInfo struct {
@@ -101,6 +120,7 @@ func NewClusterNode(addr, password string) *ClusterNode {
                addr:      addr,
                password:  password,
                role:      RoleMaster,
+               status:    NodeStatusNormal,
                createdAt: time.Now().Unix(),
        }
 }
@@ -121,6 +141,26 @@ func (n *ClusterNode) SetRole(role string) {
        n.role = role
 }
 
+func (n *ClusterNode) Status() NodeStatus {
+       return n.status
+}
+
+func (n *ClusterNode) SetStatus(status NodeStatus) {
+       n.status = status
+}
+
+func (n *ClusterNode) Failed() bool {
+       return n.status == NodeStatusFailed
+}
+
+func (n *ClusterNode) SetFailed(failed bool) {
+       if failed {
+               n.status = NodeStatusFailed
+       } else {
+               n.status = NodeStatusNormal
+       }
+}
+
 func (n *ClusterNode) Addr() string {
        return n.addr
 }
@@ -272,16 +312,18 @@ func (n *ClusterNode) MarshalJSON() ([]byte, error) {
                "role":       n.role,
                "password":   n.password,
                "created_at": n.createdAt,
+               "status":     n.status,
        })
 }
 
 func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
        var data struct {
-               ID        string `json:"id"`
-               Addr      string `json:"addr"`
-               Role      string `json:"role"`
-               Password  string `json:"password"`
-               CreatedAt int64  `json:"created_at"`
+               ID        string     `json:"id"`
+               Addr      string     `json:"addr"`
+               Role      string     `json:"role"`
+               Password  string     `json:"password"`
+               CreatedAt int64      `json:"created_at"`
+               Status    NodeStatus `json:"status"`
        }
        if err := json.Unmarshal(bytes, &data); err != nil {
                return err
@@ -292,5 +334,13 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
        n.role = data.Role
        n.password = data.Password
        n.createdAt = data.CreatedAt
+       switch {
+       case data.Status != "" && !data.Status.IsValid():
+               return fmt.Errorf("unknown node status: %q", data.Status)
+       case data.Status != "":
+               n.status = data.Status
+       default:
+               n.status = NodeStatusNormal
+       }
        return nil
 }
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index 1181283..910943b 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -293,7 +293,11 @@ func (shard *Shard) ToSlotsString() (string, error) {
                                }
                        }
                } else {
-                       builder.WriteString(RoleSlave)
+                       if node.Failed() {
+                               builder.WriteString(RoleSlave + ",fail")
+                       } else {
+                               builder.WriteString(RoleSlave)
+                       }
                        builder.WriteByte(' ')
                        builder.WriteString(shard.Nodes[masterNodeIndex].ID())
                }
diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go
index 1406f35..971bde1 100644
--- a/store/cluster_shard_test.go
+++ b/store/cluster_shard_test.go
@@ -78,3 +78,39 @@ func TestShard_IsServicing(t *testing.T) {
        shard.SlotRanges = []SlotRange{{Start: -1, Stop: -1}}
        require.False(t, shard.IsServicing())
 }
+
+func TestToSlotsString_WithFailedSlave(t *testing.T) {
+       shard := NewShard()
+       shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
+
+       master := NewClusterNode("127.0.0.1:6379", "")
+       master.SetRole(RoleMaster)
+
+       slave := NewClusterNode("127.0.0.1:6380", "")
+       slave.SetRole(RoleSlave)
+       slave.SetStatus(NodeStatusFailed)
+
+       shard.Nodes = []Node{master, slave}
+
+       result, err := shard.ToSlotsString()
+       require.NoError(t, err)
+       require.Contains(t, result, "slave,fail "+master.ID())
+}
+
+func TestToSlotsString_WithOnlineSlave(t *testing.T) {
+       shard := NewShard()
+       shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
+
+       master := NewClusterNode("127.0.0.1:6379", "")
+       master.SetRole(RoleMaster)
+
+       slave := NewClusterNode("127.0.0.1:6380", "")
+       slave.SetRole(RoleSlave)
+
+       shard.Nodes = []Node{master, slave}
+
+       result, err := shard.ToSlotsString()
+       require.NoError(t, err)
+       require.Contains(t, result, "slave "+master.ID())
+       require.NotContains(t, result, "slave,fail")
+}
diff --git a/store/cluster_test.go b/store/cluster_test.go
index 975f03f..31ae905 100644
--- a/store/cluster_test.go
+++ b/store/cluster_test.go
@@ -106,3 +106,90 @@ func TestCluster_PromoteNewMaster(t *testing.T) {
        require.NoError(t, err)
        require.Equal(t, node2.ID(), newMasterID)
 }
+
+func TestCluster_SetNodeStatusByID(t *testing.T) {
+       cluster, err := NewCluster("test", []string{"node1", "node2", "node3"}, 
3)
+       require.NoError(t, err)
+       require.Len(t, cluster.Shards, 1)
+
+       slaveNode := cluster.Shards[0].Nodes[1]
+       require.Equal(t, NodeStatusNormal, slaveNode.Status())
+
+       // Set to failed
+       err = cluster.SetNodeStatusByID(slaveNode.ID(), NodeStatusFailed)
+       require.NoError(t, err)
+       require.Equal(t, NodeStatusFailed, slaveNode.Status())
+
+       // Set back to normal
+       err = cluster.SetNodeStatusByID(slaveNode.ID(), NodeStatusNormal)
+       require.NoError(t, err)
+       require.Equal(t, NodeStatusNormal, slaveNode.Status())
+
+       // Non-existent node ID
+       err = cluster.SetNodeStatusByID("nonexistent-id", NodeStatusFailed)
+       require.ErrorIs(t, err, consts.ErrNotFound)
+}
+
+func TestCluster_SetNodesOffline(t *testing.T) {
+       cluster, err := NewCluster("test", []string{"node1", "node2"}, 2)
+       require.NoError(t, err)
+       require.Len(t, cluster.Shards, 1)
+
+       masterAddr := cluster.Shards[0].Nodes[0].Addr()
+       slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+
+       // Cannot offline master
+       err = cluster.SetNodesOffline([]string{masterAddr})
+       require.ErrorIs(t, err, consts.ErrCannotOfflineMaster)
+
+       // Can offline slave
+       err = cluster.SetNodesOffline([]string{slaveAddr})
+       require.NoError(t, err)
+       require.True(t, cluster.Shards[0].Nodes[1].Failed())
+
+       // Addr not found
+       err = cluster.SetNodesOffline([]string{"nonexistent:1234"})
+       require.ErrorIs(t, err, consts.ErrNotFound)
+
+       // Atomic: if any addr is invalid, none are applied
+       cluster.Shards[0].Nodes[1].SetStatus(NodeStatusNormal)
+       err = cluster.SetNodesOffline([]string{slaveAddr, "nonexistent:1234"})
+       require.ErrorIs(t, err, consts.ErrNotFound)
+       require.False(t, cluster.Shards[0].Nodes[1].Failed()) // not modified
+}
+
+func TestCluster_SetNodesOnline(t *testing.T) {
+       cluster, err := NewCluster("test", []string{"node1", "node2"}, 2)
+       require.NoError(t, err)
+
+       slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+
+       // First offline
+       err = cluster.SetNodesOffline([]string{slaveAddr})
+       require.NoError(t, err)
+       require.True(t, cluster.Shards[0].Nodes[1].Failed())
+
+       // Then online
+       err = cluster.SetNodesOnline([]string{slaveAddr})
+       require.NoError(t, err)
+       require.False(t, cluster.Shards[0].Nodes[1].Failed())
+}
+
+func TestParseCluster_WithFailFlag(t *testing.T) {
+       // Build a cluster nodes string with a failed slave
+       clusterStr := "cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 127.0.0.1:30001 
master - 0 0 1 connected 0-5460\n" +
+               "e44242e22c74bbe4deab41c6a9dfb68e099f2f08 127.0.0.1:30004 
slave,fail cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 0 0 1 connected"
+
+       cluster, err := ParseCluster(clusterStr)
+       require.NoError(t, err)
+       require.Len(t, cluster.Shards, 1)
+       require.Len(t, cluster.Shards[0].Nodes, 2)
+
+       master := cluster.Shards[0].Nodes[0]
+       require.True(t, master.IsMaster())
+       require.False(t, master.Failed())
+
+       slave := cluster.Shards[0].Nodes[1]
+       require.False(t, slave.IsMaster())
+       require.True(t, slave.Failed())
+}

Reply via email to