This is an automated email from the ASF dual-hosted git repository.

edwardxu pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 505108c35 fix(tdigest): merge into existing dest without OVERRIDE (#3412)
505108c35 is described below

commit 505108c35fbce3eae16db870141bb34e9d3654bf
Author: Reilly.tang <[email protected]>
AuthorDate: Wed Apr 15 10:42:04 2026 +0800

    fix(tdigest): merge into existing dest without OVERRIDE (#3412)
    
    Co-authored-by: 纪华裕 <[email protected]>
    Co-authored-by: Edward Xu <[email protected]>
---
 src/types/redis_tdigest.cc                     |  76 ++++--
 src/types/redis_tdigest.h                      |  16 ++
 src/types/tdigest.cc                           |   2 +-
 tests/cppunit/types/tdigest_test.cc            | 346 +++++++++++++++++++++++++
 tests/gocase/unit/type/tdigest/tdigest_test.go | 126 ++++++++-
 5 files changed, 543 insertions(+), 23 deletions(-)

diff --git a/src/types/redis_tdigest.cc b/src/types/redis_tdigest.cc
index dcdc0f44c..4d0325f3d 100644
--- a/src/types/redis_tdigest.cc
+++ b/src/types/redis_tdigest.cc
@@ -437,9 +437,8 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const 
Slice& dest_digest,
     return status;
   } else if (status.ok()) {
     dest_digest_existed = true;
-    if (!options.override_flag) {
-      return rocksdb::Status::InvalidArgument(fmt::format("{}: {}", 
errKeyAlreadyExists, dest_digest.ToString()));
-    }
+    // When dest exists without OVERRIDE flag, we should merge dest's data 
with sources
+    // (Redis behavior: merge into existing sketch instead of returning error)
   }
 
   auto batch = storage_->GetWriteBatchBase();
@@ -473,21 +472,8 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const 
Slice& dest_digest,
       return status;
     }
 
-    if (metadata.unmerged_nodes > 0) {
-      if (auto status = mergeCurrentBuffer(ctx, source_ns_key, batch, 
&metadata, nullptr, &source_centroids);
-          !status.ok()) {
-        return status;
-      }
-
-      std::string metadata_bytes;
-      metadata.Encode(&metadata_bytes);
-      if (auto status = batch->Put(metadata_cf_handle_, source_ns_key, 
metadata_bytes); !status.ok()) {
-        return status;
-      }
-    } else if (metadata.merged_nodes > 0) {
-      if (auto status = dumpCentroids(ctx, source_ns_key, metadata, 
&source_centroids); !status.ok()) {
-        return status;
-      }
+    if (auto status = getCentroidsForMerge(ctx, source_ns_key, batch, 
&metadata, &source_centroids); !status.ok()) {
+      return status;
     }
 
     if (!source_centroids.empty()) {
@@ -506,6 +492,40 @@ rocksdb::Status TDigest::Merge(engine::Context& ctx, const 
Slice& dest_digest,
     compression = std::max(compression, metadata.compression);
   }
 
+  // Merge dest's existing data if dest exists without OVERRIDE flag
+  // (Redis behavior: dest is merged first, then source list is merged,
+  // so if dest is in source list, it gets merged twice)
+  bool should_merge_dest = dest_digest_existed && !options.override_flag;
+
+  if (should_merge_dest) {
+    std::vector<Centroid> dest_centroids;
+
+    if (auto status = getCentroidsForMerge(ctx, dest_ns_key, batch, 
&dest_metadata, &dest_centroids); !status.ok()) {
+      return status;
+    }
+
+    if (!dest_centroids.empty()) {
+      source_centroids_data.emplace_back(CentroidsWithDelta{
+          .centroids = std::move(dest_centroids),
+          .delta = dest_metadata.compression,
+          .min = dest_metadata.minimum,
+          .max = dest_metadata.maximum,
+          .total_weight = static_cast<double>(dest_metadata.merged_weight),
+      });
+      total_observations += dest_metadata.total_observations;
+    }
+    // Redis behavior: when merging into existing dest without OVERRIDE,
+    // keep dest's compression value (ignore sources' compression)
+    compression = dest_metadata.compression;
+  }
+
+  /*
+   * refer to: https://redis.io/docs/latest/commands/tdigest.merge/
+   * When COMPRESSION is not specified:
+   * - If destination-key does not exist or if OVERRIDE is specified, the 
compression is set to the maximum value among
+   * all source sketches.
+   * - If destination-key already exists and OVERRIDE is not specified, its 
compression is not changed.
+   */
   if (options.compression != 0) {
     compression = options.compression;
   }
@@ -744,6 +764,26 @@ rocksdb::Status 
TDigest::dumpCentroidsAndBuffer(engine::Context& ctx, const std:
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status TDigest::getCentroidsForMerge(engine::Context& ctx, const 
std::string& ns_key,
+                                              
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
+                                              TDigestMetadata* metadata, 
std::vector<Centroid>* centroids) {
+  if (metadata->unmerged_nodes > 0) {
+    if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, metadata, 
nullptr, centroids); !status.ok()) {
+      return status;
+    }
+    std::string metadata_bytes;
+    metadata->Encode(&metadata_bytes);
+    if (auto status = batch->Put(metadata_cf_handle_, ns_key, metadata_bytes); 
!status.ok()) {
+      return status;
+    }
+  } else if (metadata->merged_nodes > 0) {
+    if (auto status = dumpCentroids(ctx, ns_key, *metadata, centroids); 
!status.ok()) {
+      return status;
+    }
+  }
+  return rocksdb::Status::OK();
+}
+
 rocksdb::Status 
TDigest::applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch,
                                            const std::string& ns_key, const 
TDigestMetadata& metadata,
                                            const std::vector<Centroid>& 
centroids) {
diff --git a/src/types/redis_tdigest.h b/src/types/redis_tdigest.h
index a949c5832..8cf4562d2 100644
--- a/src/types/redis_tdigest.h
+++ b/src/types/redis_tdigest.h
@@ -124,6 +124,22 @@ class TDigest : public SubKeyScanner {
                                          const TDigestMetadata& metadata, 
std::vector<Centroid>* centroids,
                                          std::vector<double>* buffer,
                                          
ObserverOrUniquePtr<rocksdb::WriteBatchBase>* clean_after_dump_batch);
+
+  /**
+   * @brief Get centroids for merge operation.
+   *
+   * If the tdigest has unmerged buffer, merge it first and update metadata to 
batch.
+   * Otherwise, just dump existing centroids.
+   * @param ctx The context of the operation.
+   * @param ns_key The namespace key of the t-digest.
+   * @param batch The write batch to store metadata updates.
+   * @param metadata The metadata of the t-digest (may be updated if buffer is 
merged).
+   * @param centroids The output vector to store the centroids.
+   */
+  rocksdb::Status getCentroidsForMerge(engine::Context& ctx, const 
std::string& ns_key,
+                                       
ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, TDigestMetadata* metadata,
+                                       std::vector<Centroid>* centroids);
+
   rocksdb::Status 
applyNewCentroids(ObserverOrUniquePtr<rocksdb::WriteBatchBase>& batch, const 
std::string& ns_key,
                                     const TDigestMetadata& metadata, const 
std::vector<Centroid>& centroids);
 
diff --git a/src/types/tdigest.cc b/src/types/tdigest.cc
index afbc45ab1..2f5ef689a 100644
--- a/src/types/tdigest.cc
+++ b/src/types/tdigest.cc
@@ -362,7 +362,7 @@ class TDigest {
   TDigestImpl impl_;
 };
 
-TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset({}); }
+TDigest::TDigest(uint64_t delta) : impl_(TDigestImpl(delta)) { Reset(); }
 
 void TDigest::Merge(const std::vector<TDigest>& others) {
   if (others.empty()) {
diff --git a/tests/cppunit/types/tdigest_test.cc 
b/tests/cppunit/types/tdigest_test.cc
index 9fc1fb1f7..f82519d2f 100644
--- a/tests/cppunit/types/tdigest_test.cc
+++ b/tests/cppunit/types/tdigest_test.cc
@@ -602,3 +602,349 @@ TEST_F(RedisTDigestTest, TrimmedMeanComplexInput) {
   ASSERT_FALSE(std::isnan(*result.mean));
   EXPECT_NEAR(*result.mean, 5.0 / 6.0, 0.01);
 }
+
+TEST_F(RedisTDigestTest, MergeIntoExistingDestWithoutOverride) {
+  // Test: When dest exists without OVERRIDE flag, merge dest + sources 
together (Redis behavior)
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string src1 = "tdigest_merge_src1_" + ts;
+  std::string src2 = "tdigest_merge_src2_" + ts;
+  std::string dest = "tdigest_merge_dest_" + ts;
+
+  bool exists = false;
+  // Create source1 with values: 1, 2, 3
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0, 2.0, 3.0}).ok());
+
+  // Create source2 with values: 4, 5, 6, 100, -200
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {4.0, 5.0, 6.0, 100.0, -200.0}).ok());
+
+  // Create dest with values: 7, 8, 9
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {7.0, 8.0, 9.0}).ok());
+
+  // Merge sources into existing dest without OVERRIDE
+  // Should merge dest(3) + src1(3) + src2(5) = 11 observations
+  redis::TDigestMergeOptions options;
+  options.override_flag = false;
+  auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify total observations: 3 + 3 + 5 = 11
+  TDigestMetadata metadata;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+  EXPECT_EQ(metadata.total_observations, 11);
+
+  // Verify min: -200
+  std::vector<double> qs = {0.0};
+  redis::TDigestQuantitleResult result;
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], -200.0, 1.0);
+
+  // Verify max: 100 (quantile 1.0)
+  qs = {1.0};
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 100.0, 1.0);
+}
+
+TEST_F(RedisTDigestTest, MergeIntoExistingDestWithOverride) {
+  // Test: When dest exists with OVERRIDE flag, overwrite dest data
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string src1 = "tdigest_merge_override_src1_" + ts;
+  std::string src2 = "tdigest_merge_override_src2_" + ts;
+  std::string dest = "tdigest_merge_override_dest_" + ts;
+
+  bool exists = false;
+  // Create source1 with values: 1, 2, 3
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0, 2.0, 3.0}).ok());
+
+  // Create source2 with values: 4, 5, 6, 100, -200
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {4.0, 5.0, 6.0, 100.0, -200.0}).ok());
+
+  // Create dest with values: 7, 8, 9 (will be overwritten)
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {7.0, 8.0, 9.0}).ok());
+
+  // Merge sources into existing dest WITH OVERRIDE
+  // Should only have src1(3) + src2(5) = 8 observations (dest data 
overwritten)
+  redis::TDigestMergeOptions options;
+  options.override_flag = true;
+  auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify total observations: 3 + 5 = 8 (dest data was overwritten)
+  TDigestMetadata metadata;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+  EXPECT_EQ(metadata.total_observations, 8);
+
+  // Verify min: -200
+  std::vector<double> qs = {0.0};
+  redis::TDigestQuantitleResult result;
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], -200.0, 1.0);
+
+  // Verify max: 100
+  qs = {1.0};
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 100.0, 1.0);
+}
+
+TEST_F(RedisTDigestTest, MergeDestInSourceListWithoutOverride) {
+  // Test: dest in source list without OVERRIDE - dest data is merged twice 
(Redis behavior)
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string dest = "tdigest_dest_in_src_" + ts;
+  std::string src = "tdigest_src_for_dest_" + ts;
+
+  bool exists = false;
+  // Create dest with values: 1, 2, 3
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {1.0, 2.0, 3.0}).ok());
+
+  // Create src with values: 10, 20
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src, {10.0, 20.0}).ok());
+
+  // Verify dest has 3 observations before merge
+  TDigestMetadata metadata_before;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_before).ok());
+  EXPECT_EQ(metadata_before.total_observations, 3);
+
+  // Merge: TDIGEST.MERGE dest 2 dest src
+  // dest is both the destination AND in the source list
+  // Redis behavior: dest's existing data + dest(in source) + src = 3+3+2 = 8
+  redis::TDigestMergeOptions options;
+  options.override_flag = false;
+  auto status = tdigest_->Merge(*ctx_, dest, {dest, src}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify: should have dest(3) + dest(3) + src(2) = 8 observations
+  // (dest is double-counted when in source list without OVERRIDE)
+  TDigestMetadata metadata_after;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_after).ok());
+  EXPECT_EQ(metadata_after.total_observations, 8);
+
+  // Verify min: 1 (from dest)
+  std::vector<double> qs = {0.0};
+  redis::TDigestQuantitleResult result;
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 1.0, 0.1);
+
+  // Verify max: 20 (from src)
+  qs = {1.0};
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 20.0, 0.1);
+}
+
+TEST_F(RedisTDigestTest, MergeDestInSourceListWithOverride) {
+  // Test: dest in source list WITH OVERRIDE - dest in source counted once
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string dest = "tdigest_dest_in_src_override_" + ts;
+  std::string src = "tdigest_src_for_override_" + ts;
+
+  bool exists = false;
+  // Create dest with values: 1, 2, 3
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {1.0, 2.0, 3.0}).ok());
+
+  // Create src with values: 10, 20
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src, {10.0, 20.0}).ok());
+
+  // Merge with OVERRIDE: dest in source list should be counted once
+  // Result: dest(3) + src(2) = 5 observations
+  redis::TDigestMergeOptions options;
+  options.override_flag = true;
+  auto status = tdigest_->Merge(*ctx_, dest, {dest, src}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify: should have dest(3) + src(2) = 5 observations
+  TDigestMetadata metadata;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+  EXPECT_EQ(metadata.total_observations, 5);
+
+  // Verify min: 1 (from dest in source list)
+  std::vector<double> qs = {0.0};
+  redis::TDigestQuantitleResult result;
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 1.0, 0.1);
+
+  // Verify max: 20 (from src)
+  qs = {1.0};
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 20.0, 0.1);
+}
+
+TEST_F(RedisTDigestTest, MergeIntoNewDest) {
+  // Test: Merge into a new (non-existing) destination
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string src1 = "tdigest_new_dest_src1_" + ts;
+  std::string src2 = "tdigest_new_dest_src2_" + ts;
+  std::string dest = "tdigest_new_dest_" + ts;
+
+  bool exists = false;
+  // Create source1 with values: 1, 2, 3
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0, 2.0, 3.0}).ok());
+
+  // Create source2 with values: 4, 5, 6
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {4.0, 5.0, 6.0}).ok());
+
+  // Merge into new dest (dest does not exist)
+  // Result: src1(3) + src2(3) = 6 observations
+  redis::TDigestMergeOptions options;
+  options.override_flag = false;
+  auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify total observations: 3 + 3 = 6
+  TDigestMetadata metadata;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+  EXPECT_EQ(metadata.total_observations, 6);
+
+  // Verify min: 1
+  std::vector<double> qs = {0.0};
+  redis::TDigestQuantitleResult result;
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 1.0, 0.1);
+
+  // Verify max: 6
+  qs = {1.0};
+  ASSERT_TRUE(tdigest_->Quantile(*ctx_, dest, qs, &result).ok());
+  ASSERT_TRUE(result.quantiles.has_value());
+  EXPECT_NEAR((*result.quantiles)[0], 6.0, 0.1);
+}
+
+TEST_F(RedisTDigestTest, MergeIntoExistingDestKeepsCompression) {
+  // Test: When merging into existing dest without OVERRIDE, dest's 
compression should be preserved
+  // (Redis behavior: compression is not overwritten by source's compression)
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string src = "tdigest_compression_src_" + ts;
+  std::string dest = "tdigest_compression_dest_" + ts;
+
+  bool exists = false;
+  // Create source with COMPRESSION 200
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src, {200}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src, {1.0}).ok());
+
+  // Create dest with COMPRESSION 100
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {2.0}).ok());
+
+  // Verify dest compression before merge
+  TDigestMetadata metadata_before;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_before).ok());
+  EXPECT_EQ(metadata_before.compression, 100);
+
+  // Merge source into dest without OVERRIDE
+  // dest's compression (100) should be preserved, not overwritten by source's 
compression (200)
+  redis::TDigestMergeOptions options;
+  options.override_flag = false;
+  auto status = tdigest_->Merge(*ctx_, dest, {src}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify dest compression is still 100 (not 200)
+  TDigestMetadata metadata_after;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata_after).ok());
+  EXPECT_EQ(metadata_after.compression, 100) << "dest compression should be 
preserved when merging without OVERRIDE";
+
+  // Verify total observations: dest(1) + src(1) = 2
+  EXPECT_EQ(metadata_after.total_observations, 2);
+}
+
+TEST_F(RedisTDigestTest, MergeWithOverrideTakesMaxCompression) {
+  // Test: When merging with OVERRIDE, compression should be max of all sources
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string src1 = "tdigest_override_compression_src1_" + ts;
+  std::string src2 = "tdigest_override_compression_src2_" + ts;
+  std::string dest = "tdigest_override_compression_dest_" + ts;
+
+  bool exists = false;
+  // Create source1 with COMPRESSION 200
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src1, {200}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src1, {1.0}).ok());
+
+  // Create source2 with COMPRESSION 300
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src2, {300}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src2, {2.0}).ok());
+
+  // Create dest with COMPRESSION 100
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {3.0}).ok());
+
+  // Merge with OVERRIDE - compression should be max(src1, src2) = 300
+  redis::TDigestMergeOptions options;
+  options.override_flag = true;
+  auto status = tdigest_->Merge(*ctx_, dest, {src1, src2}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify dest compression is 300 (max of sources)
+  TDigestMetadata metadata;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+  EXPECT_EQ(metadata.compression, 300) << "with OVERRIDE, compression should 
be max of sources";
+
+  // Verify total observations: src1(1) + src2(1) = 2 (dest data was 
overwritten)
+  EXPECT_EQ(metadata.total_observations, 2);
+}
+
+TEST_F(RedisTDigestTest, MergeWithUserSpecifiedCompression) {
+  // Test: User-specified compression overrides all other compression values
+  std::string ts = std::to_string(util::GetTimeStampMS());
+  std::string src = "tdigest_user_compression_src_" + ts;
+  std::string dest = "tdigest_user_compression_dest_" + ts;
+
+  bool exists = false;
+  // Create source with COMPRESSION 200
+  ASSERT_TRUE(tdigest_->Create(*ctx_, src, {200}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, src, {1.0}).ok());
+
+  // Create dest with COMPRESSION 100
+  ASSERT_TRUE(tdigest_->Create(*ctx_, dest, {100}, &exists).ok());
+  ASSERT_FALSE(exists);
+  ASSERT_TRUE(tdigest_->Add(*ctx_, dest, {2.0}).ok());
+
+  // Merge with user-specified COMPRESSION 50
+  // User-specified compression should override both dest and source 
compression
+  redis::TDigestMergeOptions options;
+  options.override_flag = false;
+  options.compression = 50;
+  auto status = tdigest_->Merge(*ctx_, dest, {src}, options);
+  ASSERT_TRUE(status.ok()) << status.ToString();
+
+  // Verify dest compression is 50 (user-specified)
+  TDigestMetadata metadata;
+  ASSERT_TRUE(tdigest_->GetMetaData(*ctx_, dest, &metadata).ok());
+  EXPECT_EQ(metadata.compression, 50) << "user-specified compression should 
override all";
+
+  // Verify total observations: dest(1) + src(1) = 2
+  EXPECT_EQ(metadata.total_observations, 2);
+}
diff --git a/tests/gocase/unit/type/tdigest/tdigest_test.go 
b/tests/gocase/unit/type/tdigest/tdigest_test.go
index 48721b24b..f61eb9ffc 100644
--- a/tests/gocase/unit/type/tdigest/tdigest_test.go
+++ b/tests/gocase/unit/type/tdigest/tdigest_test.go
@@ -462,16 +462,134 @@ func tdigestTests(t *testing.T, configs 
util.KvrocksServerConfigs) {
                require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", sourceKey2, 
"compression", "30").Err())
                require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", sourceKey2, 
"4.0", "5.0", "6.0", "100", "-200").Err())
 
-               // create a destination digest
+               // create a destination digest and add some data (for testing 
merge into existing dest)
                destKey := keyPrefix + "dest"
                require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", destKey, 
"compression", "100").Err())
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey, "7.0", 
"8.0", "9.0").Err())
 
-               // merge the source into the destination without override
-               require.ErrorContains(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 
2, sourceKey1, sourceKey2).Err(), errMsgKeyAlreadyExists)
+               // merge the source into the existing destination without 
override
+               // should merge dest + sources together (not return error)
+               require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2, 
sourceKey1, sourceKey2).Err())
 
-               // merge the source into the destination with override
+               // verify the merged result contains data from both dest and 
sources
+               rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
+               require.NoError(t, rsp.Err())
+               info := toTdigestInfo(t, rsp.Val())
+               // dest(3) + source1(3) + source2(5) = 11 observations
+               require.EqualValues(t, 11, info.Observations)
+               // dest compression (100) should be preserved when merging 
without OVERRIDE
+               // (sourceKey1 has compression 101, sourceKey2 has compression 
30)
+               require.EqualValues(t, 100, info.Compression)
+
+               rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
+               require.NoError(t, rsp.Err())
+               minVal, err := rsp.Float64()
+               require.NoError(t, err)
+               require.InEpsilon(t, -200, minVal, 0.001)
+
+               rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
+               require.NoError(t, rsp.Err())
+               maxVal, err := rsp.Float64()
+               require.NoError(t, err)
+               require.InEpsilon(t, 100, maxVal, 0.001)
+
+               // merge the source into the destination with override (should 
overwrite existing dest data)
                require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", destKey, 2, 
sourceKey1, sourceKey2, "override").Err())
 
+               // verify override result: should only contain source data (not 
previous dest data)
+               rsp = rdb.Do(ctx, "TDIGEST.INFO", destKey)
+               require.NoError(t, rsp.Err())
+               infoOverride := toTdigestInfo(t, rsp.Val())
+               // source1(3) + source2(5) = 8 observations (dest data was 
overwritten)
+               require.EqualValues(t, 8, infoOverride.Observations)
+               // with OVERRIDE, compression should be max of sources (101 
from sourceKey1)
+               require.EqualValues(t, 101, infoOverride.Compression)
+
+               // Test: dest in source list without OVERRIDE - dest data is 
merged twice (Redis behavior)
+               // When dest is both destination and in source list without 
OVERRIDE:
+               // Redis merges dest's existing data first, then merges the 
source list (including dest)
+               t.Run("dest in source list without OVERRIDE double-counts dest 
(Redis behavior)", func(t *testing.T) {
+                       destKey := keyPrefix + "dest_in_source"
+                       srcKey := keyPrefix + "src_for_dest_in_source"
+
+                       // Create dest with 3 values: 1, 2, 3
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", 
destKey, "compression", "100").Err())
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey, 
"1", "2", "3").Err())
+
+                       // Create source with 2 values: 10, 20
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", 
srcKey, "compression", "100").Err())
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", srcKey, 
"10", "20").Err())
+
+                       // Record observations before merge
+                       rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
+                       require.NoError(t, rsp.Err())
+                       infoBefore := toTdigestInfo(t, rsp.Val())
+                       require.EqualValues(t, 3, infoBefore.Observations)
+
+                       // Merge: TDIGEST.MERGE dest 2 dest src
+                       // dest is both the destination AND in the source list
+                       // Redis behavior: dest's existing data + dest(in 
source) + src = 3+3+2 = 8
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", 
destKey, 2, destKey, srcKey).Err())
+
+                       // Verify: should have dest(3) + dest(3) + src(2) = 8 
observations
+                       rsp = rdb.Do(ctx, "TDIGEST.INFO", destKey)
+                       require.NoError(t, rsp.Err())
+                       infoAfter := toTdigestInfo(t, rsp.Val())
+                       require.EqualValues(t, 8, infoAfter.Observations, "dest 
is double-counted when in source list without OVERRIDE (Redis behavior)")
+
+                       // Verify min/max: min should be 1 (from dest), max 
should be 20 (from src)
+                       rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
+                       require.NoError(t, rsp.Err())
+                       minVal, err := rsp.Float64()
+                       require.NoError(t, err)
+                       require.InEpsilon(t, 1.0, minVal, 0.001)
+
+                       rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
+                       require.NoError(t, rsp.Err())
+                       maxVal, err := rsp.Float64()
+                       require.NoError(t, err)
+                       require.InEpsilon(t, 20.0, maxVal, 0.001)
+               })
+
+               // Test: dest in source list WITH OVERRIDE
+               t.Run("dest in source list WITH OVERRIDE should replace dest", 
func(t *testing.T) {
+                       destKey := keyPrefix + "dest_in_source_override"
+                       srcKey := keyPrefix + "src_for_override"
+
+                       // Create dest with 3 values: 1, 2, 3
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", 
destKey, "compression", "100").Err())
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", destKey, 
"1", "2", "3").Err())
+
+                       // Create source with 2 values: 10, 20
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.CREATE", 
srcKey, "compression", "100").Err())
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.ADD", srcKey, 
"10", "20").Err())
+
+                       // Merge with OVERRIDE: TDIGEST.MERGE dest 2 dest src 
OVERRIDE
+                       // With OVERRIDE, dest's existing data should be 
replaced
+                       // dest in source list should still be included once
+                       require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", 
destKey, 2, destKey, srcKey, "OVERRIDE").Err())
+
+                       // Verify: should have dest(3) + src(2) = 5 observations
+                       // (dest is included from source list, plus src)
+                       rsp := rdb.Do(ctx, "TDIGEST.INFO", destKey)
+                       require.NoError(t, rsp.Err())
+                       info := toTdigestInfo(t, rsp.Val())
+                       require.EqualValues(t, 5, info.Observations, "with 
OVERRIDE, dest in source should be counted once")
+
+                       // Verify min/max
+                       rsp = rdb.Do(ctx, "TDIGEST.MIN", destKey)
+                       require.NoError(t, rsp.Err())
+                       minVal, err := rsp.Float64()
+                       require.NoError(t, err)
+                       require.InEpsilon(t, 1.0, minVal, 0.001)
+
+                       rsp = rdb.Do(ctx, "TDIGEST.MAX", destKey)
+                       require.NoError(t, rsp.Err())
+                       maxVal, err := rsp.Float64()
+                       require.NoError(t, err)
+                       require.InEpsilon(t, 20.0, maxVal, 0.001)
+               })
+
                // merge to a new destination key
                newDestKey1 := keyPrefix + "new_dest"
                require.NoError(t, rdb.Do(ctx, "TDIGEST.MERGE", newDestKey1, 2, 
sourceKey1, sourceKey2).Err())

Reply via email to