This is an automated email from the ASF dual-hosted git repository.

git-hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git


The following commit(s) were added to refs/heads/unstable by this push:
     new 855bad3c1 feat(server): implement LATENCY command set (Redis 7.0+ 
compatibility) (#3461)
855bad3c1 is described below

commit 855bad3c13b6fd9fc9d8397ade8aa1a05dc747b5
Author: gongna-au <[email protected]>
AuthorDate: Mon Apr 27 21:20:35 2026 +0800

    feat(server): implement LATENCY command set (Redis 7.0+ compatibility) 
(#3461)
    
    ## Summary
    This PR implements the `LATENCY` command for Kvrocks, focusing on the
    `LATENCY HISTOGRAM` subcommand introduced in Redis 7.0. It leverages the
    existing histogram infrastructure (`histogram-bucket-boundaries` config
    + `CommandHistogram` stats) to expose per-command latency distributions
    via the standard Redis protocol.
    
    ## Changes
    ### New File: `src/commands/cmd_latency.cc`
    
    Implements `CommandLatency` with three subcommands:
    
    - **`LATENCY HISTOGRAM [command ...]`** — Returns a cumulative
    distribution of command latencies.
    - Output uses the Redis 7.0 format: outer map keyed by command name,
    each value is a map with `calls` (integer) and `histogram_usec` (map of
    boundary → cumulative count).
    - Frequency counts from `CommandHistogram::buckets` are converted to
    cumulative sums at output time.
    - Empty leading buckets are skipped (matching Redis behavior of omitting
    zero-count ranges).
      - The last bucket (overflow) is rendered as `"inf"`.
    - Returns an empty map if `histogram-bucket-boundaries` is not
    configured.
    
    - **`LATENCY RESET [event ...]`** — Returns `(integer) 0`.
    - In Redis, this resets the spike-sampling event system (`LATENCY
    LATEST`/`HISTORY`/`GRAPH`), which Kvrocks does not have. Per Redis docs,
    histogram data is reset via `CONFIG RESETSTAT`, not `LATENCY RESET`.
    Returning 0 correctly indicates no event classes were reset.
    
    - **`LATENCY HELP`** — Returns usage information for all supported
    subcommands.
    
    ### Build System
    No changes needed. The file is automatically picked up by
    `file(GLOB_RECURSE KVROCKS_SRCS src/*.cc)` in root `CMakeLists.txt`.
    
    ### Command Registration
    Registered under `CommandCategory::Server` via
    `REDIS_REGISTER_COMMANDS(Server, ...)`, same category as
    `cmd_server.cc`. This is safe because the macro generates unique static
    variables per translation unit using `__LINE__`.
    
    ## Compatibility
    
    | Subcommand | Redis | Kvrocks | Note |
    | --- | --- | --- | --- |
    | `LATENCY HISTOGRAM [cmd ...]` | 7.0+ | **Yes** | Cumulative
    distribution, `histogram_usec` field, empty buckets skipped |
    | `LATENCY HELP` | 7.0+ | **Yes** | |
    | `LATENCY RESET [event ...]` | 2.8.13+ | **Yes** | Always returns 0 (no
    spike-sampling infrastructure) |
    | `LATENCY LATEST` | 2.8.13+ | No | Requires spike-sampling event system
    |
    | `LATENCY HISTORY event` | 2.8.13+ | No | Requires spike-sampling event
    system |
    | `LATENCY GRAPH event` | 2.8.13+ | No | Requires spike-sampling event
    system |
    | `LATENCY DOCTOR` | 2.8.13+ | No | Requires spike-sampling event system
    |
    
    ## Configuration Prerequisite
    Histograms are only tracked when `histogram-bucket-boundaries` is set in
    `kvrocks.conf`. Example:
    ```
    histogram-bucket-boundaries 1,5,10,25,50,100,250,500,1000,2500,5000,10000
    ```
    If empty (default), `LATENCY HISTOGRAM` returns an empty map — this is
    expected behavior, not an error.
    
    ## RESP Protocol Details
    - Uses RESP3 Map (`%`) via `conn->HeaderOfMap()`, which auto-degrades to
    interleaved RESP2 arrays.
    - Per-command entry structure:
      ```
      command_name => {
        "calls"          => (integer) N,
        "histogram_usec" => {
          boundary_1 => (integer) cumulative_count_1,
          boundary_2 => (integer) cumulative_count_2,
          ...
          "inf"      => (integer) total_calls
        }
      }
      ```
    
    Co-authored-by: gongna-au <[email protected]>
    Co-authored-by: Sisyphus <[email protected]>
    Co-authored-by: 纪华裕 <[email protected]>
---
 src/commands/cmd_server.cc                |  99 +++++++++++++-
 tests/gocase/unit/latency/latency_test.go | 217 ++++++++++++++++++++++++++++++
 2 files changed, 315 insertions(+), 1 deletion(-)

diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d5f0d75ca..35a85c442 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1627,6 +1627,102 @@ class CommandFlushBlockCache : public Commander {
   }
 };
 
+class CommandLatency : public Commander {
+ public:
+  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, 
Connection *conn, std::string *output) override {
+    if (args_.size() < 2) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    std::string subcommand = util::ToLower(args_[1]);
+    if (subcommand == "histogram") {
+      return getHistogram(srv, conn, output);
+    } else if (subcommand == "reset") {
+      *output = redis::Integer(0);
+      return Status::OK();
+    } else if (subcommand == "help") {
+      std::vector<std::string> help = {
+          "HELP",
+          "    Print this help message.",
+          "HISTOGRAM [command ...]",
+          "    Return a cumulative distribution of latencies in the format of 
a histogram for the specified "
+          "command(s).",
+          "    If no commands are specified, all commands with latency 
statistics are returned.",
+          "RESET [event ...]",
+          "    Return the number of reset event classes (Kvrocks has no 
spike-sampling infrastructure, always returns "
+          "0)."};
+      *output = ArrayOfBulkStrings(help);
+      return Status::OK();
+    }
+
+    return {Status::RedisParseErr, "Unknown LATENCY subcommand or wrong number 
of arguments"};
+  }
+
+ private:
+  Status getHistogram(Server *srv, Connection *conn, std::string *output) {
+    if (srv->stats.bucket_boundaries.empty()) {
+      *output = conn->HeaderOfMap(0);
+      return Status::OK();
+    }
+
+    std::vector<const std::pair<const std::string, CommandHistogram> *> 
target_histograms;
+    if (args_.size() > 2) {
+      for (size_t i = 2; i < args_.size(); i++) {
+        auto it = srv->stats.commands_histogram.find(util::ToLower(args_[i]));
+        if (it != srv->stats.commands_histogram.end() && it->second.calls > 0) 
{
+          target_histograms.push_back(&(*it));
+        }
+      }
+    } else {
+      for (const auto &iter : srv->stats.commands_histogram) {
+        if (iter.second.calls > 0) {
+          target_histograms.push_back(&iter);
+        }
+      }
+    }
+
+    *output = conn->HeaderOfMap(target_histograms.size());
+    for (const auto *pair_ptr : target_histograms) {
+      const auto &cmd_name = pair_ptr->first;
+      const auto &hist = pair_ptr->second;
+
+      std::vector<std::pair<int64_t, uint64_t>> cumulative_buckets;
+      uint64_t cumulative = 0;
+      for (size_t i = 0; i < hist.buckets.size(); i++) {
+        cumulative += hist.buckets[i]->load(std::memory_order_relaxed);
+        if (cumulative == 0) continue;
+
+        int64_t boundary = 0;
+        if (i < srv->stats.bucket_boundaries.size()) {
+          boundary = static_cast<int64_t>(srv->stats.bucket_boundaries[i]);
+        } else {
+          boundary = -1;
+        }
+        cumulative_buckets.emplace_back(boundary, cumulative);
+      }
+
+      *output += redis::BulkString(cmd_name);
+      *output += conn->HeaderOfMap(2);
+
+      *output += redis::BulkString("calls");
+      *output += redis::Integer(hist.calls.load(std::memory_order_relaxed));
+
+      *output += redis::BulkString("histogram_usec");
+      *output += conn->HeaderOfMap(cumulative_buckets.size());
+      for (const auto &[boundary, count] : cumulative_buckets) {
+        if (boundary < 0) {
+          *output += redis::BulkString("inf");
+        } else {
+          *output += redis::Integer(boundary);
+        }
+        *output += redis::Integer(count);
+      }
+    }
+
+    return Status::OK();
+  }
+};
+
 REDIS_REGISTER_COMMANDS(
     Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth", 
NO_KEY),
     MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY),
@@ -1670,5 +1766,6 @@ REDIS_REGISTER_COMMANDS(
     MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin", 
NO_KEY),
     MakeCmdAttr<CommandSST>("sst", -3, "write exclusive admin", 1, 1, 1),
     MakeCmdAttr<CommandFlushMemTable>("flushmemtable", -1, "exclusive write", 
NO_KEY),
-    MakeCmdAttr<CommandFlushBlockCache>("flushblockcache", 1, "exclusive 
write", NO_KEY), )
+    MakeCmdAttr<CommandFlushBlockCache>("flushblockcache", 1, "exclusive 
write", NO_KEY),
+    MakeCmdAttr<CommandLatency>("latency", -2, "read-only admin", NO_KEY), )
 }  // namespace redis
diff --git a/tests/gocase/unit/latency/latency_test.go 
b/tests/gocase/unit/latency/latency_test.go
new file mode 100644
index 000000000..ca8b58f96
--- /dev/null
+++ b/tests/gocase/unit/latency/latency_test.go
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package latency
+
+import (
+       "context"
+       "fmt"
+       "testing"
+
+       "github.com/apache/kvrocks/tests/gocase/util"
+       "github.com/stretchr/testify/require"
+)
+
+func isEmptyResult(val interface{}) bool {
+       switch v := val.(type) {
+       case []interface{}:
+               return len(v) == 0
+       case map[interface{}]interface{}:
+               return len(v) == 0
+       default:
+               return val == nil
+       }
+}
+
+func TestLatencyHelp(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("LATENCY HELP returns help text", func(t *testing.T) {
+               result, err := rdb.Do(ctx, "LATENCY", "HELP").StringSlice()
+               require.NoError(t, err)
+               require.NotEmpty(t, result)
+               require.Contains(t, result[0], "HELP")
+       })
+
+       t.Run("LATENCY HELP is case-insensitive", func(t *testing.T) {
+               result, err := rdb.Do(ctx, "latency", "help").StringSlice()
+               require.NoError(t, err)
+               require.NotEmpty(t, result)
+       })
+}
+
+func TestLatencyReset(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("LATENCY RESET always returns 0", func(t *testing.T) {
+               val, err := rdb.Do(ctx, "LATENCY", "RESET").Int64()
+               require.NoError(t, err)
+               require.EqualValues(t, 0, val)
+       })
+
+       t.Run("LATENCY RESET with event arguments still returns 0", func(t 
*testing.T) {
+               val, err := rdb.Do(ctx, "LATENCY", "RESET", "event1", 
"event2").Int64()
+               require.NoError(t, err)
+               require.EqualValues(t, 0, val)
+       })
+}
+
+func TestLatencyErrors(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("LATENCY without subcommand returns error", func(t *testing.T) {
+               err := rdb.Do(ctx, "LATENCY").Err()
+               require.Error(t, err)
+       })
+
+       t.Run("LATENCY with unknown subcommand returns error", func(t 
*testing.T) {
+               err := rdb.Do(ctx, "LATENCY", "UNKNOWN").Err()
+               require.Error(t, err)
+               require.Contains(t, err.Error(), "Unknown LATENCY subcommand")
+       })
+}
+
+func TestLatencyHistogramWithoutBuckets(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{})
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("LATENCY HISTOGRAM returns empty when histogram-bucket-boundaries 
is not set", func(t *testing.T) {
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM").Result()
+               require.NoError(t, err)
+               require.True(t, isEmptyResult(val))
+       })
+}
+
+func TestLatencyHistogram(t *testing.T) {
+       srv := util.StartServer(t, map[string]string{
+               "histogram-bucket-boundaries": "10,20,30,50,100,200,500,1000",
+       })
+       defer srv.Close()
+
+       ctx := context.Background()
+       rdb := srv.NewClient()
+       defer func() { require.NoError(t, rdb.Close()) }()
+
+       t.Run("LATENCY HISTOGRAM returns data after running commands", func(t 
*testing.T) {
+               for i := 0; i < 10; i++ {
+                       require.NoError(t, rdb.Set(ctx, fmt.Sprintf("key-%d", 
i), "value", 0).Err())
+               }
+               for i := 0; i < 10; i++ {
+                       require.NoError(t, rdb.Get(ctx, fmt.Sprintf("key-%d", 
i)).Err())
+               }
+
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM").Result()
+               require.NoError(t, err)
+               require.False(t, isEmptyResult(val))
+       })
+
+       t.Run("LATENCY HISTOGRAM filters by command name", func(t *testing.T) {
+               for i := 0; i < 5; i++ {
+                       require.NoError(t, rdb.Set(ctx, 
fmt.Sprintf("filter-key-%d", i), "value", 0).Err())
+               }
+
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set").Result()
+               require.NoError(t, err)
+               require.False(t, isEmptyResult(val))
+       })
+
+       t.Run("LATENCY HISTOGRAM for nonexistent command returns empty", func(t 
*testing.T) {
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", 
"nonexistent_command").Result()
+               require.NoError(t, err)
+               require.True(t, isEmptyResult(val))
+       })
+
+       t.Run("LATENCY HISTOGRAM skips commands with zero calls", func(t 
*testing.T) {
+               // hset has never been called in this test, so it should have 
calls==0
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "hset").Result()
+               require.NoError(t, err)
+               require.True(t, isEmptyResult(val))
+       })
+
+       t.Run("LATENCY HISTOGRAM is case-insensitive for command names", func(t 
*testing.T) {
+               valLower, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", 
"set").Result()
+               require.NoError(t, err)
+
+               valUpper, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", 
"SET").Result()
+               require.NoError(t, err)
+
+               require.False(t, isEmptyResult(valLower))
+               require.False(t, isEmptyResult(valUpper))
+       })
+
+       t.Run("LATENCY HISTOGRAM with multiple command filters", func(t 
*testing.T) {
+               require.NoError(t, rdb.Set(ctx, "multi-key", "value", 0).Err())
+               _, err := rdb.Get(ctx, "multi-key").Result()
+               require.NoError(t, err)
+
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set", 
"get").Result()
+               require.NoError(t, err)
+               require.False(t, isEmptyResult(val))
+       })
+
+       t.Run("LATENCY HISTOGRAM contains calls and histogram_usec fields", 
func(t *testing.T) {
+               require.NoError(t, rdb.Set(ctx, "field-check-key", "value", 
0).Err())
+
+               val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set").Result()
+               require.NoError(t, err)
+
+               // RESP2 returns flat array: ["set", ["calls", N, 
"histogram_usec", [...]]]
+               // RESP3 returns map: {"set": {"calls": N, "histogram_usec": 
{...}}}
+               switch v := val.(type) {
+               case []interface{}:
+                       require.GreaterOrEqual(t, len(v), 2)
+                       innerSlice, ok := v[1].([]interface{})
+                       require.True(t, ok)
+                       require.GreaterOrEqual(t, len(innerSlice), 4)
+                       require.Equal(t, "calls", innerSlice[0])
+                       calls, ok := innerSlice[1].(int64)
+                       require.True(t, ok)
+                       require.Greater(t, calls, int64(0))
+               case map[interface{}]interface{}:
+                       setData, ok := v["set"]
+                       require.True(t, ok)
+                       innerMap, ok := setData.(map[interface{}]interface{})
+                       require.True(t, ok)
+                       calls, ok := innerMap["calls"]
+                       require.True(t, ok)
+                       require.NotNil(t, calls)
+               default:
+                       t.Fatalf("unexpected result type: %T", val)
+               }
+       })
+}

Reply via email to