This is an automated email from the ASF dual-hosted git repository.

jihuayu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new edb033e5e fix(stream): align XPENDING ID parsing and PEL range with 
XRANGE (incomplete IDs, exclusive bounds) (#3437)
edb033e5e is described below

commit edb033e5eb910c8663b69432096a704885430354
Author: Songqing Zhang <[email protected]>
AuthorDate: Tue Apr 28 10:25:15 2026 +0800

    fix(stream): align XPENDING ID parsing and PEL range with XRANGE 
(incomplete IDs, exclusive bounds) (#3437)
    
    XPENDING extended form must use the same start/end ID rules as XRANGE:
    parse the end with ParseRangeEnd so a bare millisecond includes all
    sequence numbers in that ms; accept '(' for exclusive start/end and
    honor them when scanning the pending entry list.
    
    Refs: https://redis.io/docs/latest/commands/xpending/ ("in a similar way
    we do it with XRANGE")
    Refs: https://redis.io/docs/latest/commands/xrange/ ("closed interval" /
    inclusive unless '(' prefix)
    
    ---------
    
    Co-authored-by: 纪华裕 <[email protected]>
---
 src/commands/cmd_stream.cc                   |  41 ++++++--
 src/types/redis_stream.cc                    |  35 ++++++-
 src/types/redis_stream_base.h                |   2 +
 tests/gocase/unit/type/stream/stream_test.go | 140 +++++++++++++++++++++++++++
 4 files changed, 208 insertions(+), 10 deletions(-)

diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index ae84e4aca..73a5279c8 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -853,21 +853,48 @@ class CommandXPending : public Commander {
     }
 
     if (parser.Good()) {
-      std::string start_id, end_id;
-      start_id = GET_OR_RET(parser.TakeStr());
-      end_id = GET_OR_RET(parser.TakeStr());
-      if (start_id != "-") {
-        auto s = ParseStreamEntryID(start_id, &options_.start_id);
+      const std::string start_str = GET_OR_RET(parser.TakeStr());
+      const std::string end_str = GET_OR_RET(parser.TakeStr());
+
+      // Extended XPENDING uses the same ID range rules as XRANGE (see Redis 
documentation).
+      if (start_str == "-") {
+        options_.start_id = StreamEntryID::Minimum();
+        options_.exclude_start = false;
+      } else if (!start_str.empty() && start_str[0] == '(') {
+        options_.exclude_start = true;
+        auto s = ParseRangeStart(start_str.substr(1), &options_.start_id);
         if (!s.IsOK()) {
           return s;
         }
+      } else if (start_str == "+") {
+        options_.start_id = StreamEntryID::Maximum();
+        options_.exclude_start = false;
+      } else {
+        auto s = ParseRangeStart(start_str, &options_.start_id);
+        if (!s.IsOK()) {
+          return s;
+        }
+        options_.exclude_start = false;
       }
 
-      if (end_id != "+") {
-        auto s = ParseStreamEntryID(start_id, &options_.end_id);
+      if (end_str == "+") {
+        options_.end_id = StreamEntryID::Maximum();
+        options_.exclude_end = false;
+      } else if (!end_str.empty() && end_str[0] == '(') {
+        options_.exclude_end = true;
+        auto s = ParseRangeEnd(end_str.substr(1), &options_.end_id);
+        if (!s.IsOK()) {
+          return s;
+        }
+      } else if (end_str == "-") {
+        options_.end_id = StreamEntryID::Minimum();
+        options_.exclude_end = false;
+      } else {
+        auto s = ParseRangeEnd(end_str, &options_.end_id);
         if (!s.IsOK()) {
           return s;
         }
+        options_.exclude_end = false;
       }
 
       options_.count = GET_OR_RET(parser.TakeInt<uint64_t>());
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index 6dc47fba2..580629c9f 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -1751,11 +1751,40 @@ rocksdb::Status 
Stream::GetPendingEntries(engine::Context &ctx, StreamPendingOpt
     return s.IsNotFound() ? rocksdb::Status::OK() : s;
   }
 
-  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, 
group_name, options.start_id);
-  std::string end_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, 
group_name, options.end_id);
+  StreamEntryID scan_start_id = options.start_id;
+  if (options.exclude_start) {
+    Status inc_st = IncrementStreamEntryID(&scan_start_id);
+    if (!inc_st.IsOK()) {
+      pending_infos.pending_number = 0;
+      pending_infos.first_entry_id = StreamEntryID::Maximum();
+      pending_infos.last_entry_id = StreamEntryID::Minimum();
+      return rocksdb::Status::OK();
+    }
+  }
+  std::string prefix_key = internalPelKeyFromGroupAndEntryId(ns_key, metadata, 
group_name, scan_start_id);
+  // XPENDING extended form follows XRANGE-style ranges (closed interval by 
default; '(' excludes an endpoint).
+  // RocksDB iterate_upper_bound is exclusive.
+  std::string end_key_exclusive;
+  if (options.end_id == StreamEntryID::Maximum()) {
+    // "+" means no upper ID limit; place the bound after all keys in this 
stream metadata version
+    // (same pattern as Stream::trim). A PEL-specific key cannot be "past" 
maximum ID.
+    end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+  } else if (options.exclude_end) {
+    // Exclusive end: iterator keys must be strictly before end_id.
+    end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, 
group_name, options.end_id);
+  } else {
+    StreamEntryID end_next = options.end_id;
+    Status inc_st = IncrementStreamEntryID(&end_next);
+    if (!inc_st.IsOK()) {
+      end_key_exclusive = InternalKey(ns_key, "", metadata.version + 1, 
storage_->IsSlotIdEncoded()).Encode();
+    } else {
+      // Next ID after end_id in PEL key space — exclusive iterator upper 
bound (XRANGE closed end).
+      end_key_exclusive = internalPelKeyFromGroupAndEntryId(ns_key, metadata, 
group_name, end_next);
+    }
+  }
 
   rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
-  rocksdb::Slice upper_bound(end_key);
+  rocksdb::Slice upper_bound(end_key_exclusive);
   read_options.iterate_upper_bound = &upper_bound;
   rocksdb::Slice lower_bound(prefix_key);
   read_options.iterate_lower_bound = &lower_bound;
diff --git a/src/types/redis_stream_base.h b/src/types/redis_stream_base.h
index e2d1998dc..f7991f573 100644
--- a/src/types/redis_stream_base.h
+++ b/src/types/redis_stream_base.h
@@ -245,6 +245,8 @@ struct StreamPendingOptions {
 
   StreamEntryID start_id{StreamEntryID::Minimum()};
   StreamEntryID end_id{StreamEntryID::Maximum()};
+  bool exclude_start = false;
+  bool exclude_end = false;
 
   uint64_t count;
   bool with_count = false;
diff --git a/tests/gocase/unit/type/stream/stream_test.go 
b/tests/gocase/unit/type/stream/stream_test.go
index 1590847af..00354848c 100644
--- a/tests/gocase/unit/type/stream/stream_test.go
+++ b/tests/gocase/unit/type/stream/stream_test.go
@@ -2503,6 +2503,146 @@ func TestStreamOffset(t *testing.T) {
                require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", 
"mystream").Err(), "wrong number of arguments")
                require.ErrorContains(t, rdb.Do(ctx, "XREVRANGE", "mystream", 
"+").Err(), "wrong number of arguments")
        })
+
+       t.Run("XPENDING with specific end ID should filter correctly", func(t 
*testing.T) {
+               streamKey := "xpending-endid-test"
+               group := "grp"
+               consumer := "con"
+               require.NoError(t, rdb.Del(ctx, streamKey).Err())
+
+               id1, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, 
Values: []string{"f", "1"}}).Result()
+               require.NoError(t, err)
+               id2, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, 
Values: []string{"f", "2"}}).Result()
+               require.NoError(t, err)
+               id3, err := rdb.XAdd(ctx, &redis.XAddArgs{Stream: streamKey, 
Values: []string{"f", "3"}}).Result()
+               require.NoError(t, err)
+
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, 
group, "0").Err())
+               // Read all entries so they become pending
+               _, err = rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: 
group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 
10}).Result()
+               require.NoError(t, err)
+
+               // XPENDING extended form: each row is [id, consumer, idle_ms, 
delivery_count].
+               assertXPendingExtRow := func(t *testing.T, row interface{}, 
wantID string) {
+                       t.Helper()
+                       fields, ok := row.([]interface{})
+                       require.True(t, ok)
+                       require.Len(t, fields, 4)
+                       gotID, ok := fields[0].(string)
+                       require.True(t, ok)
+                       require.Equal(t, wantID, gotID)
+                       gotConsumer, ok := fields[1].(string)
+                       require.True(t, ok)
+                       require.Equal(t, consumer, gotConsumer)
+                       require.GreaterOrEqual(t, fields[2], int64(0))
+                       require.EqualValues(t, 1, fields[3])
+               }
+
+               // XPENDING extended form: same ID range rules as XRANGE (see 
Redis docs). Use XPENDING with end_id = id1.
+               result, err := rdb.Do(ctx, "XPENDING", streamKey, group, id1, 
id1, "10").Result()
+               require.NoError(t, err)
+               entries, ok := result.([]interface{})
+               require.True(t, ok)
+               require.Len(t, entries, 1, "XPENDING with end_id=id1 should 
return only 1 entry")
+               assertXPendingExtRow(t, entries[0], id1)
+
+               // Use XPENDING with range [id1, id2] (should return 2 entries).
+               result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, 
id2, "10").Result()
+               require.NoError(t, err)
+               entries, ok = result.([]interface{})
+               require.True(t, ok)
+               require.Len(t, entries, 2, "XPENDING with range [id1,id2] 
should return 2 entries")
+               assertXPendingExtRow(t, entries[0], id1)
+               assertXPendingExtRow(t, entries[1], id2)
+
+               // Use XPENDING with range [id1, id3] (should return all 3 
entries).
+               result, err = rdb.Do(ctx, "XPENDING", streamKey, group, id1, 
id3, "10").Result()
+               require.NoError(t, err)
+               entries, ok = result.([]interface{})
+               require.True(t, ok)
+               require.Len(t, entries, 3, "XPENDING with range [id1,id3] 
should return 3 entries")
+               assertXPendingExtRow(t, entries[0], id1)
+               assertXPendingExtRow(t, entries[1], id2)
+               assertXPendingExtRow(t, entries[2], id3)
+
+               require.NoError(t, rdb.Del(ctx, streamKey).Err())
+       })
+
+       t.Run("XPENDING with incomplete end ID should include the whole 
millisecond", func(t *testing.T) {
+               streamKey := "xpending-incomplete-end-test"
+               group := "grp"
+               consumer := "con"
+               ids := []string{"1-0", "1-1", "1-2"}
+
+               require.NoError(t, rdb.Del(ctx, streamKey).Err())
+               for i, id := range ids {
+                       require.NoError(t, rdb.XAdd(ctx, 
&redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", 
strconv.Itoa(i)}}).Err())
+               }
+
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, 
group, "0").Err())
+               _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: 
group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 
10}).Result()
+               require.NoError(t, err)
+
+               result, err := rdb.Do(ctx, "XPENDING", streamKey, group, "1", 
"1", "10").Result()
+               require.NoError(t, err)
+               entries, ok := result.([]interface{})
+               require.True(t, ok)
+               require.Len(t, entries, 3, "XPENDING 1 1 should include every 
pending entry in millisecond 1, matching Redis")
+
+               for i, entry := range entries {
+                       fields, ok := entry.([]interface{})
+                       require.True(t, ok)
+                       require.Len(t, fields, 4)
+                       gotID, ok := fields[0].(string)
+                       require.True(t, ok)
+                       require.Equal(t, ids[i], gotID)
+                       gotConsumer, ok := fields[1].(string)
+                       require.True(t, ok)
+                       require.Equal(t, consumer, gotConsumer)
+                       require.GreaterOrEqual(t, fields[2], int64(0))
+                       require.EqualValues(t, 1, fields[3])
+               }
+
+               require.NoError(t, rdb.Del(ctx, streamKey).Err())
+       })
+
+       t.Run("XPENDING with exclusive start should match Redis", func(t 
*testing.T) {
+               streamKey := "xpending-exclusive-start-test"
+               group := "grp"
+               consumer := "con"
+               ids := []string{"1-0", "1-1", "1-2"}
+
+               require.NoError(t, rdb.Del(ctx, streamKey).Err())
+               for i, id := range ids {
+                       require.NoError(t, rdb.XAdd(ctx, 
&redis.XAddArgs{Stream: streamKey, ID: id, Values: []string{"f", 
strconv.Itoa(i)}}).Err())
+               }
+
+               require.NoError(t, rdb.XGroupCreateMkStream(ctx, streamKey, 
group, "0").Err())
+               _, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{Group: 
group, Consumer: consumer, Streams: []string{streamKey, ">"}, Count: 
10}).Result()
+               require.NoError(t, err)
+
+               result, err := rdb.Do(ctx, "XPENDING", streamKey, group, 
"(1-0", "+", "10").Result()
+               require.NoError(t, err)
+               entries, ok := result.([]interface{})
+               require.True(t, ok)
+               require.Len(t, entries, 2, "XPENDING (1-0 + 10 should exclude 
the first pending entry, matching Redis")
+
+               for i, entry := range entries {
+                       fields, ok := entry.([]interface{})
+                       require.True(t, ok)
+                       require.Len(t, fields, 4)
+                       gotID, ok := fields[0].(string)
+                       require.True(t, ok)
+                       require.Equal(t, ids[i+1], gotID)
+                       gotConsumer, ok := fields[1].(string)
+                       require.True(t, ok)
+                       require.Equal(t, consumer, gotConsumer)
+                       require.GreaterOrEqual(t, fields[2], int64(0))
+                       require.EqualValues(t, 1, fields[3])
+               }
+
+               require.NoError(t, rdb.Del(ctx, streamKey).Err())
+       })
 }
 
 func parseStreamEntryID(id string) (ts int64, seqNum int64) {

Reply via email to