This is an automated email from the ASF dual-hosted git repository.

twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 96dd8cb26 fix(stream): accept `~` modifier in XADD and XTRIM (#3403)
96dd8cb26 is described below

commit 96dd8cb2601b88314567ea6e253e73f01eed7c94
Author: XYenon <[email protected]>
AuthorDate: Sat Mar 28 22:18:19 2026 +0800

    fix(stream): accept `~` modifier in XADD and XTRIM (#3403)
    
    Fixes https://github.com/apache/kvrocks/issues/3402
    
    XADD and XTRIM commands only recognized `=` as the optional modifier for
    MAXLEN/MINID but not `~`, causing a parse error. Redis supports both
    modifiers for compatibility.
---
 src/commands/cmd_stream.cc                   | 28 ++++++++---------
 tests/gocase/unit/type/stream/stream_test.go | 47 ++++++++++++++++++++++++++++
 2 files changed, 61 insertions(+), 14 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 233d8475f..5a2d52967 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -115,10 +115,10 @@ class CommandXAdd : public Commander {
         }
 
         size_t max_len_idx = 0;
-        bool eq_sign_found = false;
-        if (args[i + 1] == "=") {
+        bool modifier_found = false;
+        if (args[i + 1] == "=" || args[i + 1] == "~") {
           max_len_idx = i + 2;
-          eq_sign_found = true;
+          modifier_found = true;
         } else {
           max_len_idx = i + 1;
         }
@@ -135,7 +135,7 @@ class CommandXAdd : public Commander {
         max_len_ = *parse_result;
         with_max_len_ = true;
 
-        i += eq_sign_found ? 3 : 2;
+        i += modifier_found ? 3 : 2;
         continue;
       }
 
@@ -145,10 +145,10 @@ class CommandXAdd : public Commander {
         }
 
         size_t min_id_idx = 0;
-        bool eq_sign_found = false;
-        if (args[i + 1] == "=") {
+        bool modifier_found = false;
+        if (args[i + 1] == "=" || args[i + 1] == "~") {
           min_id_idx = i + 2;
-          eq_sign_found = true;
+          modifier_found = true;
         } else {
           min_id_idx = i + 1;
         }
@@ -161,7 +161,7 @@ class CommandXAdd : public Commander {
         if (!s.IsOK()) return s;
 
         with_min_id_ = true;
-        i += eq_sign_found ? 3 : 2;
+        i += modifier_found ? 3 : 2;
         continue;
       }
 
@@ -1734,18 +1734,18 @@ class CommandXReadGroup : public Commander,
 class CommandXTrim : public Commander {
  public:
   Status Parse(const std::vector<std::string> &args) override {
-    bool eq_sign_found = false;
+    bool modifier_found = false;
 
     auto trim_strategy = util::ToLower(args[2]);
     if (trim_strategy == "maxlen") {
       strategy_ = StreamTrimStrategy::MaxLen;
 
       size_t max_len_idx = 0;
-      if (args[3] != "=") {
+      if (args[3] != "=" && args[3] != "~") {
         max_len_idx = 3;
       } else {
         max_len_idx = 4;
-        eq_sign_found = true;
+        modifier_found = true;
       }
 
       if (max_len_idx >= args.size()) {
@@ -1762,11 +1762,11 @@ class CommandXTrim : public Commander {
       strategy_ = StreamTrimStrategy::MinID;
 
       size_t min_id_idx = 0;
-      if (args[3] != "=") {
+      if (args[3] != "=" && args[3] != "~") {
         min_id_idx = 3;
       } else {
         min_id_idx = 4;
-        eq_sign_found = true;
+        modifier_found = true;
       }
 
       if (min_id_idx >= args.size()) {
@@ -1782,7 +1782,7 @@ class CommandXTrim : public Commander {
     }
 
     bool limit_option_found = false;
-    if (eq_sign_found) {
+    if (modifier_found) {
       if (args.size() > 6 && util::ToLower(args[5]) == "limit") {
         limit_option_found = true;
       }
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 4245c59ae..4421f2156 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -182,6 +182,29 @@ var streamTests = func(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.EqualValues(t, 5, rdb.XLen(ctx, "mystream").Val())
        })
 
+       t.Run("XADD with MAXLEN option and the '~' argument", func(t 
*testing.T) {
+               require.NoError(t, rdb.Del(ctx, "mystream").Err())
+               for i := 0; i < 1000; i++ {
+                       if rand.Float64() < 0.9 {
+                               require.NoError(t, rdb.Do(ctx, "XADD", 
"mystream", "MAXLEN", "~", "5", "*", "xitem", "i").Err())
+                       } else {
+                               require.NoError(t, rdb.Do(ctx, "XADD", 
"mystream", "MAXLEN", "~", "5", "*", "yitem", "i").Err())
+                       }
+               }
+               require.EqualValues(t, 5, rdb.XLen(ctx, "mystream").Val())
+       })
+
+       t.Run("XADD with MINID option and the '~' argument", func(t *testing.T) 
{
+               require.NoError(t, rdb.Del(ctx, "mystream").Err())
+               for i := 1; i <= 10; i++ {
+                       require.NoError(t, rdb.XAdd(ctx, 
&redis.XAddArgs{Stream: "mystream", ID: strconv.Itoa(i) + "-0", Values: 
[]string{"f", "v"}}).Err())
+               }
+               require.NoError(t, rdb.Do(ctx, "XADD", "mystream", "MINID", 
"~", "8-0", "11-0", "f", "v").Err())
+               items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+               require.Len(t, items, 4)
+               require.EqualValues(t, "8-0", items[0].ID)
+       })
+
        t.Run("XADD with NOMKSTREAM option", func(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, "mystream").Err())
                require.Empty(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", NoMkStream: true, Values: []string{"item", "1", "value", 
"a"}}).Val())
@@ -605,6 +628,30 @@ var streamTests = func(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.EqualValues(t, 555, rdb.XLen(ctx, "mystream").Val())
        })
 
+       t.Run("XTRIM with MAXLEN option and the '~' argument", func(t 
*testing.T) {
+               require.NoError(t, rdb.Del(ctx, "mystream").Err())
+               for i := 0; i < 1000; i++ {
+                       require.NoError(t, rdb.XAdd(ctx, 
&redis.XAddArgs{Stream: "mystream", Values: map[string]interface{}{"xitem": 
i}}).Err())
+               }
+               require.NoError(t, rdb.Do(ctx, "XTRIM", "mystream", "MAXLEN", 
"~", "666").Err())
+               require.EqualValues(t, 666, rdb.XLen(ctx, "mystream").Val())
+       })
+
+       t.Run("XTRIM with MINID option and the '~' argument", func(t 
*testing.T) {
+               require.NoError(t, rdb.Del(ctx, "mystream").Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "1-0", Values: []string{"f", "v"}}).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "2-0", Values: []string{"f", "v"}}).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "3-0", Values: []string{"f", "v"}}).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "4-0", Values: []string{"f", "v"}}).Err())
+               require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream: 
"mystream", ID: "5-0", Values: []string{"f", "v"}}).Err())
+               require.NoError(t, rdb.Do(ctx, "XTRIM", "mystream", "MINID", 
"~", "3-0").Err())
+               items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+               require.Len(t, items, 3)
+               require.EqualValues(t, "3-0", items[0].ID)
+               require.EqualValues(t, "4-0", items[1].ID)
+               require.EqualValues(t, "5-0", items[2].ID)
+       })
+
        t.Run("XADD with LIMIT consecutive calls", func(t *testing.T) {
                require.NoError(t, rdb.Del(ctx, "mystream").Err())
                for i := 0; i < 100; i++ {

Reply via email to