This is an automated email from the ASF dual-hosted git repository.
twice pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 96dd8cb26 fix(stream): accept `~` modifier in XADD and XTRIM (#3403)
96dd8cb26 is described below
commit 96dd8cb2601b88314567ea6e253e73f01eed7c94
Author: XYenon <[email protected]>
AuthorDate: Sat Mar 28 22:18:19 2026 +0800
fix(stream): accept `~` modifier in XADD and XTRIM (#3403)
Fixes https://github.com/apache/kvrocks/issues/3402
The XADD and XTRIM commands recognized only `=` as the optional modifier for
MAXLEN/MINID and rejected `~` with a parse error. Redis accepts both
modifiers, so kvrocks now accepts `~` as well for compatibility.
---
src/commands/cmd_stream.cc | 28 ++++++++---------
tests/gocase/unit/type/stream/stream_test.go | 47 ++++++++++++++++++++++++++++
2 files changed, 61 insertions(+), 14 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 233d8475f..5a2d52967 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -115,10 +115,10 @@ class CommandXAdd : public Commander {
}
size_t max_len_idx = 0;
- bool eq_sign_found = false;
- if (args[i + 1] == "=") {
+ bool modifier_found = false;
+ if (args[i + 1] == "=" || args[i + 1] == "~") {
max_len_idx = i + 2;
- eq_sign_found = true;
+ modifier_found = true;
} else {
max_len_idx = i + 1;
}
@@ -135,7 +135,7 @@ class CommandXAdd : public Commander {
max_len_ = *parse_result;
with_max_len_ = true;
- i += eq_sign_found ? 3 : 2;
+ i += modifier_found ? 3 : 2;
continue;
}
@@ -145,10 +145,10 @@ class CommandXAdd : public Commander {
}
size_t min_id_idx = 0;
- bool eq_sign_found = false;
- if (args[i + 1] == "=") {
+ bool modifier_found = false;
+ if (args[i + 1] == "=" || args[i + 1] == "~") {
min_id_idx = i + 2;
- eq_sign_found = true;
+ modifier_found = true;
} else {
min_id_idx = i + 1;
}
@@ -161,7 +161,7 @@ class CommandXAdd : public Commander {
if (!s.IsOK()) return s;
with_min_id_ = true;
- i += eq_sign_found ? 3 : 2;
+ i += modifier_found ? 3 : 2;
continue;
}
@@ -1734,18 +1734,18 @@ class CommandXReadGroup : public Commander,
class CommandXTrim : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
- bool eq_sign_found = false;
+ bool modifier_found = false;
auto trim_strategy = util::ToLower(args[2]);
if (trim_strategy == "maxlen") {
strategy_ = StreamTrimStrategy::MaxLen;
size_t max_len_idx = 0;
- if (args[3] != "=") {
+ if (args[3] != "=" && args[3] != "~") {
max_len_idx = 3;
} else {
max_len_idx = 4;
- eq_sign_found = true;
+ modifier_found = true;
}
if (max_len_idx >= args.size()) {
@@ -1762,11 +1762,11 @@ class CommandXTrim : public Commander {
strategy_ = StreamTrimStrategy::MinID;
size_t min_id_idx = 0;
- if (args[3] != "=") {
+ if (args[3] != "=" && args[3] != "~") {
min_id_idx = 3;
} else {
min_id_idx = 4;
- eq_sign_found = true;
+ modifier_found = true;
}
if (min_id_idx >= args.size()) {
@@ -1782,7 +1782,7 @@ class CommandXTrim : public Commander {
}
bool limit_option_found = false;
- if (eq_sign_found) {
+ if (modifier_found) {
if (args.size() > 6 && util::ToLower(args[5]) == "limit") {
limit_option_found = true;
}
diff --git a/tests/gocase/unit/type/stream/stream_test.go
b/tests/gocase/unit/type/stream/stream_test.go
index 4245c59ae..4421f2156 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -182,6 +182,29 @@ var streamTests = func(t *testing.T, configs
util.KvrocksServerConfigs) {
require.EqualValues(t, 5, rdb.XLen(ctx, "mystream").Val())
})
+ t.Run("XADD with MAXLEN option and the '~' argument", func(t
*testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ for i := 0; i < 1000; i++ {
+ if rand.Float64() < 0.9 {
+ require.NoError(t, rdb.Do(ctx, "XADD",
"mystream", "MAXLEN", "~", "5", "*", "xitem", "i").Err())
+ } else {
+ require.NoError(t, rdb.Do(ctx, "XADD",
"mystream", "MAXLEN", "~", "5", "*", "yitem", "i").Err())
+ }
+ }
+ require.EqualValues(t, 5, rdb.XLen(ctx, "mystream").Val())
+ })
+
+ t.Run("XADD with MINID option and the '~' argument", func(t *testing.T)
{
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ for i := 1; i <= 10; i++ {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: "mystream", ID: strconv.Itoa(i) + "-0", Values:
[]string{"f", "v"}}).Err())
+ }
+ require.NoError(t, rdb.Do(ctx, "XADD", "mystream", "MINID",
"~", "8-0", "11-0", "f", "v").Err())
+ items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+ require.Len(t, items, 4)
+ require.EqualValues(t, "8-0", items[0].ID)
+ })
+
t.Run("XADD with NOMKSTREAM option", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
require.Empty(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", NoMkStream: true, Values: []string{"item", "1", "value",
"a"}}).Val())
@@ -605,6 +628,30 @@ var streamTests = func(t *testing.T, configs
util.KvrocksServerConfigs) {
require.EqualValues(t, 555, rdb.XLen(ctx, "mystream").Val())
})
+ t.Run("XTRIM with MAXLEN option and the '~' argument", func(t
*testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ for i := 0; i < 1000; i++ {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: "mystream", Values: map[string]interface{}{"xitem":
i}}).Err())
+ }
+ require.NoError(t, rdb.Do(ctx, "XTRIM", "mystream", "MAXLEN",
"~", "666").Err())
+ require.EqualValues(t, 666, rdb.XLen(ctx, "mystream").Val())
+ })
+
+ t.Run("XTRIM with MINID option and the '~' argument", func(t
*testing.T) {
+ require.NoError(t, rdb.Del(ctx, "mystream").Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "1-0", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "2-0", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "3-0", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "4-0", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{Stream:
"mystream", ID: "5-0", Values: []string{"f", "v"}}).Err())
+ require.NoError(t, rdb.Do(ctx, "XTRIM", "mystream", "MINID",
"~", "3-0").Err())
+ items := rdb.XRange(ctx, "mystream", "-", "+").Val()
+ require.Len(t, items, 3)
+ require.EqualValues(t, "3-0", items[0].ID)
+ require.EqualValues(t, "4-0", items[1].ID)
+ require.EqualValues(t, "5-0", items[2].ID)
+ })
+
t.Run("XADD with LIMIT consecutive calls", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "mystream").Err())
for i := 0; i < 100; i++ {