This is an automated email from the ASF dual-hosted git repository.
edwardxu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 505108c35 fix(tdigest): merge into existing dest without OVERRIDE (#3412)
505108c35 is described below
commit 505108c35fbce3eae16db870141bb34e9d3654bf
Author: Reilly.tang <[email protected]>
AuthorDate: Wed Apr 15 10:42:04 2026 +0800
fix(tdigest): merge into existing dest without OVERRIDE (#3412)
Co-authored-by: 纪华裕 <[email protected]>
Co-authored-by: Edward Xu <[email protected]>
---
src/types/redis_tdigest.cc | 76 ++++--
src/types/redis_tdigest.h | 16 ++
src/types/tdigest.cc | 2 +-
tests/cppunit/types/tdigest_test.cc | 346 +++++++++++++++++++++++++
tests/gocase/unit/type/tdigest/tdigest_test.go | 126 ++++++++-
5 files changed, 543 insertions(+), 23 deletions(-)
diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index dcdc0f44c..4d0325f3d 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -437,9 +437,8 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const
Slice& dest_digest,
return status;
} else if (status.ok()) {
dest_digest_existed = true;
- if (!options.override_flag) {
- return rocksdb::Status::InvalidArgument(fmt::format("{}: {}",
errKeyAlreadyExists, dest_digest.ToString()));
- }
+ // When dest exists without OVERRIDE flag, we should merge dest's data with sources
+ // (Redis behavior: merge into existing sketch instead of returning error)
}
auto batch = storage_->GetWriteBatchBase();
@@ -473,21 +472,8 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const
Slice& dest_digest,
return status;
}
- if (metadata.unmerged_nodes > 0) {
- if (auto status = mergeCurrentBuffer(ctx, source_ns_key, batch,
&metadata, nullptr, &source_centroids);
- !status.ok()) {
- return status;
- }
-
- std::string metadata_bytes;
- metadata.Encode(&metadata_bytes);
- if (auto status = batch->Put(metadata_cf_handle_, source_ns_key,
metadata_bytes); !status.ok()) {
- return status;
- }
- } else if (metadata.merged_nodes > 0) {
- if (auto status = dumpCentroids(ctx, source_ns_key, metadata,
&source_centroids); !status.ok()) {
- return status;
- }
+ if (auto status = getCentroidsForMerge(ctx, source_ns_key, batch,
&metadata, &source_centroids); !status.ok()) {
+ return status;
}
if (!source_centroids.empty()) {
@@ -506,6 +492,40 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const
Slice& dest_digest,
compression = std::max(compression, metadata.compression);
}
+ // Merge dest's existing data if dest exists without OVERRIDE flag
+ // (Redis behavior: dest is merged first, then source list is merged,
+ // so if dest is in source list, it gets merged twice)
+ bool should_merge_dest = dest_digest_existed && !options.override_flag;
+
+ if (should_merge_dest) {
+ std::vector<Centroid> dest_centroids;
+
+ if (auto status = getCentroidsForMerge(ctx, dest_ns_key, batch,
&dest_metadata, &dest_centroids); !status.ok()) {
+ return status;
+ }
+
+ if (!dest_centroids.empty()) {
+ source_centroids_data.emplace_back(CentroidsWithDelta{
+ .centroids = std::move(dest_centroids),
+ .delta = dest_metadata.compression,
+ .min = dest_metadata.minimum,
+ .max = dest_metadata.maximum,
+ .total_weight = static_cast<double>(dest_metadata.merged_weight),
+ });
+ total_observations += dest_metadata.total_observations;
+ }
+ // Redis behavior: when merging into existing dest without OVERRIDE,
+ // keep dest's compression value (ignore sources' compression)
+ compression = dest_metadata.compression;
+ }
+
+ /*
+ * refer to: https://redis.io/docs/latest/commands/tdigest.merge/
+ * When COMPRESSION is not specified:
+ * - If destination-key does not exist or if OVERRIDE is specified, the compression is set to the maximum value among
+ * all source sketches.
+ * - If destination-key already exists and OVERRIDE is not specified, its compression is not changed.
+ */
if (options.compression != 0) {
compression = options.compression;
}
@@ -744,6 +764,26 @@ rocksdb::Status
TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
return rocksdb::Status::OK();
}
+rocksdb::Status TDigest::getCentroidsForMerge(engine::Context& ctx, const
std::string& ns_key,
+
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
+ TDigestMetadata* metadata,
std::vector<Centroid>* centroids) {
+ if (metadata->unmerged_nodes > 0) {
+ if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, metadata,
nullptr, centroids); !status.ok()) {
+ return status;
+ }
+ std::string metadata_bytes;
+ metadata->Encode(&metadata_bytes);
+ if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes);
!status.ok()) {
+ return status;
+ }
+ } else if (metadata->merged_nodes > 0) {
+ if (auto status = dumpCentroids(ctx, ns_key, *metadata, centroids);
!status.ok()) {
+ return status;
+ }
+ }
+ return rocksdb::Status::OK();
+}
+
rocksdb::Status
TDigest::applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
const std::string& ns_key, const
TDigestMetadata& metadata,
const std::vector<Centroid>&
centroids) {
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index a949c5832..8cf4562d2 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -124,6 +124,22 @@ class TDigest : public SubKeyScanner {
const TDigestMetadata& metadata,
std::vector<Centroid>* centroids,
std::vector<double>* buffer,
ObserverOrUniquePtr<rocksdb::WriteBatchBase>* clean_after_dump_batch);
+
+ /**
+ * @brief Get centroids for merge operation.
+ *
+ * If the tdigest has unmerged buffer, merge it first and update metadata to batch.
+ * Otherwise, just dump existing centroids.
+ * @param ctx The context of the operation.
+ * @param ns_key The namespace key of the t-digest.
+ * @param batch The write batch to store metadata updates.
+ * @param metadata The metadata of the t-digest (may be updated if buffer is merged).
+ * @param centroids The output vector to store the centroids.
+ */
+ rocksdb::Status getCentroidsForMerge(engine::Context& ctx, const
std::string& ns_key,
+
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
+ std::vector<Centroid>* centroids);
+
rocksdb::Status
applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, const
std::string& ns_key,
const TDigestMetadata& metadata, const
std::vector<Centroid>& centroids);
diff --git a/src/types/tdigest.cc b/src/types/tdigest.cc
index afbc45ab1..2f5ef689a 100644
--- a/src/types/tdigest.cc
+++ b/src/types/tdigest.cc
@@ -362,7 +362,7 @@ class TDigest {
TDigestImpl impl_;
};
-TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset({}); }
+TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset(); }
void TDigest::Merge(const std::vector<TDigest>& others) {
if (others.empty()) {
diff --git a/tests/cppunit/types/tdigest_test.cc
b/tests/cppunit/types/tdigest_test.cc
index 9fc1fb1f7..f82519d2f 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -602,3 +602,349 @@ TEST_F(RedisTDigestTest, TrimmedMeanComplexInput) {
ASSERT_FALSE(std::isnan(*result.mean));
EXPECT_NEAR(*result.mean, 5.0 / 6.0, 0.01);
}
+
+TEST_F(RedisTDigestTest, MergeIntoExistingDestWithoutOverride) {
+ // Test: When dest exists without OVERRIDE flag, merge dest + sources together (Redis behavior)
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string src1 = "tdigest_merge_src1_" + ts;
+ std::string src2 = "tdigest_merge_src2_" + ts;
+ std::string dest = "tdigest_merge_dest_" + ts;
+
+ bool exists = false;
+ // Create source1 with values: 1, 2, 3
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0, 2.0, 3.0}).ok());
+
+ // Create source2 with values: 4, 5, 6, 100, -200
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {4.0, 5.0, 6.0, 100.0, -200.0}).ok());
+
+ // Create dest with values: 7, 8, 9
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {7.0, 8.0, 9.0}).ok());
+
+ // Merge sources into existing dest without OVERRIDE
+ // Should merge dest(3) + src1(3) + src2(5) = 11 observations
+ redis::TDigestMergeOptions options;
+ options.override_flag = false;
+ auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify total observations: 3 + 3 + 5 = 11
+ TDigestMetadata metadata;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+ EXPECT_EQ(metadata.total_observations, 11);
+
+ // Verify min: -200
+ std::vector<double> qs = {0.0};
+ redis::TDigestQuantitleResult result;
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], -200.0, 1.0);
+
+ // Verify max: 100 (quantile 1.0)
+ qs = {1.0};
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 100.0, 1.0);
+}
+
+TEST_F(RedisTDigestTest, MergeIntoExistingDestWithOverride) {
+ // Test: When dest exists with OVERRIDE flag, overwrite dest data
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string src1 = "tdigest_merge_override_src1_" + ts;
+ std::string src2 = "tdigest_merge_override_src2_" + ts;
+ std::string dest = "tdigest_merge_override_dest_" + ts;
+
+ bool exists = false;
+ // Create source1 with values: 1, 2, 3
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0, 2.0, 3.0}).ok());
+
+ // Create source2 with values: 4, 5, 6, 100, -200
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {4.0, 5.0, 6.0, 100.0, -200.0}).ok());
+
+ // Create dest with values: 7, 8, 9 (will be overwritten)
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {7.0, 8.0, 9.0}).ok());
+
+ // Merge sources into existing dest WITH OVERRIDE
+ // Should only have src1(3) + src2(5) = 8 observations (dest data overwritten)
+ redis::TDigestMergeOptions options;
+ options.override_flag = true;
+ auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify total observations: 3 + 5 = 8 (dest data was overwritten)
+ TDigestMetadata metadata;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+ EXPECT_EQ(metadata.total_observations, 8);
+
+ // Verify min: -200
+ std::vector<double> qs = {0.0};
+ redis::TDigestQuantitleResult result;
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], -200.0, 1.0);
+
+ // Verify max: 100
+ qs = {1.0};
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 100.0, 1.0);
+}
+
+TEST_F(RedisTDigestTest, MergeDestInSourceListWithoutOverride) {
+ // Test: dest in source list without OVERRIDE - dest data is merged twice (Redis behavior)
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string dest = "tdigest_dest_in_src_" + ts;
+ std::string src = "tdigest_src_for_dest_" + ts;
+
+ bool exists = false;
+ // Create dest with values: 1, 2, 3
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {1.0, 2.0, 3.0}).ok());
+
+ // Create src with values: 10, 20
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src, {10.0, 20.0}).ok());
+
+ // Verify dest has 3 observations before merge
+ TDigestMetadata metadata_before;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_before).ok());
+ EXPECT_EQ(metadata_before.total_observations, 3);
+
+ // Merge: TDIGEST.MERGE dest 2 dest src
+ // dest is both the destination AND in the source list
+ // Redis behavior: dest's existing data + dest(in source) + src = 3+3+2 = 8
+ redis::TDigestMergeOptions options;
+ options.override_flag = false;
+ auto status = tdigest_->Merge(*ctx_, dest, {dest, src}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify: should have dest(3) + dest(3) + src(2) = 8 observations
+ // (dest is double-counted when in source list without OVERRIDE)
+ TDigestMetadata metadata_after;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_after).ok());
+ EXPECT_EQ(metadata_after.total_observations, 8);
+
+ // Verify min: 1 (from dest)
+ std::vector<double> qs = {0.0};
+ redis::TDigestQuantitleResult result;
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 1.0, 0.1);
+
+ // Verify max: 20 (from src)
+ qs = {1.0};
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 20.0, 0.1);
+}
+
+TEST_F(RedisTDigestTest, MergeDestInSourceListWithOverride) {
+ // Test: dest in source list WITH OVERRIDE - dest in source counted once
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string dest = "tdigest_dest_in_src_override_" + ts;
+ std::string src = "tdigest_src_for_override_" + ts;
+
+ bool exists = false;
+ // Create dest with values: 1, 2, 3
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {1.0, 2.0, 3.0}).ok());
+
+ // Create src with values: 10, 20
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src, {10.0, 20.0}).ok());
+
+ // Merge with OVERRIDE: dest in source list should be counted once
+ // Result: dest(3) + src(2) = 5 observations
+ redis::TDigestMergeOptions options;
+ options.override_flag = true;
+ auto status = tdigest_->Merge(*ctx_, dest, {dest, src}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify: should have dest(3) + src(2) = 5 observations
+ TDigestMetadata metadata;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+ EXPECT_EQ(metadata.total_observations, 5);
+
+ // Verify min: 1 (from dest in source list)
+ std::vector<double> qs = {0.0};
+ redis::TDigestQuantitleResult result;
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 1.0, 0.1);
+
+ // Verify max: 20 (from src)
+ qs = {1.0};
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 20.0, 0.1);
+}
+
+TEST_F(RedisTDigestTest, MergeIntoNewDest) {
+ // Test: Merge into a new (non-existing) destination
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string src1 = "tdigest_new_dest_src1_" + ts;
+ std::string src2 = "tdigest_new_dest_src2_" + ts;
+ std::string dest = "tdigest_new_dest_" + ts;
+
+ bool exists = false;
+ // Create source1 with values: 1, 2, 3
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0, 2.0, 3.0}).ok());
+
+ // Create source2 with values: 4, 5, 6
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {4.0, 5.0, 6.0}).ok());
+
+ // Merge into new dest (dest does not exist)
+ // Result: src1(3) + src2(3) = 6 observations
+ redis::TDigestMergeOptions options;
+ options.override_flag = false;
+ auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify total observations: 3 + 3 = 6
+ TDigestMetadata metadata;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+ EXPECT_EQ(metadata.total_observations, 6);
+
+ // Verify min: 1
+ std::vector<double> qs = {0.0};
+ redis::TDigestQuantitleResult result;
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 1.0, 0.1);
+
+ // Verify max: 6
+ qs = {1.0};
+ ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+ ASSERT_TRUE(result.quantiles.has_value());
+ EXPECT_NEAR((*result.quantiles)[0], 6.0, 0.1);
+}
+
+TEST_F(RedisTDigestTest, MergeIntoExistingDestKeepsCompression) {
+ // Test: When merging into existing dest without OVERRIDE, dest's compression should be preserved
+ // (Redis behavior: compression is not overwritten by source's compression)
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string src = "tdigest_compression_src_" + ts;
+ std::string dest = "tdigest_compression_dest_" + ts;
+
+ bool exists = false;
+ // Create source with COMPRESSION 200
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src, {200}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src, {1.0}).ok());
+
+ // Create dest with COMPRESSION 100
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {2.0}).ok());
+
+ // Verify dest compression before merge
+ TDigestMetadata metadata_before;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_before).ok());
+ EXPECT_EQ(metadata_before.compression, 100);
+
+ // Merge source into dest without OVERRIDE
+ // dest's compression (100) should be preserved, not overwritten by source's compression (200)
+ redis::TDigestMergeOptions options;
+ options.override_flag = false;
+ auto status = tdigest_->Merge(*ctx_, dest, {src}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify dest compression is still 100 (not 200)
+ TDigestMetadata metadata_after;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_after).ok());
+ EXPECT_EQ(metadata_after.compression, 100) << "dest compression should be
preserved when merging without OVERRIDE";
+
+ // Verify total observations: dest(1) + src(1) = 2
+ EXPECT_EQ(metadata_after.total_observations, 2);
+}
+
+TEST_F(RedisTDigestTest, MergeWithOverrideTakesMaxCompression) {
+ // Test: When merging with OVERRIDE, compression should be max of all sources
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string src1 = "tdigest_override_compression_src1_" + ts;
+ std::string src2 = "tdigest_override_compression_src2_" + ts;
+ std::string dest = "tdigest_override_compression_dest_" + ts;
+
+ bool exists = false;
+ // Create source1 with COMPRESSION 200
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {200}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0}).ok());
+
+ // Create source2 with COMPRESSION 300
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {300}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {2.0}).ok());
+
+ // Create dest with COMPRESSION 100
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {3.0}).ok());
+
+ // Merge with OVERRIDE - compression should be max(src1, src2) = 300
+ redis::TDigestMergeOptions options;
+ options.override_flag = true;
+ auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify dest compression is 300 (max of sources)
+ TDigestMetadata metadata;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+ EXPECT_EQ(metadata.compression, 300) << "with OVERRIDE, compression should
be max of sources";
+
+ // Verify total observations: src1(1) + src2(1) = 2 (dest data was overwritten)
+ EXPECT_EQ(metadata.total_observations, 2);
+}
+
+TEST_F(RedisTDigestTest, MergeWithUserSpecifiedCompression) {
+ // Test: User-specified compression overrides all other compression values
+ std::string ts = std::to_string(util::GetTimeStampMS());
+ std::string src = "tdigest_user_compression_src_" + ts;
+ std::string dest = "tdigest_user_compression_dest_" + ts;
+
+ bool exists = false;
+ // Create source with COMPRESSION 200
+ ASSERT_TRUE(tdigest_->Create(*ctx_, src, {200}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, src, {1.0}).ok());
+
+ // Create dest with COMPRESSION 100
+ ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+ ASSERT_FALSE(exists);
+ ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {2.0}).ok());
+
+ // Merge with user-specified COMPRESSION 50
+ // User-specified compression should override both dest and source compression
+ redis::TDigestMergeOptions options;
+ options.override_flag = false;
+ options.compression = 50;
+ auto status = tdigest_->Merge(*ctx_, dest, {src}, options);
+ ASSERT_TRUE(status.ok()) << status.ToString();
+
+ // Verify dest compression is 50 (user-specified)
+ TDigestMetadata metadata;
+ ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+ EXPECT_EQ(metadata.compression, 50) << "user-specified compression should
override all";
+
+ // Verify total observations: dest(1) + src(1) = 2
+ EXPECT_EQ(metadata.total_observations, 2);
+}
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 48721b24b..f61eb9ffc 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -462,16 +462,134 @@ func tdigestTests(t *testing.T, configs
util.KvrocksServerConfigs) {
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey2,
"compression", "30").Err())
require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey2,
"4.0", "5.0", "6.0", "100", "-200").Err())
- // create a destination digest
+ // create a destination digest and add some data (for testing
merge into existing dest)
destKey := keyPrefix + "dest"
require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", destKey,
"compression", "100").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey, "7.0",
"8.0", "9.0").Err())
- // merge the source into the destination without override
- require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey,
2, sourceKey1, sourceKey2).Err(), errMsgKeyAlreadyExists)
+ // merge the source into the existing destination without
override
+ // should merge dest + sources together (not return error)
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2,
sourceKey1, sourceKey2).Err())
- // merge the source into the destination with override
+ // verify the merged result contains data from both dest and
sources
+ rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
+ require.NoError(t, rsp.Err())
+ info := toTdigestInfo(t, rsp.Val())
+ // dest(3) + source1(3) + source2(5) = 11 observations
+ require.EqualValues(t, 11, info.Observations)
+ // dest compression (100) should be preserved when merging
without OVERRIDE
+ // (sourceKey1 has compression 101, sourceKey2 has compression
30)
+ require.EqualValues(t, 100, info.Compression)
+
+ rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
+ require.NoError(t, rsp.Err())
+ minVal, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, -200, minVal, 0.001)
+
+ rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
+ require.NoError(t, rsp.Err())
+ maxVal, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, 100, maxVal, 0.001)
+
+ // merge the source into the destination with override (should
overwrite existing dest data)
require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2,
sourceKey1, sourceKey2, "override").Err())
+ // verify override result: should only contain source data (not
previous dest data)
+ rsp = rdb.Do(ctx, "TDIGEST.INFO", destKey)
+ require.NoError(t, rsp.Err())
+ infoOverride := toTdigestInfo(t, rsp.Val())
+ // source1(3) + source2(5) = 8 observations (dest data was
overwritten)
+ require.EqualValues(t, 8, infoOverride.Observations)
+ // with OVERRIDE, compression should be max of sources (101
from sourceKey1)
+ require.EqualValues(t, 101, infoOverride.Compression)
+
+ // Test: dest in source list without OVERRIDE - dest data is
merged twice (Redis behavior)
+ // When dest is both destination and in source list without
OVERRIDE:
+ // Redis merges dest's existing data first, then merges the
source list (including dest)
+ t.Run("dest in source list without OVERRIDE double-counts dest
(Redis behavior)", func(t *testing.T) {
+ destKey := keyPrefix + "dest_in_source"
+ srcKey := keyPrefix + "src_for_dest_in_source"
+
+ // Create dest with 3 values: 1, 2, 3
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
destKey, "compression", "100").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey,
"1", "2", "3").Err())
+
+ // Create source with 2 values: 10, 20
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
srcKey, "compression", "100").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", srcKey,
"10", "20").Err())
+
+ // Record observations before merge
+ rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
+ require.NoError(t, rsp.Err())
+ infoBefore := toTdigestInfo(t, rsp.Val())
+ require.EqualValues(t, 3, infoBefore.Observations)
+
+ // Merge: TDIGEST.MERGE dest 2 dest src
+ // dest is both the destination AND in the source list
+ // Redis behavior: dest's existing data + dest(in
source) + src = 3+3+2 = 8
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE",
destKey, 2, destKey, srcKey).Err())
+
+ // Verify: should have dest(3) + dest(3) + src(2) = 8
observations
+ rsp = rdb.Do(ctx, "TDIGEST.INFO", destKey)
+ require.NoError(t, rsp.Err())
+ infoAfter := toTdigestInfo(t, rsp.Val())
+ require.EqualValues(t, 8, infoAfter.Observations, "dest
is double-counted when in source list without OVERRIDE (Redis behavior)")
+
+ // Verify min/max: min should be 1 (from dest), max
should be 20 (from src)
+ rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
+ require.NoError(t, rsp.Err())
+ minVal, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, 1.0, minVal, 0.001)
+
+ rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
+ require.NoError(t, rsp.Err())
+ maxVal, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, 20.0, maxVal, 0.001)
+ })
+
+ // Test: dest in source list WITH OVERRIDE
+ t.Run("dest in source list WITH OVERRIDE should replace dest",
func(t *testing.T) {
+ destKey := keyPrefix + "dest_in_source_override"
+ srcKey := keyPrefix + "src_for_override"
+
+ // Create dest with 3 values: 1, 2, 3
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
destKey, "compression", "100").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey,
"1", "2", "3").Err())
+
+ // Create source with 2 values: 10, 20
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE",
srcKey, "compression", "100").Err())
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", srcKey,
"10", "20").Err())
+
+ // Merge with OVERRIDE: TDIGEST.MERGE dest 2 dest src
OVERRIDE
+ // With OVERRIDE, dest's existing data should be
replaced
+ // dest in source list should still be included once
+ require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE",
destKey, 2, destKey, srcKey, "OVERRIDE").Err())
+
+ // Verify: should have dest(3) + src(2) = 5 observations
+ // (dest is included from source list, plus src)
+ rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
+ require.NoError(t, rsp.Err())
+ info := toTdigestInfo(t, rsp.Val())
+ require.EqualValues(t, 5, info.Observations, "with
OVERRIDE, dest in source should be counted once")
+
+ // Verify min/max
+ rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
+ require.NoError(t, rsp.Err())
+ minVal, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, 1.0, minVal, 0.001)
+
+ rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
+ require.NoError(t, rsp.Err())
+ maxVal, err := rsp.Float64()
+ require.NoError(t, err)
+ require.InEpsilon(t, 20.0, maxVal, 0.001)
+ })
+
// merge to a new destination key
newDestKey1 := keyPrefix + "new_dest"
require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey1, 2,
sourceKey1, sourceKey2).Err())