This is an automated email from the ASF dual-hosted git repository.
jihuayu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new edb033e5e fix(stream): align XPENDING ID parsing and PEL range with
XRANGE (incomplete IDs, exclusive bounds) (#3437)
edb033e5e is described below
commit edb033e5eb910c8663b69432096a704885430354
Author: Songqing Zhang <[email protected]>
AuthorDate: Tue Apr 28 10:25:15 2026 +0800
fix(stream): align XPENDING ID parsing and PEL range with XRANGE
(incomplete IDs, exclusive bounds) (#3437)
The extended form of XPENDING must follow the same start/end ID rules as
XRANGE: parse the end ID with ParseRangeEnd so that a bare millisecond
timestamp covers every sequence number in that millisecond, and accept a
'(' prefix for an exclusive start or end, honoring it when scanning the
pending entry list (PEL).
Refs: https://redis.io/docs/latest/commands/xpending/ ("in a similar way
we do it with XRANGE")
Refs: https://redis.io/docs/latest/commands/xrange/ ("closed interval" /
inclusive unless '(' prefix)
---------
Co-authored-by: 纪华裕 <[email protected]>
---
src/commands/cmd_stream.cc | 41 ++++++--
src/types/redis_stream.cc | 35 ++++++-
src/types/redis_stream_base.h | 2 +
tests/gocase/unit/type/stream/stream_test.go | 140 +++++++++++++++++++++++++++
4 files changed, 208 insertions(+), 10 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ae84e4aca..73a5279c8 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -853,21 +853,48 @@ class CommandXPending : public Commander {
}
if (parser.Good()) {
- std::string start_id, end_id;
- start_id = GET_OR_RET(parser.TakeStr());
- end_id = GET_OR_RET(parser.TakeStr());
- if (start_id != "-") {
- auto s = ParseStreamEntryID(start_id, &options_.start_id);
+ const std::string start_str = GET_OR_RET(parser.TakeStr());
+ const std::string end_str = GET_OR_RET(parser.TakeStr());
+
+ // Extended XPENDING uses the same ID range rules as XRANGE (see Redis documentation).
+ if (start_str == "-") {
+ options_.start_id = StreamEntryID::Minimum();
+ options_.exclude_start = false;
+ } else if (!start_str.empty() && start_str[0] == '(') {
+ options_.exclude_start = true;
+ auto s = ParseRangeStart(start_str.substr(1), &options_.start_id);
if (!s.IsOK()) {
return s;
}
+ } else if (start_str == "+") {
+ options_.start_id = StreamEntryID::Maximum();
+ options_.exclude_start = false;
+ } else {
+ auto s = ParseRangeStart(start_str, &options_.start_id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ options_.exclude_start = false;
}
- if (end_id != "+") {
- auto s = ParseStreamEntryID(start_id, &options_.end_id);
+ if (end_str == "+") {
+ options_.end_id = StreamEntryID::Maximum();
+ options_.exclude_end = false;
+ } else if (!end_str.empty() && end_str[0] == '(') {
+ options_.exclude_end = true;
+ auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id);
+ if (!s.IsOK()) {
+ return s;
+ }
+ } else if (end_str == "-") {
+ options_.end_id = StreamEntryID::Minimum();
+ options_.exclude_end = false;
+ } else {
+ auto s = ParseRangeEnd(end_str, &options_.end_id);
if (!s.IsOK()) {
return s;
}
+ options_.exclude_end = false;
}
options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 6dc47fba2..580629c9f 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1751,11 +1751,40 @@ rocksdb::Status Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
return s.IsNotFound() ? rocksdb::Status::OK() : s;
}
- std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, options.start_id);
- std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, options.end_id);
+ StreamEntryID scan_start_id = options.start_id;
+ if (options.exclude_start) {
+ Status inc_st = IncrementStreamEntryID(&scan_start_id);
+ if (!inc_st.IsOK()) {
+ pending_infos.pending_number = 0;
+ pending_infos.first_entry_id = StreamEntryID::Maximum();
+ pending_infos.last_entry_id = StreamEntryID::Minimum();
+ return rocksdb::Status::OK();
+ }
+ }
+ std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, scan_start_id);
+ // XPENDING extended form follows XRANGE-style ranges (closed interval by default; '(' excludes an endpoint).
+ // RocksDB iterate_upper_bound is exclusive.
+ std::string end_key_exclusive;
+ if (options.end_id == StreamEntryID::Maximum()) {
+ // "+" means no upper ID limit; place the bound after all keys in this stream metadata version
+ // (same pattern as Stream::trim). A PEL-specific key cannot be "past" maximum ID.
+ end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ } else if (options.exclude_end) {
+ // Exclusive end: iterator keys must be strictly before end_id.
+ end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, options.end_id);
+ } else {
+ StreamEntryID end_next = options.end_id;
+ Status inc_st = IncrementStreamEntryID(&end_next);
+ if (!inc_st.IsOK()) {
+ end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
+ } else {
+ // Next ID after end_id in PEL key space — exclusive iterator upper bound (XRANGE closed end).
+ end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata,
group_name, end_next);
+ }
+ }
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
- rocksdb::Slice upper_bound(end_key);
+ rocksdb::Slice upper_bound(end_key_exclusive);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix_key);
read_options.iterate_lower_bound = &lower_bound;
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index e2d1998dc..f7991f573 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -245,6 +245,8 @@ struct StreamPendingOptions {
StreamEntryID start_id{StreamEntryID::Minimum()};
StreamEntryID end_id{StreamEntryID::Maximum()};
+ bool exclude_start = false;
+ bool exclude_end = false;
uint64_t count;
bool with_count = false;
diff --git a/tests/gocase/unit/type/stream/stream_test.go b/tests/gocase/unit/type/stream/stream_test.go
index 1590847af..00354848c 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2503,6 +2503,146 @@ func TestStreamOffset(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE",
"mystream").Err(), "wrong number of arguments")
require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream",
"+").Err(), "wrong number of arguments")
})
+
+ t.Run("XPENDING with specific end ID should filter correctly", func(t
*testing.T) {
+ streamKey := "xpending-endid-test"
+ group := "grp"
+ consumer := "con"
+ require.NoError(t, rdb.Del(ctx, streamKey).Err())
+
+ id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey,
Values: []string{"f", "1"}}).Result()
+ require.NoError(t, err)
+ id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey,
Values: []string{"f", "2"}}).Result()
+ require.NoError(t, err)
+ id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey,
Values: []string{"f", "3"}}).Result()
+ require.NoError(t, err)
+
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey,
group, "0").Err())
+ // Read all entries so they become pending
+ _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group:
group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count:
10}).Result()
+ require.NoError(t, err)
+
+ // XPENDING extended form: each row is [id, consumer, idle_ms, delivery_count].
+ assertXPendingExtRow := func(t *testing.T, row interface{},
wantID string) {
+ t.Helper()
+ fields, ok := row.([]interface{})
+ require.True(t, ok)
+ require.Len(t, fields, 4)
+ gotID, ok := fields[0].(string)
+ require.True(t, ok)
+ require.Equal(t, wantID, gotID)
+ gotConsumer, ok := fields[1].(string)
+ require.True(t, ok)
+ require.Equal(t, consumer, gotConsumer)
+ require.GreaterOrEqual(t, fields[2], int64(0))
+ require.EqualValues(t, 1, fields[3])
+ }
+
+ // XPENDING extended form: same ID range rules as XRANGE (see Redis docs). Use XPENDING with end_id = id1.
+ result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1,
id1, "10").Result()
+ require.NoError(t, err)
+ entries, ok := result.([]interface{})
+ require.True(t, ok)
+ require.Len(t, entries, 1, "XPENDING with end_id=id1 should
return only 1 entry")
+ assertXPendingExtRow(t, entries[0], id1)
+
+ // Use XPENDING with range [id1, id2] (should return 2 entries).
+ result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1,
id2, "10").Result()
+ require.NoError(t, err)
+ entries, ok = result.([]interface{})
+ require.True(t, ok)
+ require.Len(t, entries, 2, "XPENDING with range [id1,id2]
should return 2 entries")
+ assertXPendingExtRow(t, entries[0], id1)
+ assertXPendingExtRow(t, entries[1], id2)
+
+ // Use XPENDING with range [id1, id3] (should return all 3 entries).
+ result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1,
id3, "10").Result()
+ require.NoError(t, err)
+ entries, ok = result.([]interface{})
+ require.True(t, ok)
+ require.Len(t, entries, 3, "XPENDING with range [id1,id3]
should return 3 entries")
+ assertXPendingExtRow(t, entries[0], id1)
+ assertXPendingExtRow(t, entries[1], id2)
+ assertXPendingExtRow(t, entries[2], id3)
+
+ require.NoError(t, rdb.Del(ctx, streamKey).Err())
+ })
+
+ t.Run("XPENDING with incomplete end ID should include the whole
millisecond", func(t *testing.T) {
+ streamKey := "xpending-incomplete-end-test"
+ group := "grp"
+ consumer := "con"
+ ids := []string{"1-0", "1-1", "1-2"}
+
+ require.NoError(t, rdb.Del(ctx, streamKey).Err())
+ for i, id := range ids {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f",
strconv.Itoa(i)}}).Err())
+ }
+
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey,
group, "0").Err())
+ _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group:
group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count:
10}).Result()
+ require.NoError(t, err)
+
+ result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1",
"1", "10").Result()
+ require.NoError(t, err)
+ entries, ok := result.([]interface{})
+ require.True(t, ok)
+ require.Len(t, entries, 3, "XPENDING 1 1 should include every
pending entry in millisecond 1, matching Redis")
+
+ for i, entry := range entries {
+ fields, ok := entry.([]interface{})
+ require.True(t, ok)
+ require.Len(t, fields, 4)
+ gotID, ok := fields[0].(string)
+ require.True(t, ok)
+ require.Equal(t, ids[i], gotID)
+ gotConsumer, ok := fields[1].(string)
+ require.True(t, ok)
+ require.Equal(t, consumer, gotConsumer)
+ require.GreaterOrEqual(t, fields[2], int64(0))
+ require.EqualValues(t, 1, fields[3])
+ }
+
+ require.NoError(t, rdb.Del(ctx, streamKey).Err())
+ })
+
+ t.Run("XPENDING with exclusive start should match Redis", func(t
*testing.T) {
+ streamKey := "xpending-exclusive-start-test"
+ group := "grp"
+ consumer := "con"
+ ids := []string{"1-0", "1-1", "1-2"}
+
+ require.NoError(t, rdb.Del(ctx, streamKey).Err())
+ for i, id := range ids {
+ require.NoError(t, rdb.XAdd(ctx,
&redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f",
strconv.Itoa(i)}}).Err())
+ }
+
+ require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey,
group, "0").Err())
+ _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group:
group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count:
10}).Result()
+ require.NoError(t, err)
+
+ result, err := rdb.Do(ctx, "XPENDING", streamKey, group,
"(1-0", "+", "10").Result()
+ require.NoError(t, err)
+ entries, ok := result.([]interface{})
+ require.True(t, ok)
+ require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude
the first pending entry, matching Redis")
+
+ for i, entry := range entries {
+ fields, ok := entry.([]interface{})
+ require.True(t, ok)
+ require.Len(t, fields, 4)
+ gotID, ok := fields[0].(string)
+ require.True(t, ok)
+ require.Equal(t, ids[i+1], gotID)
+ gotConsumer, ok := fields[1].(string)
+ require.True(t, ok)
+ require.Equal(t, consumer, gotConsumer)
+ require.GreaterOrEqual(t, fields[2], int64(0))
+ require.EqualValues(t, 1, fields[3])
+ }
+
+ require.NoError(t, rdb.Del(ctx, streamKey).Err())
+ })
}
func parseStreamEntryID(id string) (ts int64, seqNum int64) {