This is an automated email from the ASF dual-hosted git repository.
git-hulk pushed a commit to branch unstable
in repository https://gitbox.apache.org/repos/asf/kvrocks.git
The following commit(s) were added to refs/heads/unstable by this push:
new 855bad3c1 feat(server): implement LATENCY command set (Redis 7.0+
compatibility) (#3461)
855bad3c1 is described below
commit 855bad3c13b6fd9fc9d8397ade8aa1a05dc747b5
Author: gongna-au <[email protected]>
AuthorDate: Mon Apr 27 21:20:35 2026 +0800
feat(server): implement LATENCY command set (Redis 7.0+ compatibility)
(#3461)
## Summary
This PR implements the `LATENCY` command for Kvrocks, focusing on the
`LATENCY HISTOGRAM` subcommand introduced in Redis 7.0. It leverages the
existing histogram infrastructure (`histogram-bucket-boundaries` config
+ `CommandHistogram` stats) to expose per-command latency distributions
via the standard Redis protocol.
## Changes
### Changes to `src/commands/cmd_server.cc`
Adds `CommandLatency` with three subcommands:
- **`LATENCY HISTOGRAM [command ...]`** — Returns a cumulative
distribution of command latencies.
- Output uses the Redis 7.0 format: outer map keyed by command name,
each value is a map with `calls` (integer) and `histogram_usec` (map of
boundary → cumulative count).
- Frequency counts from `CommandHistogram::buckets` are converted to
cumulative sums at output time.
- Leading buckets whose cumulative count is still zero are skipped (similar to Redis, which omits
ranges that contain no samples); buckets after the first non-empty one are always emitted.
- The last bucket (overflow) is rendered as `"inf"`.
- Returns an empty map if `histogram-bucket-boundaries` is not
configured.
- **`LATENCY RESET [event ...]`** — Returns `(integer) 0`.
- In Redis, this resets the spike-sampling event system (`LATENCY
LATEST`/`HISTORY`/`GRAPH`), which Kvrocks does not have. Per Redis docs,
histogram data is reset via `CONFIG RESETSTAT`, not `LATENCY RESET`.
Returning 0 correctly indicates no event classes were reset.
- **`LATENCY HELP`** — Returns usage information for all supported
subcommands.
### Build System
No changes needed. The command is added to the existing `src/commands/cmd_server.cc`,
which is already compiled via `file(GLOB_RECURSE KVROCKS_SRCS src/*.cc)` in the root
`CMakeLists.txt`.
### Command Registration
Registered under `CommandCategory::Server` by appending
`MakeCmdAttr<CommandLatency>("latency", -2, "read-only admin", NO_KEY)` to the existing
`REDIS_REGISTER_COMMANDS(Server, ...)` list in `cmd_server.cc`, so no additional
registration macro invocation is introduced.
## Compatibility
| Subcommand | Redis | Kvrocks | Note |
| --- | --- | --- | --- |
| `LATENCY HISTOGRAM [cmd ...]` | 7.0+ | **Yes** | Cumulative distribution, `histogram_usec` field, leading empty buckets skipped |
| `LATENCY HELP` | 2.8.13+ | **Yes** | |
| `LATENCY RESET [event ...]` | 2.8.13+ | **Yes** | Always returns 0 (no spike-sampling infrastructure) |
| `LATENCY LATEST` | 2.8.13+ | No | Requires spike-sampling event system |
| `LATENCY HISTORY event` | 2.8.13+ | No | Requires spike-sampling event system |
| `LATENCY GRAPH event` | 2.8.13+ | No | Requires spike-sampling event system |
| `LATENCY DOCTOR` | 2.8.13+ | No | Requires spike-sampling event system |
## Configuration Prerequisite
Histograms are only tracked when `histogram-bucket-boundaries` is set in
`kvrocks.conf`. Example:
```
histogram-bucket-boundaries 1,5,10,25,50,100,250,500,1000,2500,5000,10000
```
If empty (default), `LATENCY HISTOGRAM` returns an empty map — this is
expected behavior, not an error.
## RESP Protocol Details
- Uses RESP3 Map (`%`) via `conn->HeaderOfMap()`, which auto-degrades to
interleaved RESP2 arrays.
- Per-command entry structure:
```
command_name => {
"calls" => (integer) N,
"histogram_usec" => {
boundary_1 => (integer) cumulative_count_1,
boundary_2 => (integer) cumulative_count_2,
...
"inf" => (integer) total_calls
}
}
```
Co-authored-by: gongna-au <[email protected]>
Co-authored-by: Sisyphus <[email protected]>
Co-authored-by: 纪华裕 <[email protected]>
---
src/commands/cmd_server.cc | 99 +++++++++++++-
tests/gocase/unit/latency/latency_test.go | 217 ++++++++++++++++++++++++++++++
2 files changed, 315 insertions(+), 1 deletion(-)
diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc
index d5f0d75ca..35a85c442 100644
--- a/src/commands/cmd_server.cc
+++ b/src/commands/cmd_server.cc
@@ -1627,6 +1627,102 @@ class CommandFlushBlockCache : public Commander {
}
};
+// LATENCY <subcommand> [arg ...]
+//
+// Subcommands:
+//   HISTOGRAM [command ...]  per-command cumulative latency distribution (see getHistogram).
+//   RESET [event ...]        always replies (integer) 0: there are no spike-sampling event
+//                            classes to reset, and histogram data is left untouched.
+//   HELP                     usage text as an array of bulk strings.
+// Any other subcommand is rejected with a parse error.
+class CommandLatency : public Commander {
+ public:
+  Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    // Defensive only: the command is registered with arity -2, so the subcommand is present.
+    if (args_.size() < 2) {
+      return {Status::RedisParseErr, errWrongNumOfArguments};
+    }
+
+    const std::string subcommand = util::ToLower(args_[1]);
+    if (subcommand == "histogram") {
+      return getHistogram(srv, conn, output);
+    }
+
+    if (subcommand == "reset") {
+      *output = redis::Integer(0);
+      return Status::OK();
+    }
+
+    if (subcommand == "help") {
+      std::vector<std::string> help = {
+          "HELP",
+          " Print this help message.",
+          "HISTOGRAM [command ...]",
+          " Return a cumulative distribution of latencies in the format of a histogram for the specified "
+          "command(s).",
+          " If no commands are specified, all commands with latency statistics are returned.",
+          "RESET [event ...]",
+          " Return the number of reset event classes (Kvrocks has no spike-sampling infrastructure, always returns "
+          "0)."};
+      *output = ArrayOfBulkStrings(help);
+      return Status::OK();
+    }
+
+    return {Status::RedisParseErr, "Unknown LATENCY subcommand or wrong number of arguments"};
+  }
+
+ private:
+  // Writes the LATENCY HISTOGRAM reply:
+  //   { command_name => { "calls" => N,
+  //                       "histogram_usec" => { upper_bound => cumulative_count, ..., "inf" => count } } }
+  //
+  // - If no bucket boundaries are configured, the reply is an empty map.
+  // - Only commands whose `calls` counter is non-zero are included.
+  // - Explicit command arguments are lower-cased before the lookup; names not present in
+  //   commands_histogram are silently skipped, and repeated names are emitted repeatedly.
+  // - Buckets before the first non-empty one are omitted; every later bucket is emitted.
+  Status getHistogram(Server *srv, Connection *conn, std::string *output) {
+    const auto &boundaries = srv->stats.bucket_boundaries;
+    if (boundaries.empty()) {
+      *output = conn->HeaderOfMap(0);
+      return Status::OK();
+    }
+
+    const auto &histograms = srv->stats.commands_histogram;
+    std::vector<const std::pair<const std::string, CommandHistogram> *> target_histograms;
+    if (args_.size() > 2) {
+      for (size_t i = 2; i < args_.size(); i++) {
+        auto it = histograms.find(util::ToLower(args_[i]));
+        if (it != histograms.end() && it->second.calls > 0) {
+          target_histograms.push_back(&(*it));
+        }
+      }
+    } else {
+      for (const auto &entry : histograms) {
+        if (entry.second.calls > 0) {
+          target_histograms.push_back(&entry);
+        }
+      }
+    }
+
+    *output = conn->HeaderOfMap(target_histograms.size());
+    for (const auto *entry : target_histograms) {
+      const auto &[cmd_name, hist] = *entry;
+
+      // (bucket index, cumulative count) pairs. Keeping the index, rather than encoding the
+      // overflow bucket as a sentinel boundary value, means no configured boundary can ever be
+      // mistaken for "inf". The map length must be known before writing, hence the buffer.
+      std::vector<std::pair<size_t, uint64_t>> cumulative_buckets;
+      cumulative_buckets.reserve(hist.buckets.size());
+      uint64_t cumulative = 0;
+      for (size_t i = 0; i < hist.buckets.size(); i++) {
+        cumulative += hist.buckets[i]->load(std::memory_order_relaxed);
+        if (cumulative == 0) continue;  // skip leading empty buckets
+        cumulative_buckets.emplace_back(i, cumulative);
+      }
+
+      *output += redis::BulkString(cmd_name);
+      *output += conn->HeaderOfMap(2);
+
+      // `calls` is loaded independently of the buckets, so under concurrent traffic it may
+      // differ slightly from the final cumulative count.
+      *output += redis::BulkString("calls");
+      *output += redis::Integer(hist.calls.load(std::memory_order_relaxed));
+
+      *output += redis::BulkString("histogram_usec");
+      *output += conn->HeaderOfMap(cumulative_buckets.size());
+      for (const auto &[bucket_index, count] : cumulative_buckets) {
+        if (bucket_index < boundaries.size()) {
+          *output += redis::Integer(static_cast<int64_t>(boundaries[bucket_index]));
+        } else {
+          // Buckets past the last configured boundary form the overflow bucket.
+          *output += redis::BulkString("inf");
+        }
+        *output += redis::Integer(count);
+      }
+    }
+
+    return Status::OK();
+  }
+};
+
REDIS_REGISTER_COMMANDS(
Server, MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loading auth",
NO_KEY),
MakeCmdAttr<CommandPing>("ping", -1, "read-only", NO_KEY),
@@ -1670,5 +1766,6 @@ REDIS_REGISTER_COMMANDS(
MakeCmdAttr<CommandPollUpdates>("pollupdates", -2, "read-only admin",
NO_KEY),
MakeCmdAttr<CommandSST>("sst", -3, "write exclusive admin", 1, 1, 1),
MakeCmdAttr<CommandFlushMemTable>("flushmemtable", -1, "exclusive write",
NO_KEY),
- MakeCmdAttr<CommandFlushBlockCache>("flushblockcache", 1, "exclusive
write", NO_KEY), )
+ MakeCmdAttr<CommandFlushBlockCache>("flushblockcache", 1, "exclusive
write", NO_KEY),
+ MakeCmdAttr<CommandLatency>("latency", -2, "read-only admin", NO_KEY), )
} // namespace redis
diff --git a/tests/gocase/unit/latency/latency_test.go
b/tests/gocase/unit/latency/latency_test.go
new file mode 100644
index 000000000..ca8b58f96
--- /dev/null
+++ b/tests/gocase/unit/latency/latency_test.go
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package latency
+
+import (
+ "context"
+ "fmt"
+ "testing"
+
+ "github.com/apache/kvrocks/tests/gocase/util"
+ "github.com/stretchr/testify/require"
+)
+
+func isEmptyResult(val interface{}) bool {
+ switch v := val.(type) {
+ case []interface{}:
+ return len(v) == 0
+ case map[interface{}]interface{}:
+ return len(v) == 0
+ default:
+ return val == nil
+ }
+}
+
+// TestLatencyHelp checks that LATENCY HELP replies with a non-empty list of
+// help lines whose first entry mentions HELP, and that the command and
+// subcommand are accepted in lower case.
+func TestLatencyHelp(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+	ctx := context.Background()
+
+	t.Run("LATENCY HELP returns help text", func(t *testing.T) {
+		lines, err := rdb.Do(ctx, "LATENCY", "HELP").StringSlice()
+		require.NoError(t, err)
+		require.NotEmpty(t, lines)
+		require.Contains(t, lines[0], "HELP")
+	})
+
+	t.Run("LATENCY HELP is case-insensitive", func(t *testing.T) {
+		lines, err := rdb.Do(ctx, "latency", "help").StringSlice()
+		require.NoError(t, err)
+		require.NotEmpty(t, lines)
+	})
+}
+
+// TestLatencyReset verifies that LATENCY RESET replies with integer 0, both
+// without arguments and when event names are passed.
+func TestLatencyReset(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	cases := []struct {
+		name string
+		args []interface{}
+	}{
+		{name: "LATENCY RESET always returns 0", args: []interface{}{"LATENCY", "RESET"}},
+		{name: "LATENCY RESET with event arguments still returns 0", args: []interface{}{"LATENCY", "RESET", "event1", "event2"}},
+	}
+	for _, tc := range cases {
+		tc := tc
+		t.Run(tc.name, func(t *testing.T) {
+			n, err := rdb.Do(ctx, tc.args...).Int64()
+			require.NoError(t, err)
+			require.EqualValues(t, 0, n)
+		})
+	}
+}
+
+// TestLatencyErrors covers the error replies for a bare LATENCY (no
+// subcommand) and for an unrecognized subcommand.
+func TestLatencyErrors(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	t.Run("LATENCY without subcommand returns error", func(t *testing.T) {
+		require.Error(t, rdb.Do(ctx, "LATENCY").Err())
+	})
+
+	t.Run("LATENCY with unknown subcommand returns error", func(t *testing.T) {
+		replyErr := rdb.Do(ctx, "LATENCY", "UNKNOWN").Err()
+		require.Error(t, replyErr)
+		require.Contains(t, replyErr.Error(), "Unknown LATENCY subcommand")
+	})
+}
+
+func TestLatencyHistogramWithoutBuckets(t *testing.T) {
+ srv := util.StartServer(t, map[string]string{})
+ defer srv.Close()
+
+ ctx := context.Background()
+ rdb := srv.NewClient()
+ defer func() { require.NoError(t, rdb.Close()) }()
+
+ t.Run("LATENCY HISTOGRAM returns empty when histogram-bucket-boundaries
is not set", func(t *testing.T) {
+ val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM").Result()
+ require.NoError(t, err)
+ require.True(t, isEmptyResult(val))
+ })
+}
+
+// TestLatencyHistogram exercises LATENCY HISTOGRAM on a server with
+// histogram-bucket-boundaries configured: unfiltered output, filtering by one
+// or more (case-insensitive) command names, unknown / never-called commands,
+// and the per-command "calls" and "histogram_usec" fields.
+func TestLatencyHistogram(t *testing.T) {
+	srv := util.StartServer(t, map[string]string{
+		"histogram-bucket-boundaries": "10,20,30,50,100,200,500,1000",
+	})
+	defer srv.Close()
+
+	ctx := context.Background()
+	rdb := srv.NewClient()
+	defer func() { require.NoError(t, rdb.Close()) }()
+
+	// commandNames returns the top-level keys (command names) of a LATENCY
+	// HISTOGRAM reply, which is a map under RESP3 and a flat
+	// [key, value, key, value, ...] array under RESP2.
+	commandNames := func(t *testing.T, val interface{}) []string {
+		var names []string
+		switch v := val.(type) {
+		case []interface{}:
+			require.Zero(t, len(v)%2, "flattened map reply must have an even length")
+			for i := 0; i < len(v); i += 2 {
+				name, ok := v[i].(string)
+				require.True(t, ok, "unexpected key type: %T", v[i])
+				names = append(names, name)
+			}
+		case map[interface{}]interface{}:
+			for k := range v {
+				name, ok := k.(string)
+				require.True(t, ok, "unexpected key type: %T", k)
+				names = append(names, name)
+			}
+		default:
+			t.Fatalf("unexpected result type: %T", val)
+		}
+		return names
+	}
+
+	t.Run("LATENCY HISTOGRAM returns data after running commands", func(t *testing.T) {
+		for i := 0; i < 10; i++ {
+			require.NoError(t, rdb.Set(ctx, fmt.Sprintf("key-%d", i), "value", 0).Err())
+		}
+		for i := 0; i < 10; i++ {
+			require.NoError(t, rdb.Get(ctx, fmt.Sprintf("key-%d", i)).Err())
+		}
+
+		val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM").Result()
+		require.NoError(t, err)
+		require.False(t, isEmptyResult(val))
+		names := commandNames(t, val)
+		require.Contains(t, names, "set")
+		require.Contains(t, names, "get")
+	})
+
+	t.Run("LATENCY HISTOGRAM filters by command name", func(t *testing.T) {
+		for i := 0; i < 5; i++ {
+			require.NoError(t, rdb.Set(ctx, fmt.Sprintf("filter-key-%d", i), "value", 0).Err())
+		}
+
+		val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set").Result()
+		require.NoError(t, err)
+		require.False(t, isEmptyResult(val))
+		// GET also has data by now, so this proves the filter is applied.
+		require.Equal(t, []string{"set"}, commandNames(t, val))
+	})
+
+	t.Run("LATENCY HISTOGRAM for nonexistent command returns empty", func(t *testing.T) {
+		val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "nonexistent_command").Result()
+		require.NoError(t, err)
+		require.True(t, isEmptyResult(val))
+	})
+
+	t.Run("LATENCY HISTOGRAM skips commands with zero calls", func(t *testing.T) {
+		// hset has never been called in this test, so it should have calls==0
+		val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "hset").Result()
+		require.NoError(t, err)
+		require.True(t, isEmptyResult(val))
+	})
+
+	t.Run("LATENCY HISTOGRAM is case-insensitive for command names", func(t *testing.T) {
+		valLower, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set").Result()
+		require.NoError(t, err)
+
+		valUpper, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "SET").Result()
+		require.NoError(t, err)
+
+		require.False(t, isEmptyResult(valLower))
+		require.False(t, isEmptyResult(valUpper))
+		require.Equal(t, []string{"set"}, commandNames(t, valLower))
+		require.Equal(t, []string{"set"}, commandNames(t, valUpper))
+	})
+
+	t.Run("LATENCY HISTOGRAM with multiple command filters", func(t *testing.T) {
+		require.NoError(t, rdb.Set(ctx, "multi-key", "value", 0).Err())
+		_, err := rdb.Get(ctx, "multi-key").Result()
+		require.NoError(t, err)
+
+		val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set", "get").Result()
+		require.NoError(t, err)
+		require.False(t, isEmptyResult(val))
+		require.ElementsMatch(t, []string{"set", "get"}, commandNames(t, val))
+	})
+
+	t.Run("LATENCY HISTOGRAM contains calls and histogram_usec fields", func(t *testing.T) {
+		require.NoError(t, rdb.Set(ctx, "field-check-key", "value", 0).Err())
+
+		val, err := rdb.Do(ctx, "LATENCY", "HISTOGRAM", "set").Result()
+		require.NoError(t, err)
+
+		// RESP2 returns flat array: ["set", ["calls", N, "histogram_usec", [...]]]
+		// RESP3 returns map: {"set": {"calls": N, "histogram_usec": {...}}}
+		switch v := val.(type) {
+		case []interface{}:
+			require.Len(t, v, 2)
+			require.Equal(t, "set", v[0])
+			inner, ok := v[1].([]interface{})
+			require.True(t, ok)
+			require.Len(t, inner, 4)
+			require.Equal(t, "calls", inner[0])
+			calls, ok := inner[1].(int64)
+			require.True(t, ok)
+			require.Greater(t, calls, int64(0))
+			require.Equal(t, "histogram_usec", inner[2])
+			buckets, ok := inner[3].([]interface{})
+			require.True(t, ok)
+			require.NotEmpty(t, buckets)
+			require.Zero(t, len(buckets)%2)
+		case map[interface{}]interface{}:
+			setData, ok := v["set"]
+			require.True(t, ok)
+			inner, ok := setData.(map[interface{}]interface{})
+			require.True(t, ok)
+			calls, ok := inner["calls"].(int64)
+			require.True(t, ok)
+			require.Greater(t, calls, int64(0))
+			buckets, ok := inner["histogram_usec"].(map[interface{}]interface{})
+			require.True(t, ok)
+			require.NotEmpty(t, buckets)
+		default:
+			t.Fatalf("unexpected result type: %T", val)
+		}
+	})
+}