This is an automated email from the ASF dual-hosted git repository.
jihuayu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new fdcfd437f fix(stream): correct subkey prefix encoding in DestroyGroup
(#3420)
fdcfd437f is described below
commit fdcfd437fa835df6c919099aae9a3160d389ed11
Author: Songqing Zhang <[email protected]>
AuthorDate: Wed Apr 8 10:25:44 2026 +0800
fix(stream): correct subkey prefix encoding in DestroyGroup (#3420)
DestroyGroup was constructing scan prefixes using only
PutFixed64(group_name.size()) + group_name, which does not match the
actual internal key format used by
[internalKeyFromGroupName](https://github.com/apache/kvrocks/blob/da5e46307047b3e89753f0c9c02996430e758a5f/src/types/redis_stream.cc#L171),
internalKeyFromConsumerName, and internalPelKeyFromGroupAndEntryId.
The real keys are prefixed with PutFixed64(UINT64_MAX) + PutFixed8(type)
before the group name length and name. The wrong prefix meant the scan
range could miss all group/consumer/PEL keys or match unrelated stream
entry keys, causing data leaks or corruption on XGROUP DESTROY.
Fix by iterating over the three subkey types (GroupMetadata,
ConsumerMetadata, PelEntry) and building the correct prefix for each,
consistent with the internal key encoding helpers.
Besides, add two test cases verifying that DestroyGroup correctly cleans
up all
group-related subkeys (group metadata, consumer metadata, PEL entries):
- DestroyGroupCleansUpConsumersAndPelEntries: verifies full cleanup
including that stream entries are preserved and the group can be
re-created cleanly.
- DestroyGroupDoesNotAffectOtherGroups: verifies that destroying one
group does not affect another group's consumers or PEL entries.
---
src/commands/cmd_stream.cc | 10 +--
src/types/redis_stream.cc | 55 ++++++------
src/types/redis_stream.h | 2 +-
tests/cppunit/types/stream_test.cc | 178 +++++++++++++++++++++++++++++++++++--
4 files changed, 205 insertions(+), 40 deletions(-)
diff --git a/src/commands/cmd_stream.cc b/src/commands/cmd_stream.cc
index 5a2d52967..e98f324f5 100644
--- a/src/commands/cmd_stream.cc
+++ b/src/commands/cmd_stream.cc
@@ -566,17 +566,13 @@ class CommandXGroup : public Commander {
}
if (subcommand_ == "destroy") {
- uint64_t delete_cnt = 0;
- auto s = stream_db.DestroyGroup(ctx, stream_name_, group_name_,
&delete_cnt);
+ bool destroyed = false;
+ auto s = stream_db.DestroyGroup(ctx, stream_name_, group_name_,
&destroyed);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
- if (delete_cnt > 0) {
- *output = redis::Integer(1);
- } else {
- *output = redis::Integer(0);
- }
+ *output = redis::Integer(destroyed ? 1 : 0);
}
if (subcommand_ == "createconsumer") {
diff --git a/src/types/redis_stream.cc b/src/types/redis_stream.cc
index e7082648b..60606df08 100644
--- a/src/types/redis_stream.cc
+++ b/src/types/redis_stream.cc
@@ -27,6 +27,7 @@
#include <vector>
#include "db_util.h"
+#include "string_util.h"
#include "time_util.h"
namespace redis {
@@ -737,8 +738,8 @@ rocksdb::Status Stream::CreateGroup(engine::Context &ctx,
const Slice &stream_na
}
rocksdb::Status Stream::DestroyGroup(engine::Context &ctx, const Slice
&stream_name, const std::string &group_name,
- uint64_t *delete_cnt) {
- *delete_cnt = 0;
+ bool *destroyed) {
+ *destroyed = false;
std::string ns_key = AppendNamespacePrefix(stream_name);
StreamMetadata metadata;
@@ -751,42 +752,44 @@ rocksdb::Status Stream::DestroyGroup(engine::Context
&ctx, const Slice &stream_n
return
rocksdb::Status::InvalidArgument(errXGroupSubcommandRequiresKeyExist);
}
+ std::string group_key = internalKeyFromGroupName(ns_key, metadata,
group_name);
+ std::string val;
+ s = storage_->Get(ctx, ctx.GetReadOptions(), stream_cf_handle_, group_key,
&val);
+ if (s.IsNotFound()) {
+ return rocksdb::Status::OK();
+ }
+ if (!s.ok()) return s;
+
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisStream);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
- std::string sub_key_prefix;
- PutFixed64(&sub_key_prefix, group_name.size());
- sub_key_prefix += group_name;
- std::string next_version_prefix_key =
- InternalKey(ns_key, sub_key_prefix, metadata.version + 1,
storage_->IsSlotIdEncoded()).Encode();
- std::string prefix_key = InternalKey(ns_key, sub_key_prefix,
metadata.version, storage_->IsSlotIdEncoded()).Encode();
+ for (auto type : {StreamSubkeyType::StreamConsumerGroupMetadata,
StreamSubkeyType::StreamConsumerMetadata,
+ StreamSubkeyType::StreamPelEntry}) {
+ std::string sub_key_prefix;
+ PutFixed64(&sub_key_prefix, UINT64_MAX);
+ PutFixed8(&sub_key_prefix, static_cast<uint8_t>(type));
+ PutFixed64(&sub_key_prefix, group_name.size());
+ sub_key_prefix += group_name;
- rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
- rocksdb::Slice upper_bound(next_version_prefix_key);
- read_options.iterate_upper_bound = &upper_bound;
- rocksdb::Slice lower_bound(prefix_key);
- read_options.iterate_lower_bound = &lower_bound;
+ std::string prefix_key =
+ InternalKey(ns_key, sub_key_prefix, metadata.version,
storage_->IsSlotIdEncoded()).Encode();
+ std::string end_key =
+ InternalKey(ns_key, util::StringNext(sub_key_prefix),
metadata.version, storage_->IsSlotIdEncoded()).Encode();
- auto iter = util::UniqueIterator(ctx, read_options, stream_cf_handle_);
- for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
- s = batch->Delete(stream_cf_handle_, iter->key());
+ s = batch->DeleteRange(stream_cf_handle_, prefix_key, end_key);
if (!s.ok()) return s;
- *delete_cnt += 1;
- }
-
- if (auto s = iter->status(); !s.ok()) {
- return s;
}
- if (*delete_cnt != 0) {
+ *destroyed = true;
+ if (metadata.group_number > 0) {
metadata.group_number -= 1;
- std::string metadata_bytes;
- metadata.Encode(&metadata_bytes);
- s = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
- if (!s.ok()) return s;
}
+ std::string metadata_bytes;
+ metadata.Encode(&metadata_bytes);
+ s = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
+ if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(),
batch->GetWriteBatch());
}
diff --git a/src/types/redis_stream.h b/src/types/redis_stream.h
index 5496de59a..08cae0b1d 100644
--- a/src/types/redis_stream.h
+++ b/src/types/redis_stream.h
@@ -42,7 +42,7 @@ class Stream : public SubKeyScanner {
rocksdb::Status CreateGroup(engine::Context &ctx, const Slice &stream_name,
const StreamXGroupCreateOptions &options,
const std::string &group_name);
rocksdb::Status DestroyGroup(engine::Context &ctx, const Slice &stream_name,
const std::string &group_name,
- uint64_t *delete_cnt);
+ bool *destroyed);
rocksdb::Status CreateConsumer(engine::Context &ctx, const Slice
&stream_name, const std::string &group_name,
const std::string &consumer_name, int
*created_number);
rocksdb::Status DestroyConsumer(engine::Context &ctx, const Slice
&stream_name, const std::string &group_name,
diff --git a/tests/cppunit/types/stream_test.cc
b/tests/cppunit/types/stream_test.cc
index c09574dc0..96c22c543 100644
--- a/tests/cppunit/types/stream_test.cc
+++ b/tests/cppunit/types/stream_test.cc
@@ -2518,10 +2518,176 @@ TEST_F(RedisStreamTest,
StreamConsumerGroupCreateAndDestroy) {
std::string group_name = "TestGroup";
auto s = stream_->CreateGroup(*ctx_, stream_name, create_options,
group_name);
EXPECT_TRUE(s.ok());
- uint64_t delete_cnt = 0;
- s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &delete_cnt);
- EXPECT_TRUE(delete_cnt != 0);
- delete_cnt = 0;
- s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &delete_cnt);
- EXPECT_TRUE(delete_cnt == 0);
+ std::vector<std::pair<std::string, redis::StreamConsumerGroupMetadata>>
group_metadata;
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 1);
+ EXPECT_EQ(group_metadata[0].first, group_name);
+
+ bool destroyed = false;
+ s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &destroyed);
+ EXPECT_TRUE(s.ok());
+ EXPECT_TRUE(destroyed);
+ group_metadata.clear();
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 0);
+
+ destroyed = false;
+ s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &destroyed);
+ EXPECT_FALSE(destroyed);
+ group_metadata.clear();
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 0);
+}
+
+TEST_F(RedisStreamTest, DestroyGroupCleansUpConsumersAndPelEntries) {
+ std::string stream_name = "test_destroy_stream";
+ std::string group_name = "test_group";
+ std::string consumer_name = "consumer1";
+
+ redis::StreamAddOptions add_options;
+ add_options.next_id_strategy = *ParseNextStreamEntryIDStrategy("1-0");
+ redis::StreamEntryID id1;
+ auto s = stream_->Add(*ctx_, stream_name, add_options, {"key1", "val1"},
&id1);
+ EXPECT_TRUE(s.ok());
+
+ add_options.next_id_strategy = *ParseNextStreamEntryIDStrategy("2-0");
+ redis::StreamEntryID id2;
+ s = stream_->Add(*ctx_, stream_name, add_options, {"key2", "val2"}, &id2);
+ EXPECT_TRUE(s.ok());
+
+ redis::StreamXGroupCreateOptions create_options = {false, 0, "0-0"};
+ s = stream_->CreateGroup(*ctx_, stream_name, create_options, group_name);
+ EXPECT_TRUE(s.ok());
+ std::vector<std::pair<std::string, redis::StreamConsumerGroupMetadata>>
group_metadata;
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 1);
+ EXPECT_EQ(group_metadata[0].first, group_name);
+
+ redis::StreamRangeOptions range_options;
+ range_options.start = redis::StreamEntryID::Minimum();
+ range_options.end = redis::StreamEntryID::Maximum();
+ range_options.count = 10;
+ range_options.with_count = true;
+ range_options.exclude_start = true;
+ std::vector<redis::StreamEntry> entries;
+ s = stream_->RangeWithPending(*ctx_, stream_name, range_options, &entries,
group_name, consumer_name, false, true);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(entries.size(), 2);
+
+ bool destroyed = false;
+ s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &destroyed);
+ EXPECT_TRUE(s.ok());
+ EXPECT_TRUE(destroyed);
+ group_metadata.clear();
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 0);
+
+ // Verify re-destroying the same group deletes nothing
+ destroyed = false;
+ s = stream_->DestroyGroup(*ctx_, stream_name, group_name, &destroyed);
+ EXPECT_FALSE(destroyed);
+ group_metadata.clear();
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 0);
+
+ // Verify stream entries are still intact after group destruction
+ std::vector<redis::StreamEntry> remaining;
+ redis::StreamRangeOptions range_options2;
+ range_options2.start = redis::StreamEntryID::Minimum();
+ range_options2.end = redis::StreamEntryID::Maximum();
+ s = stream_->Range(*ctx_, stream_name, range_options2, &remaining);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(remaining.size(), 2);
+
+ // Verify we can create the same group again after destroy
+ s = stream_->CreateGroup(*ctx_, stream_name, create_options, group_name);
+ EXPECT_TRUE(s.ok());
+ group_metadata.clear();
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 1);
+ EXPECT_EQ(group_metadata[0].first, group_name);
+
+ // Verify the re-created group has no consumers or pending entries
+ std::vector<std::pair<std::string, redis::StreamConsumerMetadata>>
consumer_metadata;
+ s = stream_->GetConsumerInfo(*ctx_, stream_name, group_name,
consumer_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(consumer_metadata.size(), 0);
+
+ // Clean up
+ s = stream_->Del(*ctx_, stream_name);
+ EXPECT_TRUE(s.ok());
+}
+
+TEST_F(RedisStreamTest, DestroyGroupDoesNotAffectOtherGroups) {
+ std::string stream_name = "test_destroy_multi_group";
+ std::string group1 = "group_alpha";
+ std::string group2 = "group_beta";
+
+ redis::StreamAddOptions add_options;
+ add_options.next_id_strategy = *ParseNextStreamEntryIDStrategy("1-0");
+ redis::StreamEntryID id1;
+ auto s = stream_->Add(*ctx_, stream_name, add_options, {"k", "v"}, &id1);
+ EXPECT_TRUE(s.ok());
+
+ redis::StreamXGroupCreateOptions create_options = {false, 0, "0-0"};
+ s = stream_->CreateGroup(*ctx_, stream_name, create_options, group1);
+ EXPECT_TRUE(s.ok());
+ s = stream_->CreateGroup(*ctx_, stream_name, create_options, group2);
+ EXPECT_TRUE(s.ok());
+ std::vector<std::pair<std::string, redis::StreamConsumerGroupMetadata>>
group_metadata;
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 2);
+ int found1 = 0, found2 = 0;
+ for (const auto &item : group_metadata) {
+ if (item.first == group1) found1++;
+ if (item.first == group2) found2++;
+ }
+ EXPECT_EQ(found1, 1);
+ EXPECT_EQ(found2, 1);
+
+ // Consume entries in both groups to create PEL entries
+ redis::StreamRangeOptions range_options;
+ range_options.start = redis::StreamEntryID::Minimum();
+ range_options.end = redis::StreamEntryID::Maximum();
+ range_options.count = 10;
+ range_options.with_count = true;
+ range_options.exclude_start = true;
+ std::vector<redis::StreamEntry> entries;
+ std::string c1 = "c1";
+ std::string c2 = "c2";
+ s = stream_->RangeWithPending(*ctx_, stream_name, range_options, &entries,
group1, c1, false, true);
+ EXPECT_TRUE(s.ok());
+ s = stream_->RangeWithPending(*ctx_, stream_name, range_options, &entries,
group2, c2, false, true);
+ EXPECT_TRUE(s.ok());
+
+ // Destroy group1
+ bool destroyed = false;
+ s = stream_->DestroyGroup(*ctx_, stream_name, group1, &destroyed);
+ EXPECT_TRUE(s.ok());
+ EXPECT_TRUE(destroyed);
+
+ // Verify group2 still exists with its consumer
+ group_metadata.clear();
+ s = stream_->GetGroupInfo(*ctx_, stream_name, group_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(group_metadata.size(), 1);
+ EXPECT_EQ(group_metadata[0].first, group2);
+
+ std::vector<std::pair<std::string, redis::StreamConsumerMetadata>>
consumer_metadata;
+ s = stream_->GetConsumerInfo(*ctx_, stream_name, group2, consumer_metadata);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(consumer_metadata.size(), 1);
+ EXPECT_EQ(consumer_metadata[0].first, "c2");
+
+ // Clean up
+ s = stream_->Del(*ctx_, stream_name);
+ EXPECT_TRUE(s.ok());
}