LindaSummer commented on code in PR #3130:
URL: https://github.com/apache/kvrocks/pull/3130#discussion_r2287308953


##########
src/types/tdigest.h:
##########
@@ -150,3 +150,22 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) 
{
   diff /= (lc.weight / 2 + rc.weight / 2);
   return Lerp(lc.mean, rc.mean, diff);
 }
+
+template <typename TD>
+inline StatusOr<int> TDigestRevRank(TD&& td, double value) {

Review Comment:
   Hi @donghao526 ,
   
   We need to use a stable way to compare doubles.
   
   It is hard to rely on two double numbers comparing as exactly equal to or 
greater than each other.
   
   After solving this, we should add some test cases for this corner case.
   
   Best Regards,
   Edward



##########
src/types/tdigest.h:
##########
@@ -150,3 +150,22 @@ inline StatusOr<double> TDigestQuantile(TD&& td, double q) 
{
   diff /= (lc.weight / 2 + rc.weight / 2);
   return Lerp(lc.mean, rc.mean, diff);
 }
+
+template <typename TD>
+inline StatusOr<int> TDigestRevRank(TD&& td, double value) {

Review Comment:
   Hi @donghao526 ,
   
   Since the other code snippets already use this approach, you could leave the 
current logic as it is.
   
   I will try to create a new PR to solve the unstable comparison problem in 
this file.
   
   Best Regards,
   Edward



##########
src/types/redis_tdigest.cc:
##########
@@ -186,6 +186,67 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const 
Slice& digest_name, con
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& 
digest_name, const std::vector<double>& inputs,
+                                 std::vector<int>* result) {
+  auto ns_key = AppendNamespacePrefix(digest_name);
+  TDigestMetadata metadata;
+  {
+    LockGuard guard(storage_->GetLockManager(), ns_key);
+
+    if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); 
!status.ok()) {
+      return status;
+    }
+
+    if (metadata.total_observations == 0) {
+      result->resize(inputs.size(), -2);
+      return rocksdb::Status::OK();
+    }
+
+    if (metadata.unmerged_nodes > 0) {
+      auto batch = storage_->GetWriteBatchBase();
+      WriteBatchLogData log_data(kRedisTDigest);
+      if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
+        return status;
+      }
+
+      if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); 
!status.ok()) {
+        return status;
+      }
+
+      std::string metadata_bytes;
+      metadata.Encode(&metadata_bytes);
+      if (auto status = batch->Put(metadata_cf_handle_, ns_key, 
metadata_bytes); !status.ok()) {
+        return status;
+      }
+
+      if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch()); !status.ok()) {
+        return status;
+      }
+
+      ctx.RefreshLatestSnapshot();
+    }
+  }

Review Comment:
   Hi @donghao526 ,
   
   The merge action could be refactored into a function to reduce duplication 
of the same logic.
   
   Best Regards,
   Edward



##########
src/types/redis_tdigest.cc:
##########
@@ -186,6 +186,67 @@ rocksdb::Status TDigest::Add(engine::Context& ctx, const 
Slice& digest_name, con
   return storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch());
 }
 
+rocksdb::Status TDigest::RevRank(engine::Context& ctx, const Slice& 
digest_name, const std::vector<double>& inputs,
+                                 std::vector<int>* result) {
+  auto ns_key = AppendNamespacePrefix(digest_name);
+  TDigestMetadata metadata;
+  {
+    LockGuard guard(storage_->GetLockManager(), ns_key);
+
+    if (auto status = getMetaDataByNsKey(ctx, ns_key, &metadata); 
!status.ok()) {
+      return status;
+    }
+
+    if (metadata.total_observations == 0) {
+      result->resize(inputs.size(), -2);
+      return rocksdb::Status::OK();
+    }
+
+    if (metadata.unmerged_nodes > 0) {
+      auto batch = storage_->GetWriteBatchBase();
+      WriteBatchLogData log_data(kRedisTDigest);
+      if (auto status = batch->PutLogData(log_data.Encode()); !status.ok()) {
+        return status;
+      }
+
+      if (auto status = mergeCurrentBuffer(ctx, ns_key, batch, &metadata); 
!status.ok()) {
+        return status;
+      }
+
+      std::string metadata_bytes;
+      metadata.Encode(&metadata_bytes);
+      if (auto status = batch->Put(metadata_cf_handle_, ns_key, 
metadata_bytes); !status.ok()) {
+        return status;
+      }
+
+      if (auto status = storage_->Write(ctx, storage_->DefaultWriteOptions(), 
batch->GetWriteBatch()); !status.ok()) {
+        return status;
+      }
+
+      ctx.RefreshLatestSnapshot();
+    }
+  }
+
+  std::vector<Centroid> centroids;
+  if (auto status = dumpCentroids(ctx, ns_key, metadata, &centroids); 
!status.ok()) {
+    return status;
+  }
+
+  auto dump_centroids = DummyCentroids(metadata, centroids);
+
+  result->clear();
+  result->reserve(inputs.size());
+
+  for (auto value : inputs) {
+    auto status_or_rank = TDigestRevRank(dump_centroids, value);
+    if (!status_or_rank) {
+      return rocksdb::Status::InvalidArgument(status_or_rank.Msg());
+    }
+    result->push_back(*status_or_rank);
+  }

Review Comment:
   Hi @donghao526 ,
   
   We could sort the inputs and get the ranks with just one scan of the 
centroids, since the centroids are sorted.
   
   Best Regards,
   Edward



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to