This is an automated email from the ASF dual-hosted git repository.
git-hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks-controller.git
The following commit(s) were added to refs/heads/unstable by this push:
new ae16132 Implement node status synchronization in Controller,
including downtime notifications and online/offline management (#388)
ae16132 is described below
commit ae16132f082ab34a50ac02bddb73611751ac0112
Author: Ruifeng Guo <[email protected]>
AuthorDate: Fri Apr 24 16:42:22 2026 +0800
Implement node status synchronization in Controller, including downtime
notifications and online/offline management (#388)
---
config/config.go | 4 +++
consts/errors.go | 1 +
controller/cluster.go | 45 ++++++++++++++++++-----
controller/cluster_test.go | 78 ++++++++++++++++++++++++++++++++++++++--
controller/controller.go | 3 +-
server/api/node.go | 37 +++++++++++++++++++
server/api/node_test.go | 49 +++++++++++++++++++++++++
server/helper/helper.go | 2 ++
server/route.go | 1 +
store/cluster.go | 75 +++++++++++++++++++++++++++++++++++---
store/cluster_node.go | 60 ++++++++++++++++++++++++++++---
store/cluster_shard.go | 6 +++-
store/cluster_shard_test.go | 36 +++++++++++++++++++
store/cluster_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++++
14 files changed, 462 insertions(+), 22 deletions(-)
diff --git a/config/config.go b/config/config.go
index 69558fb..d88862a 100644
--- a/config/config.go
+++ b/config/config.go
@@ -43,6 +43,10 @@ type AdminConfig struct {
type FailOverConfig struct {
PingIntervalSeconds int `yaml:"ping_interval_seconds"`
MaxPingCount int64 `yaml:"max_ping_count"`
+ // EnableSlaveHAUpdate controls whether HA logic marks failed slave nodes and
+ // propagates the updated topology. Requires kvrocks to support node status
+ // modification (new versions only). Defaults to false for backward compatibility.
+ EnableSlaveHAUpdate bool `yaml:"enable_slave_ha_update"`
}
type ControllerConfig struct {
diff --git a/consts/errors.go b/consts/errors.go
index bf7e51d..4d3f1bc 100644
--- a/consts/errors.go
+++ b/consts/errors.go
@@ -38,5 +38,6 @@ var (
ErrShardIsServicing = errors.New("shard is servicing")
ErrShardSlotIsMigrating = errors.New("shard slot is
migrating")
ErrShardNoMatchNewMaster = errors.New("no match new master
in shard")
+ ErrCannotOfflineMaster = errors.New("cannot take master
node offline, failover first")
ErrSlotStartAndStopEqual = errors.New("start and stop of a
range cannot be equal")
)
diff --git a/controller/cluster.go b/controller/cluster.go
index 8d4e70d..22242a9 100755
--- a/controller/cluster.go
+++ b/controller/cluster.go
@@ -38,8 +38,9 @@ var (
)
type ClusterCheckOptions struct {
- pingInterval time.Duration
- maxFailureCount int64
+ pingInterval time.Duration
+ maxFailureCount int64
+ enableSlaveHAUpdate bool
}
type ClusterChecker struct {
@@ -104,6 +105,11 @@ func (c *ClusterChecker) WithMaxFailureCount(count int64)
*ClusterChecker {
return c
}
+func (c *ClusterChecker) WithSlaveHAUpdate(enable bool) *ClusterChecker {
+ c.options.enableSlaveHAUpdate = enable
+ return c
+}
+
func (c *ClusterChecker) probeNode(ctx context.Context, node store.Node)
(int64, error) {
clusterInfo, err := node.GetClusterInfo(ctx)
if err != nil {
@@ -132,17 +138,37 @@ func (c *ClusterChecker) increaseFailureCount(shardIndex
int, node store.Node) i
count := c.failureCounts[id]
c.failureMu.Unlock()
- // don't add the node into the failover candidates if it's not a master
node
if !node.IsMaster() {
+ if c.options.enableSlaveHAUpdate && count >=
c.options.maxFailureCount && !node.Failed() {
+ log := logger.Get().With(
+ zap.String("cluster_name", c.clusterName),
+ zap.String("id", node.ID()),
+ zap.String("addr", node.Addr()),
+ zap.Int64("failure_count", count))
+ cluster, err := c.clusterStore.GetCluster(c.ctx,
c.namespace, c.clusterName)
+ if err != nil {
+ log.Error("Failed to get the cluster info",
zap.Error(err))
+ return count
+ }
+ if err := cluster.SetNodeStatusByID(node.ID(),
store.NodeStatusFailed); err != nil {
+ log.Error("Failed to set slave node as failed",
zap.Error(err))
+ return count
+ }
+ if err := c.clusterStore.UpdateCluster(c.ctx,
c.namespace, cluster); err != nil {
+ log.Error("Failed to update the cluster",
zap.Error(err))
+ return count
+ }
+ log.Info("Marked slave node as failed due to probe
failures")
+ }
return count
}
- log := logger.Get().With(
- zap.String("cluster_name", c.clusterName),
- zap.String("id", node.ID()),
- zap.Bool("is_master", node.IsMaster()),
- zap.String("addr", node.Addr()))
if count%c.options.maxFailureCount == 0 || count >
c.options.maxFailureCount {
+ log := logger.Get().With(
+ zap.String("cluster_name", c.clusterName),
+ zap.String("id", node.ID()),
+ zap.Bool("is_master", node.IsMaster()),
+ zap.String("addr", node.Addr()))
cluster, err := c.clusterStore.GetCluster(c.ctx, c.namespace,
c.clusterName)
if err != nil {
log.Error("Failed to get the cluster info",
zap.Error(err))
@@ -188,6 +214,9 @@ func (c *ClusterChecker) syncClusterToNodes(ctx
context.Context) error {
version := clusterInfo.Version.Load()
for _, shard := range clusterInfo.Shards {
for _, node := range shard.Nodes {
+ if node.Failed() {
+ continue
+ }
go func(n store.Node) {
log := logger.Get().With(
zap.String("namespace", c.namespace),
diff --git a/controller/cluster_test.go b/controller/cluster_test.go
index a1a720a..a377444 100644
--- a/controller/cluster_test.go
+++ b/controller/cluster_test.go
@@ -122,8 +122,9 @@ func TestCluster_FailureCount(t *testing.T) {
namespace: ns,
clusterName: clusterName,
options: ClusterCheckOptions{
- pingInterval: time.Second,
- maxFailureCount: 3,
+ pingInterval: time.Second,
+ maxFailureCount: 3,
+ enableSlaveHAUpdate: true,
},
failureCounts: make(map[string]int64),
syncCh: make(chan struct{}, 1),
@@ -144,14 +145,85 @@ func TestCluster_FailureCount(t *testing.T) {
require.EqualValues(t, 0, cluster.failureCounts[mockNode2.Addr()])
require.True(t, mockNode2.IsMaster())
- // it will be always increase the failure count until the node is back
again.
+ // Slave failure count keeps increasing; at threshold the slave is auto-marked as failed.
for i := int64(0); i < cluster.options.maxFailureCount*2; i++ {
require.EqualValues(t, i+1, cluster.increaseFailureCount(0,
mockNode3))
}
+ require.True(t, mockNode3.Failed())
+ require.EqualValues(t, 3, clusterInfo.Version.Load())
cluster.resetFailureCount(mockNode3.ID())
require.EqualValues(t, 0, cluster.failureCounts[mockNode3.ID()])
}
+func TestCluster_SlaveFailureAutoOffline(t *testing.T) {
+ ctx := context.Background()
+ ns := "test-ns"
+ clusterName := "test-slave-offline"
+
+ s := NewMockClusterStore()
+ mockMaster := store.NewClusterMockNode()
+ mockMaster.SetRole(store.RoleMaster)
+ mockMaster.Sequence = 100
+
+ mockSlave1 := store.NewClusterMockNode()
+ mockSlave1.SetRole(store.RoleSlave)
+ mockSlave1.Sequence = 90
+
+ mockSlave2 := store.NewClusterMockNode()
+ mockSlave2.SetRole(store.RoleSlave)
+ mockSlave2.Sequence = 80
+
+ clusterInfo := &store.Cluster{
+ Name: clusterName,
+ Shards: []*store.Shard{{
+ Nodes: []store.Node{mockMaster, mockSlave1,
mockSlave2},
+ SlotRanges: []store.SlotRange{{Start: 0, Stop:
16383}},
+ MigratingSlot: &store.MigratingSlot{IsMigrating:
false},
+ TargetShardIndex: -1,
+ }},
+ }
+ clusterInfo.Version.Store(1)
+ require.NoError(t, s.CreateCluster(ctx, ns, clusterInfo))
+
+ checker := &ClusterChecker{
+ clusterStore: s,
+ namespace: ns,
+ clusterName: clusterName,
+ options: ClusterCheckOptions{
+ pingInterval: time.Second,
+ maxFailureCount: 3,
+ enableSlaveHAUpdate: true,
+ },
+ failureCounts: make(map[string]int64),
+ syncCh: make(chan struct{}, 1),
+ }
+
+ // Slave should not be marked as failed before reaching threshold
+ require.False(t, mockSlave1.Failed())
+ for i := int64(0); i < checker.options.maxFailureCount-1; i++ {
+ checker.increaseFailureCount(0, mockSlave1)
+ }
+ require.False(t, mockSlave1.Failed())
+ require.EqualValues(t, 1, clusterInfo.Version.Load())
+
+ // Slave should be marked as failed when reaching threshold
+ checker.increaseFailureCount(0, mockSlave1)
+ require.True(t, mockSlave1.Failed())
+ require.EqualValues(t, 2, clusterInfo.Version.Load())
+
+ // Subsequent failures should not trigger another update (already failed)
+ checker.increaseFailureCount(0, mockSlave1)
+ require.True(t, mockSlave1.Failed())
+ require.EqualValues(t, 2, clusterInfo.Version.Load())
+
+ // Other slaves are not affected
+ require.False(t, mockSlave2.Failed())
+
+ // Master should not be affected by slave offline logic
+ require.True(t, mockMaster.IsMaster())
+ require.False(t, mockMaster.Failed())
+}
+
func TestCluster_LoadAndProbe(t *testing.T) {
ctx := context.Background()
ns := "test-ns"
diff --git a/controller/controller.go b/controller/controller.go
index 806c34a..5d2c446 100644
--- a/controller/controller.go
+++ b/controller/controller.go
@@ -189,7 +189,8 @@ func (c *Controller) addCluster(namespace, clusterName
string) {
cluster := NewClusterChecker(c.clusterStore, namespace, clusterName).
WithPingInterval(time.Duration(c.config.FailOver.PingIntervalSeconds) *
time.Second).
- WithMaxFailureCount(c.config.FailOver.MaxPingCount)
+ WithMaxFailureCount(c.config.FailOver.MaxPingCount).
+ WithSlaveHAUpdate(c.config.FailOver.EnableSlaveHAUpdate)
cluster.Start()
c.mu.Lock()
diff --git a/server/api/node.go b/server/api/node.go
index 4407108..cf60ec4 100644
--- a/server/api/node.go
+++ b/server/api/node.go
@@ -23,7 +23,10 @@ package api
import (
"strconv"
+ "go.uber.org/zap"
+
"github.com/apache/kvrocks-controller/consts"
+ "github.com/apache/kvrocks-controller/logger"
"github.com/apache/kvrocks-controller/server/helper"
"github.com/gin-gonic/gin"
@@ -82,3 +85,37 @@ func (handler *NodeHandler) Remove(c *gin.Context) {
}
helper.ResponseNoContent(c)
}
+
+func (handler *NodeHandler) SetStatus(c *gin.Context) {
+ ns := c.Param("namespace")
+ cluster, _ := c.MustGet(consts.ContextKeyCluster).(*store.Cluster)
+
+ var req struct {
+ Addrs []string `json:"addrs" binding:"required"`
+ Online bool `json:"online"`
+ }
+ if err := c.ShouldBindJSON(&req); err != nil {
+ helper.ResponseBadRequest(c, err)
+ return
+ }
+
+ var err error
+ if req.Online {
+ err = cluster.SetNodesOnline(req.Addrs)
+ } else {
+ err = cluster.SetNodesOffline(req.Addrs)
+ }
+ if err != nil {
+ helper.ResponseError(c, err)
+ return
+ }
+
+ if err := handler.s.UpdateCluster(c, ns, cluster); err != nil {
+ helper.ResponseError(c, err)
+ return
+ }
+ if err := cluster.SyncToNodes(c); err != nil {
+ logger.Get().With(zap.Error(err)).Warn("Failed to sync cluster
info to nodes after status change")
+ }
+ helper.ResponseOK(c, nil)
+}
diff --git a/server/api/node_test.go b/server/api/node_test.go
index 8629185..a93e0ca 100644
--- a/server/api/node_test.go
+++ b/server/api/node_test.go
@@ -124,3 +124,52 @@ func TestNodeBasics(t *testing.T) {
runRemove(t, cluster.Shards[0].Nodes[1].ID(),
http.StatusNoContent)
})
}
+
+func TestNodeSetStatus(t *testing.T) {
+ ns := "test-ns"
+ cluster, err := store.NewCluster("test-cluster",
[]string{"127.0.0.1:1234", "127.0.0.1:1235"}, 2)
+ require.NoError(t, err)
+
+ handler := &NodeHandler{s: store.NewClusterStore(engine.NewMock())}
+ require.NoError(t, handler.s.CreateCluster(context.Background(), ns,
cluster))
+
+ slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+ masterAddr := cluster.Shards[0].Nodes[0].Addr()
+
+ runSetStatus := func(t *testing.T, addrs []string, online bool,
expectedCode int) {
+ var req struct {
+ Addrs []string `json:"addrs"`
+ Online bool `json:"online"`
+ }
+ req.Addrs = addrs
+ req.Online = online
+
+ recorder := httptest.NewRecorder()
+ ctx := GetTestContext(recorder)
+ body, err := json.Marshal(req)
+ require.NoError(t, err)
+
+ ctx.Set(consts.ContextKeyStore, handler.s)
+ ctx.Request.Body = io.NopCloser(bytes.NewBuffer(body))
+ ctx.Params = []gin.Param{
+ {Key: "namespace", Value: ns},
+ {Key: "cluster", Value: cluster.Name},
+ }
+ middleware.RequiredCluster(ctx)
+ require.Equal(t, http.StatusOK, recorder.Code)
+ handler.SetStatus(ctx)
+ require.Equal(t, expectedCode, recorder.Code)
+ }
+
+ t.Run("offline slave", func(t *testing.T) {
+ runSetStatus(t, []string{slaveAddr}, false, http.StatusOK)
+ })
+
+ t.Run("online slave", func(t *testing.T) {
+ runSetStatus(t, []string{slaveAddr}, true, http.StatusOK)
+ })
+
+ t.Run("offline master rejected", func(t *testing.T) {
+ runSetStatus(t, []string{masterAddr}, false,
http.StatusBadRequest)
+ })
+}
diff --git a/server/helper/helper.go b/server/helper/helper.go
index 896a2a4..88550c0 100644
--- a/server/helper/helper.go
+++ b/server/helper/helper.go
@@ -77,6 +77,8 @@ func ResponseError(c *gin.Context, err error) {
code = http.StatusForbidden
} else if errors.Is(err, consts.ErrInvalidArgument) {
code = http.StatusBadRequest
+ } else if errors.Is(err, consts.ErrCannotOfflineMaster) {
+ code = http.StatusBadRequest
}
c.JSON(code, Response{
Error: &Error{Message: err.Error()},
diff --git a/server/route.go b/server/route.go
index 109f8dd..b5eb94a 100644
--- a/server/route.go
+++ b/server/route.go
@@ -70,6 +70,7 @@ func (srv *Server) initHandlers() {
clusters.GET("/:cluster", middleware.RequiredCluster,
handler.Cluster.Get)
clusters.DELETE("/:cluster",
middleware.RequiredCluster, handler.Cluster.Remove)
clusters.POST("/:cluster/migrate",
handler.Cluster.MigrateSlot)
+ clusters.POST("/:cluster/nodes/status",
middleware.RequiredCluster, handler.Node.SetStatus)
}
shards := clusters.Group("/:cluster/shards")
diff --git a/store/cluster.go b/store/cluster.go
index e33e41a..00c0ee3 100644
--- a/store/cluster.go
+++ b/store/cluster.go
@@ -157,6 +157,64 @@ func (cluster *Cluster) SyncToNodes(ctx context.Context)
error {
return nil
}
+func (cluster *Cluster) findNodeByAddr(addr string) Node {
+ for _, shard := range cluster.Shards {
+ for _, node := range shard.Nodes {
+ if node.Addr() == addr {
+ return node
+ }
+ }
+ }
+ return nil
+}
+
+func (cluster *Cluster) SetNodeStatusByID(nodeID string, status NodeStatus)
error {
+ for _, shard := range cluster.Shards {
+ for _, node := range shard.Nodes {
+ if node.ID() == nodeID {
+ node.SetStatus(status)
+ return nil
+ }
+ }
+ }
+ return fmt.Errorf("node %s: %w", nodeID, consts.ErrNotFound)
+}
+
+// setNodesStatus finds nodes by address, applies an optional per-node validation,
+// and only updates status when all nodes pass — ensuring all-or-nothing semantics.
+func (cluster *Cluster) setNodesStatus(addrs []string, status NodeStatus,
validate func(Node) error) error {
+ nodes := make([]Node, 0, len(addrs))
+ for _, addr := range addrs {
+ node := cluster.findNodeByAddr(addr)
+ if node == nil {
+ return fmt.Errorf("node %s: %w", addr,
consts.ErrNotFound)
+ }
+ if validate != nil {
+ if err := validate(node); err != nil {
+ return err
+ }
+ }
+ nodes = append(nodes, node)
+ }
+ for _, node := range nodes {
+ node.SetStatus(status)
+ }
+ return nil
+}
+
+func (cluster *Cluster) SetNodesOffline(addrs []string) error {
+ return cluster.setNodesStatus(addrs, NodeStatusFailed, func(node Node)
error {
+ if node.IsMaster() {
+ return fmt.Errorf("node %s: %w", node.Addr(),
consts.ErrCannotOfflineMaster)
+ }
+ return nil
+ })
+}
+
+func (cluster *Cluster) SetNodesOnline(addrs []string) error {
+ return cluster.setNodesStatus(addrs, NodeStatusNormal, nil)
+}
+
func (cluster *Cluster) GetNodes() []Node {
nodes := make([]Node, 0)
for i := 0; i < len(cluster.Shards); i++ {
@@ -273,10 +331,19 @@ func ParseCluster(clusterStr string) (*Cluster, error) {
addr: strings.Split(fields[1], "@")[0],
}
- if strings.Contains(fields[2], ",") {
- node.role = strings.Split(fields[2], ",")[1]
- } else {
- node.role = fields[2]
+ // Parse comma-separated flags (e.g. "slave,fail", "myself,master")
+ // to extract role and failed state.
+ roleFlags := strings.Split(fields[2], ",")
+ for _, flag := range roleFlags {
+ switch flag {
+ case RoleMaster:
+ node.role = RoleMaster
+ case RoleSlave:
+ node.role = RoleSlave
+ case "fail":
+ node.status = NodeStatusFailed
+ }
+ // ignore: myself, pfail, handshake, noaddr, nofailover, noflags
}
var err error
diff --git a/store/cluster_node.go b/store/cluster_node.go
index 8fcd106..d0b2523 100755
--- a/store/cluster_node.go
+++ b/store/cluster_node.go
@@ -42,6 +42,21 @@ const (
NodeIDLen = 40
)
+type NodeStatus string
+
+const (
+ NodeStatusNormal NodeStatus = "normal"
+ NodeStatusFailed NodeStatus = "failed"
+)
+
+func (s NodeStatus) IsValid() bool {
+ switch s {
+ case NodeStatusNormal, NodeStatusFailed:
+ return true
+ }
+ return false
+}
+
const (
dialTimeout = 3200 * time.Millisecond
readTimeout = 3 * time.Second
@@ -59,9 +74,12 @@ type Node interface {
Password() string
Addr() string
IsMaster() bool
+ Status() NodeStatus
+ Failed() bool
SetRole(string)
SetPassword(string)
+ SetStatus(NodeStatus)
Reset(ctx context.Context) error
GetClusterNodeInfo(ctx context.Context) (*ClusterNodeInfo, error)
@@ -82,6 +100,7 @@ type ClusterNode struct {
role string
password string
createdAt int64
+ status NodeStatus
}
type ClusterInfo struct {
@@ -101,6 +120,7 @@ func NewClusterNode(addr, password string) *ClusterNode {
addr: addr,
password: password,
role: RoleMaster,
+ status: NodeStatusNormal,
createdAt: time.Now().Unix(),
}
}
@@ -121,6 +141,26 @@ func (n *ClusterNode) SetRole(role string) {
n.role = role
}
+func (n *ClusterNode) Status() NodeStatus {
+ return n.status
+}
+
+func (n *ClusterNode) SetStatus(status NodeStatus) {
+ n.status = status
+}
+
+func (n *ClusterNode) Failed() bool {
+ return n.status == NodeStatusFailed
+}
+
+func (n *ClusterNode) SetFailed(failed bool) {
+ if failed {
+ n.status = NodeStatusFailed
+ } else {
+ n.status = NodeStatusNormal
+ }
+}
+
func (n *ClusterNode) Addr() string {
return n.addr
}
@@ -272,16 +312,18 @@ func (n *ClusterNode) MarshalJSON() ([]byte, error) {
"role": n.role,
"password": n.password,
"created_at": n.createdAt,
+ "status": n.status,
})
}
func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
var data struct {
- ID string `json:"id"`
- Addr string `json:"addr"`
- Role string `json:"role"`
- Password string `json:"password"`
- CreatedAt int64 `json:"created_at"`
+ ID string `json:"id"`
+ Addr string `json:"addr"`
+ Role string `json:"role"`
+ Password string `json:"password"`
+ CreatedAt int64 `json:"created_at"`
+ Status NodeStatus `json:"status"`
}
if err := json.Unmarshal(bytes, &data); err != nil {
return err
@@ -292,5 +334,13 @@ func (n *ClusterNode) UnmarshalJSON(bytes []byte) error {
n.role = data.Role
n.password = data.Password
n.createdAt = data.CreatedAt
+ switch {
+ case data.Status != "" && !data.Status.IsValid():
+ return fmt.Errorf("unknown node status: %q", data.Status)
+ case data.Status != "":
+ n.status = data.Status
+ default:
+ n.status = NodeStatusNormal
+ }
return nil
}
diff --git a/store/cluster_shard.go b/store/cluster_shard.go
index 1181283..910943b 100644
--- a/store/cluster_shard.go
+++ b/store/cluster_shard.go
@@ -293,7 +293,11 @@ func (shard *Shard) ToSlotsString() (string, error) {
}
}
} else {
- builder.WriteString(RoleSlave)
+ if node.Failed() {
+ builder.WriteString(RoleSlave + ",fail")
+ } else {
+ builder.WriteString(RoleSlave)
+ }
builder.WriteByte(' ')
builder.WriteString(shard.Nodes[masterNodeIndex].ID())
}
diff --git a/store/cluster_shard_test.go b/store/cluster_shard_test.go
index 1406f35..971bde1 100644
--- a/store/cluster_shard_test.go
+++ b/store/cluster_shard_test.go
@@ -78,3 +78,39 @@ func TestShard_IsServicing(t *testing.T) {
shard.SlotRanges = []SlotRange{{Start: -1, Stop: -1}}
require.False(t, shard.IsServicing())
}
+
+func TestToSlotsString_WithFailedSlave(t *testing.T) {
+ shard := NewShard()
+ shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
+
+ master := NewClusterNode("127.0.0.1:6379", "")
+ master.SetRole(RoleMaster)
+
+ slave := NewClusterNode("127.0.0.1:6380", "")
+ slave.SetRole(RoleSlave)
+ slave.SetStatus(NodeStatusFailed)
+
+ shard.Nodes = []Node{master, slave}
+
+ result, err := shard.ToSlotsString()
+ require.NoError(t, err)
+ require.Contains(t, result, "slave,fail "+master.ID())
+}
+
+func TestToSlotsString_WithOnlineSlave(t *testing.T) {
+ shard := NewShard()
+ shard.SlotRanges = []SlotRange{{Start: 0, Stop: 100}}
+
+ master := NewClusterNode("127.0.0.1:6379", "")
+ master.SetRole(RoleMaster)
+
+ slave := NewClusterNode("127.0.0.1:6380", "")
+ slave.SetRole(RoleSlave)
+
+ shard.Nodes = []Node{master, slave}
+
+ result, err := shard.ToSlotsString()
+ require.NoError(t, err)
+ require.Contains(t, result, "slave "+master.ID())
+ require.NotContains(t, result, "slave,fail")
+}
diff --git a/store/cluster_test.go b/store/cluster_test.go
index 975f03f..31ae905 100644
--- a/store/cluster_test.go
+++ b/store/cluster_test.go
@@ -106,3 +106,90 @@ func TestCluster_PromoteNewMaster(t *testing.T) {
require.NoError(t, err)
require.Equal(t, node2.ID(), newMasterID)
}
+
+func TestCluster_SetNodeStatusByID(t *testing.T) {
+ cluster, err := NewCluster("test", []string{"node1", "node2", "node3"},
3)
+ require.NoError(t, err)
+ require.Len(t, cluster.Shards, 1)
+
+ slaveNode := cluster.Shards[0].Nodes[1]
+ require.Equal(t, NodeStatusNormal, slaveNode.Status())
+
+ // Set to failed
+ err = cluster.SetNodeStatusByID(slaveNode.ID(), NodeStatusFailed)
+ require.NoError(t, err)
+ require.Equal(t, NodeStatusFailed, slaveNode.Status())
+
+ // Set back to normal
+ err = cluster.SetNodeStatusByID(slaveNode.ID(), NodeStatusNormal)
+ require.NoError(t, err)
+ require.Equal(t, NodeStatusNormal, slaveNode.Status())
+
+ // Non-existent node ID
+ err = cluster.SetNodeStatusByID("nonexistent-id", NodeStatusFailed)
+ require.ErrorIs(t, err, consts.ErrNotFound)
+}
+
+func TestCluster_SetNodesOffline(t *testing.T) {
+ cluster, err := NewCluster("test", []string{"node1", "node2"}, 2)
+ require.NoError(t, err)
+ require.Len(t, cluster.Shards, 1)
+
+ masterAddr := cluster.Shards[0].Nodes[0].Addr()
+ slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+
+ // Cannot offline master
+ err = cluster.SetNodesOffline([]string{masterAddr})
+ require.ErrorIs(t, err, consts.ErrCannotOfflineMaster)
+
+ // Can offline slave
+ err = cluster.SetNodesOffline([]string{slaveAddr})
+ require.NoError(t, err)
+ require.True(t, cluster.Shards[0].Nodes[1].Failed())
+
+ // Addr not found
+ err = cluster.SetNodesOffline([]string{"nonexistent:1234"})
+ require.ErrorIs(t, err, consts.ErrNotFound)
+
+ // Atomic: if any addr is invalid, none are applied
+ cluster.Shards[0].Nodes[1].SetStatus(NodeStatusNormal)
+ err = cluster.SetNodesOffline([]string{slaveAddr, "nonexistent:1234"})
+ require.ErrorIs(t, err, consts.ErrNotFound)
+ require.False(t, cluster.Shards[0].Nodes[1].Failed()) // not modified
+}
+
+func TestCluster_SetNodesOnline(t *testing.T) {
+ cluster, err := NewCluster("test", []string{"node1", "node2"}, 2)
+ require.NoError(t, err)
+
+ slaveAddr := cluster.Shards[0].Nodes[1].Addr()
+
+ // First offline
+ err = cluster.SetNodesOffline([]string{slaveAddr})
+ require.NoError(t, err)
+ require.True(t, cluster.Shards[0].Nodes[1].Failed())
+
+ // Then online
+ err = cluster.SetNodesOnline([]string{slaveAddr})
+ require.NoError(t, err)
+ require.False(t, cluster.Shards[0].Nodes[1].Failed())
+}
+
+func TestParseCluster_WithFailFlag(t *testing.T) {
+ // Build a cluster nodes string with a failed slave
+ clusterStr := "cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 127.0.0.1:30001
master - 0 0 1 connected 0-5460\n" +
+ "e44242e22c74bbe4deab41c6a9dfb68e099f2f08 127.0.0.1:30004
slave,fail cfb28ef1deee4e0fa78da86abe5d24c8589b4f09 0 0 1 connected"
+
+ cluster, err := ParseCluster(clusterStr)
+ require.NoError(t, err)
+ require.Len(t, cluster.Shards, 1)
+ require.Len(t, cluster.Shards[0].Nodes, 2)
+
+ master := cluster.Shards[0].Nodes[0]
+ require.True(t, master.IsMaster())
+ require.False(t, master.Failed())
+
+ slave := cluster.Shards[0].Nodes[1]
+ require.False(t, slave.IsMaster())
+ require.True(t, slave.Failed())
+}