This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 5fef170dc fix(go): update stats field and its deserialization logic to
align with rust SDK (#3032)
5fef170dc is described below
commit 5fef170dc7eaa48254265fb1f958e1741f9683e1
Author: Chengxi Luo <[email protected]>
AuthorDate: Tue Apr 14 03:08:52 2026 -0400
fix(go): update stats field and its deserialization logic to align with
rust SDK (#3032)
---
examples/go/go.sum | 2 +
.../go/binary_serialization/stats_serializer.go | 90 ---------
.../binary_serialization/stats_serializer_test.go | 127 -------------
foreign/go/client/tcp/tcp_utilities.go | 7 +-
foreign/go/contracts/stats.go | 141 +++++++++++---
foreign/go/contracts/stats_test.go | 211 +++++++++++++++++++++
foreign/go/go.mod | 1 +
7 files changed, 336 insertions(+), 243 deletions(-)
diff --git a/examples/go/go.sum b/examples/go/go.sum
index 7c18edbda..571ad119b 100644
--- a/examples/go/go.sum
+++ b/examples/go/go.sum
@@ -2,6 +2,8 @@ github.com/avast/retry-go/v5 v5.0.0
h1:kf1Qc2UsTZ4qq8elDymqfbISvkyMuhgRxuJqX2NHP
github.com/avast/retry-go/v5 v5.0.0/go.mod
h1://d+usmKWio1agtZfS1H/ltTqwtIfBnRq9zEwjc3eH8=
github.com/davecgh/go-spew v1.1.1
h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod
h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
+github.com/google/go-cmp v0.7.0/go.mod
h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0=
github.com/google/uuid v1.6.0/go.mod
h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/klauspost/compress v1.18.5
h1:/h1gH5Ce+VWNLSWqPzOVn6XBO+vJbCNGvjoaGBFW2IE=
diff --git a/foreign/go/binary_serialization/stats_serializer.go
b/foreign/go/binary_serialization/stats_serializer.go
deleted file mode 100644
index 101cc0730..000000000
--- a/foreign/go/binary_serialization/stats_serializer.go
+++ /dev/null
@@ -1,90 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package binaryserialization
-
-import (
- "encoding/binary"
- "math"
-
- iggcon "github.com/apache/iggy/foreign/go/contracts"
-)
-
-type TcpStats struct {
- iggcon.Stats
-}
-
-// Constants for byte positions and lengths in the payload.
-const (
- processIDPos = 0
- cpuUsagePos = 4
- totalCpuUsagePos = 8
- memoryUsagePos = 12
- totalMemoryPos = 20
- availableMemoryPos = 28
- runTimePos = 36
- startTimePos = 44
- readBytesPos = 52
- writtenBytesPos = 60
- messagesSizeBytesPos = 68
- streamsCountPos = 76
- topicsCountPos = 80
- partitionsCountPos = 84
- segmentsCountPos = 88
- messagesCountPos = 92
- clientsCountPos = 100
- consumerGroupsCountPos = 104
-)
-
-func (stats *TcpStats) Deserialize(payload []byte) error {
- stats.ProcessId = binary.LittleEndian.Uint32(payload[processIDPos :
processIDPos+4])
- stats.CpuUsage =
math.Float32frombits(binary.LittleEndian.Uint32(payload[cpuUsagePos :
cpuUsagePos+4]))
- stats.TotalCpuUsage =
math.Float32frombits(binary.LittleEndian.Uint32(payload[totalCpuUsagePos :
totalCpuUsagePos+4]))
- stats.MemoryUsage = binary.LittleEndian.Uint64(payload[memoryUsagePos :
memoryUsagePos+8])
- stats.TotalMemory = binary.LittleEndian.Uint64(payload[totalMemoryPos :
totalMemoryPos+8])
- stats.AvailableMemory =
binary.LittleEndian.Uint64(payload[availableMemoryPos : availableMemoryPos+8])
- stats.RunTime = binary.LittleEndian.Uint64(payload[runTimePos :
runTimePos+8])
- stats.StartTime = binary.LittleEndian.Uint64(payload[startTimePos :
startTimePos+8])
- stats.ReadBytes = binary.LittleEndian.Uint64(payload[readBytesPos :
readBytesPos+8])
- stats.WrittenBytes = binary.LittleEndian.Uint64(payload[writtenBytesPos
: writtenBytesPos+8])
- stats.MessagesSizeBytes =
binary.LittleEndian.Uint64(payload[messagesSizeBytesPos :
messagesSizeBytesPos+8])
- stats.StreamsCount = binary.LittleEndian.Uint32(payload[streamsCountPos
: streamsCountPos+4])
- stats.TopicsCount = binary.LittleEndian.Uint32(payload[topicsCountPos :
topicsCountPos+4])
- stats.PartitionsCount =
binary.LittleEndian.Uint32(payload[partitionsCountPos : partitionsCountPos+4])
- stats.SegmentsCount =
binary.LittleEndian.Uint32(payload[segmentsCountPos : segmentsCountPos+4])
- stats.MessagesCount =
binary.LittleEndian.Uint64(payload[messagesCountPos : messagesCountPos+8])
- stats.ClientsCount = binary.LittleEndian.Uint32(payload[clientsCountPos
: clientsCountPos+4])
- stats.ConsumerGroupsCount =
binary.LittleEndian.Uint32(payload[consumerGroupsCountPos :
consumerGroupsCountPos+4])
-
- position := consumerGroupsCountPos + 4
- hostnameLength := int(binary.LittleEndian.Uint32(payload[position :
position+4]))
- stats.Hostname = string(payload[position+4 : position+4+hostnameLength])
- position += 4 + hostnameLength
-
- osNameLength := int(binary.LittleEndian.Uint32(payload[position :
position+4]))
- stats.OsName = string(payload[position+4 : position+4+osNameLength])
- position += 4 + osNameLength
-
- osVersionLength := int(binary.LittleEndian.Uint32(payload[position :
position+4]))
- stats.OsVersion = string(payload[position+4 :
position+4+osVersionLength])
- position += 4 + osVersionLength
-
- kernelVersionLength := int(binary.LittleEndian.Uint32(payload[position
: position+4]))
- stats.KernelVersion = string(payload[position+4 :
position+4+kernelVersionLength])
-
- return nil
-}
diff --git a/foreign/go/binary_serialization/stats_serializer_test.go
b/foreign/go/binary_serialization/stats_serializer_test.go
deleted file mode 100644
index 2e5004ce2..000000000
--- a/foreign/go/binary_serialization/stats_serializer_test.go
+++ /dev/null
@@ -1,127 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package binaryserialization
-
-import (
- "testing"
-)
-
-func TestDeserialize(t *testing.T) {
- payload2 := []byte{
- 240, 68, 0, 0, //"process_id": 17648,
- 124, 202, 146, 58, //"cpu_usage": 0.0011199261,
- 124, 202, 146, 58, //"total_cpu_usage": 0.0011199261,
- 0, 224, 66, 3, 0, 0, 0, 0, //"memory_usage": 54714368,
- 0, 32, 123, 208, 7, 0, 0, 0, //"total_memory": 33562501120,
- 0, 224, 28, 253, 4, 0, 0, 0, //"available_memory": 21426397184,
- 169, 13, 0, 0, 0, 0, 0, 0, //"run_time": 3497,
- 239, 201, 6, 101, 0, 0, 0, 0, //"start_time": 1694943727,
- 0, 0, 0, 0, 0, 0, 0, 0, //"read_bytes": 0,
- 0, 128, 8, 0, 0, 0, 0, 0, //"written_bytes": 557056,
- 138, 2, 0, 0, 0, 0, 0, 0, //"messages_size_bytes": 650,
- 1, 0, 0, 0, //"streams_count": 1,
- 1, 0, 0, 0, //"topics_count": 1,
- 12, 0, 0, 0, //"partitions_count": 12,
- 12, 0, 0, 0, //"segments_count": 12,
- 4, 0, 0, 0, 0, 0, 0, 0, //"messages_count": 4,
- 11, 0, 0, 0, //"clients_count": 11,
- 0, 0, 0, 0, //"consumer_groups_count": 0,
- 6, 0, 0, 0, //hostname length: 6
- 112, 111, 112, 45, 111, 115, //"hostname": "pop-os",
- 7, 0, 0, 0, //os name length :7
- 80, 111, 112, 33, 95, 79, 83, //"os_name": "Pop!_OS",
- 19, 0, 0, 0, //os version length: 19
- 76, 105, 110, 117, 120, 32, 50, 50, 46, 48, 52, 32, 80, 111,
112, 33, 95, 79, 83, //"os_version": "Linux 22.04 Pop!_OS",
- 22, 0, 0, 0, //kernel version length: 22
- 54, 46, 52, 46, 54, 45, 55, 54, 48, 54, 48, 52, 48, 54, 45,
103, 101, 110, 101, 114, 105, 99, //"kernel_version": "6.4.6-76060406-generic"
- }
-
- // Create a TcpStats object and deserialize the payload
- var stats TcpStats
- err := stats.Deserialize(payload2)
-
- // Check if there was an error during deserialization
- if err != nil {
- t.Errorf("Deserialization error: %v", err)
- }
-
- // Verify the deserialized values
- if stats.ProcessId != 17648 {
- t.Errorf("ProcessId is incorrect. Expected: 17648, Got: %d",
stats.ProcessId)
- }
- if stats.CpuUsage != 0.0011199261 {
- t.Errorf("CPUUsage is incorrect. Expected: 0.0011199261, Got:
%f", stats.CpuUsage)
- }
- if stats.MemoryUsage != 54714368 {
- t.Errorf("MemoryUsage is incorrect. Expected: 54714368, Got:
%d", stats.MemoryUsage)
- }
- if stats.TotalMemory != 33562501120 {
- t.Errorf("TotalMemory is incorrect. Expected: 33562501120, Got:
%d", stats.TotalMemory)
- }
- if stats.AvailableMemory != 21426397184 {
- t.Errorf("AvailableMemory is incorrect. Expected: 21426397184,
Got: %d", stats.AvailableMemory)
- }
- if stats.RunTime != 3497 {
- t.Errorf("RunTime is incorrect. Expected: 3497, Got: %d",
stats.RunTime)
- }
- if stats.StartTime != 1694943727 {
- t.Errorf("StartTime is incorrect. Expected: 1694943727, Got:
%d", stats.StartTime)
- }
- if stats.ReadBytes != 0 {
- t.Errorf("ReadBytes is incorrect. Expected: 0, Got: %d",
stats.ReadBytes)
- }
- if stats.WrittenBytes != 557056 {
- t.Errorf("WrittenBytes is incorrect. Expected: 557056, Got:
%d", stats.WrittenBytes)
- }
- if stats.MessagesSizeBytes != 650 {
- t.Errorf("MessagesSizeBytes is incorrect. Expected: 650, Got:
%d", stats.MessagesSizeBytes)
- }
- if stats.StreamsCount != 1 {
- t.Errorf("StreamsCount is incorrect. Expected: 1, Got: %d",
stats.StreamsCount)
- }
- if stats.TopicsCount != 1 {
- t.Errorf("TopicsCount is incorrect. Expected: 1, Got: %d",
stats.TopicsCount)
- }
- if stats.PartitionsCount != 12 {
- t.Errorf("PartitionsCount is incorrect. Expected: 12, Got: %d",
stats.PartitionsCount)
- }
- if stats.SegmentsCount != 12 {
- t.Errorf("SegmentsCount is incorrect. Expected: 12, Got: %d",
stats.SegmentsCount)
- }
- if stats.MessagesCount != 4 {
- t.Errorf("MessagesCount is incorrect. Expected: 4, Got: %d",
stats.MessagesCount)
- }
- if stats.ClientsCount != 11 {
- t.Errorf("ClientsCount is incorrect. Expected: 11, Got: %d",
stats.ClientsCount)
- }
- if stats.ConsumerGroupsCount != 0 {
- t.Errorf("ConsumerGroupsCount is incorrect. Expected: 0, Got:
%d", stats.ConsumerGroupsCount)
- }
- if stats.Hostname != "pop-os" {
- t.Errorf("Hostname is incorrect. Expected: \"pop-os\", Got:
\"%s\"", stats.Hostname)
- }
- if stats.OsName != "Pop!_OS" {
- t.Errorf("OsName is incorrect. Expected: \"Pop!_OS\", Got:
\"%s\"", stats.OsName)
- }
- if stats.OsVersion != "Linux 22.04 Pop!_OS" {
- t.Errorf("OsVersion is incorrect. Expected: \"Linux 22.04
Pop!_OS\", Got: \"%s\"", stats.OsVersion)
- }
- if stats.KernelVersion != "6.4.6-76060406-generic" {
- t.Errorf("KernelVersion is incorrect. Expected:
\"6.4.6-76060406-generic\", Got: \"%s\"", stats.KernelVersion)
- }
-}
diff --git a/foreign/go/client/tcp/tcp_utilities.go
b/foreign/go/client/tcp/tcp_utilities.go
index 0da1784cb..f3338adec 100644
--- a/foreign/go/client/tcp/tcp_utilities.go
+++ b/foreign/go/client/tcp/tcp_utilities.go
@@ -18,7 +18,6 @@
package tcp
import (
- binaryserialization
"github.com/apache/iggy/foreign/go/binary_serialization"
iggcon "github.com/apache/iggy/foreign/go/contracts"
"github.com/apache/iggy/foreign/go/internal/command"
)
@@ -29,10 +28,10 @@ func (c *IggyTcpClient) GetStats() (*iggcon.Stats, error) {
return nil, err
}
- stats := &binaryserialization.TcpStats{}
- err = stats.Deserialize(buffer)
+ s := &iggcon.Stats{}
+ err = s.UnmarshalBinary(buffer)
- return &stats.Stats, err
+ return s, err
}
func (c *IggyTcpClient) Ping() error {
diff --git a/foreign/go/contracts/stats.go b/foreign/go/contracts/stats.go
index 1409b01e5..df89a4a1a 100644
--- a/foreign/go/contracts/stats.go
+++ b/foreign/go/contracts/stats.go
@@ -17,27 +17,124 @@
package iggcon
+import (
+ "fmt"
+
+ "github.com/apache/iggy/foreign/go/internal/codec"
+)
+
+// CacheMetrics holds cache hit/miss statistics for a single partition.
+type CacheMetrics struct {
+ StreamId uint32 `json:"stream_id"`
+ TopicId uint32 `json:"topic_id"`
+ PartitionId uint32 `json:"partition_id"`
+ Hits uint64 `json:"hits"`
+ Misses uint64 `json:"misses"`
+ HitRatio float32 `json:"hit_ratio"`
+}
+
+// cacheMetricsWireSize is the fixed size of a CacheMetrics entry on the wire:
+// stream_id(4) + topic_id(4) + partition_id(4) + hits(8) + misses(8) + hit_ratio(4) = 32.
+const cacheMetricsWireSize = 4 + 4 + 4 + 8 + 8 + 4
+
type Stats struct {
- ProcessId uint32 `json:"process_id"`
- CpuUsage float32 `json:"cpu_usage"`
- TotalCpuUsage float32 `json:"total_cpu_usage"`
- MemoryUsage uint64 `json:"memory_usage"`
- TotalMemory uint64 `json:"total_memory"`
- AvailableMemory uint64 `json:"available_memory"`
- RunTime uint64 `json:"run_time"`
- StartTime uint64 `json:"start_time"`
- ReadBytes uint64 `json:"read_bytes"`
- WrittenBytes uint64 `json:"written_bytes"`
- MessagesSizeBytes uint64 `json:"messages_size_bytes"`
- StreamsCount uint32 `json:"streams_count"`
- TopicsCount uint32 `json:"topics_count"`
- PartitionsCount uint32 `json:"partitions_count"`
- SegmentsCount uint32 `json:"segments_count"`
- MessagesCount uint64 `json:"messages_count"`
- ClientsCount uint32 `json:"clients_count"`
- ConsumerGroupsCount uint32 `json:"consumer_groups_count"`
- Hostname string `json:"hostname"`
- OsName string `json:"os_name"`
- OsVersion string `json:"os_version"`
- KernelVersion string `json:"kernel_version"`
+ ProcessId uint32 `json:"process_id"`
+ CpuUsage float32 `json:"cpu_usage"`
+ TotalCpuUsage float32 `json:"total_cpu_usage"`
+ MemoryUsage uint64 `json:"memory_usage"`
+ TotalMemory uint64 `json:"total_memory"`
+ AvailableMemory uint64 `json:"available_memory"`
+ RunTime uint64 `json:"run_time"`
+ StartTime uint64 `json:"start_time"`
+ ReadBytes uint64 `json:"read_bytes"`
+ WrittenBytes uint64 `json:"written_bytes"`
+ MessagesSizeBytes uint64 `json:"messages_size_bytes"`
+ StreamsCount uint32 `json:"streams_count"`
+ TopicsCount uint32 `json:"topics_count"`
+ PartitionsCount uint32 `json:"partitions_count"`
+ SegmentsCount uint32 `json:"segments_count"`
+ MessagesCount uint64 `json:"messages_count"`
+ ClientsCount uint32 `json:"clients_count"`
+ ConsumerGroupsCount uint32 `json:"consumer_groups_count"`
+ Hostname string `json:"hostname"`
+ OsName string `json:"os_name"`
+ OsVersion string `json:"os_version"`
+ KernelVersion string `json:"kernel_version"`
+ IggyServerVersion string `json:"iggy_server_version"`
+ IggyServerSemver uint32 `json:"iggy_server_semver"`
+ CacheMetrics []CacheMetrics `json:"cache_metrics"`
+ ThreadsCount uint32 `json:"threads_count"`
+ FreeDiskSpace uint64 `json:"free_disk_space"`
+ TotalDiskSpace uint64 `json:"total_disk_space"`
+}
+
+func (cm *CacheMetrics) MarshalBinary() ([]byte, error) {
+ w := codec.NewWriterCap(32)
+ w.U32(cm.StreamId)
+ w.U32(cm.TopicId)
+ w.U32(cm.PartitionId)
+ w.U64(cm.Hits)
+ w.U64(cm.Misses)
+ w.F32(cm.HitRatio)
+ return w.Bytes(), w.Err()
+}
+
+func (cm *CacheMetrics) UnmarshalBinary(data []byte) error {
+ r := codec.NewReader(data)
+ cm.StreamId = r.U32()
+ cm.TopicId = r.U32()
+ cm.PartitionId = r.U32()
+ cm.Hits = r.U64()
+ cm.Misses = r.U64()
+ cm.HitRatio = r.F32()
+ return r.Err()
+}
+
+func (s *Stats) UnmarshalBinary(payload []byte) error {
+ r := codec.NewReader(payload)
+ s.ProcessId = r.U32()
+ s.CpuUsage = r.F32()
+ s.TotalCpuUsage = r.F32()
+ s.MemoryUsage = r.U64()
+ s.TotalMemory = r.U64()
+ s.AvailableMemory = r.U64()
+ s.RunTime = r.U64()
+ s.StartTime = r.U64()
+ s.ReadBytes = r.U64()
+ s.WrittenBytes = r.U64()
+ s.MessagesSizeBytes = r.U64()
+ s.StreamsCount = r.U32()
+ s.TopicsCount = r.U32()
+ s.PartitionsCount = r.U32()
+ s.SegmentsCount = r.U32()
+ s.MessagesCount = r.U64()
+ s.ClientsCount = r.U32()
+ s.ConsumerGroupsCount = r.U32()
+ s.Hostname = r.U32LenStr()
+ s.OsName = r.U32LenStr()
+ s.OsVersion = r.U32LenStr()
+ s.KernelVersion = r.U32LenStr()
+ s.IggyServerVersion = r.U32LenStr()
+ s.IggyServerSemver = r.U32()
+ cacheCount := int(r.U32())
+ if r.Err() != nil {
+ return r.Err()
+ }
+
+ if cacheCount > r.Remaining()/cacheMetricsWireSize {
+ return fmt.Errorf("stats: cache metrics count %d exceeds
remaining bytes %d", cacheCount, r.Remaining())
+ }
+
+ if cacheCount > 0 {
+ s.CacheMetrics = make([]CacheMetrics, cacheCount)
+ for i := range s.CacheMetrics {
+ r.Obj(cacheMetricsWireSize, &s.CacheMetrics[i])
+ }
+ }
+
+ s.ThreadsCount = r.U32()
+ s.FreeDiskSpace = r.U64()
+ s.TotalDiskSpace = r.U64()
+
+ return r.Err()
}
diff --git a/foreign/go/contracts/stats_test.go
b/foreign/go/contracts/stats_test.go
new file mode 100644
index 000000000..1e50790aa
--- /dev/null
+++ b/foreign/go/contracts/stats_test.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package iggcon
+
+import (
+ "encoding/binary"
+ "math"
+ "strings"
+ "testing"
+
+ "github.com/apache/iggy/foreign/go/internal/codec"
+ "github.com/google/go-cmp/cmp"
+)
+
+// buildStatsPayload constructs a wire payload for Stats using the same
+// layout as the Rust binary_protocol StatsResponse encoder.
+func buildStatsPayload(t *testing.T, s Stats) []byte {
+ t.Helper()
+ w := codec.NewWriter()
+ w.U32(s.ProcessId)
+ w.F32(s.CpuUsage)
+ w.F32(s.TotalCpuUsage)
+ w.U64(s.MemoryUsage)
+ w.U64(s.TotalMemory)
+ w.U64(s.AvailableMemory)
+ w.U64(s.RunTime)
+ w.U64(s.StartTime)
+ w.U64(s.ReadBytes)
+ w.U64(s.WrittenBytes)
+ w.U64(s.MessagesSizeBytes)
+ w.U32(s.StreamsCount)
+ w.U32(s.TopicsCount)
+ w.U32(s.PartitionsCount)
+ w.U32(s.SegmentsCount)
+ w.U64(s.MessagesCount)
+ w.U32(s.ClientsCount)
+ w.U32(s.ConsumerGroupsCount)
+ w.U32LenStr(s.Hostname)
+ w.U32LenStr(s.OsName)
+ w.U32LenStr(s.OsVersion)
+ w.U32LenStr(s.KernelVersion)
+ w.U32LenStr(s.IggyServerVersion)
+
+ w.U32(s.IggyServerSemver)
+
+ w.U32(uint32(len(s.CacheMetrics)))
+ for i := range s.CacheMetrics {
+ w.Obj(&s.CacheMetrics[i])
+ }
+
+ w.U32(s.ThreadsCount)
+ w.U64(s.FreeDiskSpace)
+ w.U64(s.TotalDiskSpace)
+
+ if err := w.Err(); err != nil {
+ t.Fatalf("buildStatsPayload: %v", err)
+ }
+ return w.Bytes()
+}
+
+func sampleStats() Stats {
+ return Stats{
+ ProcessId: 1234,
+ CpuUsage: 25.5,
+ TotalCpuUsage: 50.0,
+ MemoryUsage: 1_073_741_824,
+ TotalMemory: 8_589_934_592,
+ AvailableMemory: 4_294_967_296,
+ RunTime: 3600,
+ StartTime: 1_710_000_000_000,
+ ReadBytes: 1_000_000,
+ WrittenBytes: 500_000,
+ MessagesSizeBytes: 2_000_000,
+ StreamsCount: 3,
+ TopicsCount: 10,
+ PartitionsCount: 30,
+ SegmentsCount: 90,
+ MessagesCount: 50_000,
+ ClientsCount: 5,
+ ConsumerGroupsCount: 2,
+ Hostname: "node-1",
+ OsName: "Linux",
+ OsVersion: "6.1",
+ KernelVersion: "6.1.0",
+ IggyServerVersion: "0.6.0",
+ IggyServerSemver: 600,
+ ThreadsCount: 16,
+ FreeDiskSpace: 107_374_182_400,
+ TotalDiskSpace: 512_110_190_592,
+ }
+}
+
+func assertStatsEqual(t *testing.T, got, want Stats) {
+ t.Helper()
+
+ if diff := cmp.Diff(want, got); diff != "" {
+ t.Errorf("Stats mismatch (-want +got):\n%s", diff)
+ }
+}
+
+func TestUnmarshalBinary(t *testing.T) {
+ want := sampleStats()
+ payload := buildStatsPayload(t, want)
+
+ var got Stats
+ if err := got.UnmarshalBinary(payload); err != nil {
+ t.Fatalf("UnmarshalBinary error: %v", err)
+ }
+
+ assertStatsEqual(t, got, want)
+}
+
+func TestUnmarshalBinary_maliciousCacheCount(t *testing.T) {
+ s := Stats{
+ Hostname: "h",
+ OsName: "o",
+ OsVersion: "s",
+ KernelVersion: "t",
+ IggyServerVersion: "v",
+ }
+ payload := buildStatsPayload(t, s)
+ // cache_metrics_count sits before the tail fields: threads(4) + free_disk(8) + total_disk(8) = 20
+ cacheCountOffset := len(payload) - 20 - 4
+ binary.LittleEndian.PutUint32(payload[cacheCountOffset:],
math.MaxUint32)
+
+ var stats Stats
+ err := stats.UnmarshalBinary(payload)
+ if err == nil {
+ t.Fatal("expected error for oversized cache metrics count, got
nil")
+ }
+ if !strings.Contains(err.Error(), "cache metrics count") {
+ t.Errorf("unexpected error: got %v, want error mentioning cache
metrics count", err)
+ }
+}
+
+func TestUnmarshalBinary_withCacheMetrics(t *testing.T) {
+ want := sampleStats()
+ want.CacheMetrics = []CacheMetrics{
+ {StreamId: 1, TopicId: 1, PartitionId: 0, Hits: 1000, Misses:
50, HitRatio: 0.95238095},
+ {StreamId: 2, TopicId: 3, PartitionId: 1, Hits: 0, Misses: 100,
HitRatio: 0.0},
+ }
+ payload := buildStatsPayload(t, want)
+
+ var got Stats
+ if err := got.UnmarshalBinary(payload); err != nil {
+ t.Fatalf("UnmarshalBinary error: %v", err)
+ }
+
+ assertStatsEqual(t, got, want)
+}
+
+func TestUnmarshalBinary_truncatedFixedFields(t *testing.T) {
+ // Payload too short to contain all fixed fields.
+ payload := make([]byte, 10) // far too small
+ var stats Stats
+ if err := stats.UnmarshalBinary(payload); err == nil {
+ t.Fatal("expected error for truncated payload, got nil")
+ }
+}
+
+func TestUnmarshalBinary_truncatedAfterStrings(t *testing.T) {
+ // Build a valid payload then chop off everything after the semver
+ // so that reading cache count fails.
+ s := Stats{
+ Hostname: "h",
+ OsName: "o",
+ OsVersion: "v",
+ KernelVersion: "k",
+ IggyServerVersion: "s",
+ }
+ payload := buildStatsPayload(t, s)
+ // Remove tail fields + cache_metrics_count (4+4+8+8 = 24 bytes from the end).
+ payload = payload[:len(payload)-24]
+
+ var stats Stats
+ if err := stats.UnmarshalBinary(payload); err == nil {
+ t.Fatal("expected error for truncated cache count, got nil")
+ }
+}
+
+func TestUnmarshalBinary_emptyStrings(t *testing.T) {
+ want := sampleStats()
+ want.Hostname = ""
+ want.OsName = ""
+ want.OsVersion = ""
+ want.KernelVersion = ""
+ want.IggyServerVersion = ""
+ payload := buildStatsPayload(t, want)
+
+ var got Stats
+ if err := got.UnmarshalBinary(payload); err != nil {
+ t.Fatalf("UnmarshalBinary error: %v", err)
+ }
+
+ assertStatsEqual(t, got, want)
+}
diff --git a/foreign/go/go.mod b/foreign/go/go.mod
index 143aac228..88aa68b39 100644
--- a/foreign/go/go.mod
+++ b/foreign/go/go.mod
@@ -4,6 +4,7 @@ go 1.25.0
require (
github.com/avast/retry-go/v5 v5.0.0
+ github.com/google/go-cmp v0.7.0
github.com/google/uuid v1.6.0
github.com/klauspost/compress v1.18.5
github.com/stretchr/testify v1.11.1