This is an automated email from the ASF dual-hosted git repository.
piotr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 30fae4cef fix: stats semver wire alignment using a zero sentinel (#3036)
30fae4cef is described below
commit 30fae4cefb03bf9230311089a502eb635a1a4af5
Author: Atharva Lade <[email protected]>
AuthorDate: Mon Mar 30 04:35:37 2026 -0500
fix: stats semver wire alignment using a zero sentinel (#3036)
Closes #2979
---
.../src/responses/system/get_stats.rs | 88 ++++++++++------------
1 file changed, 38 insertions(+), 50 deletions(-)
diff --git a/core/binary_protocol/src/responses/system/get_stats.rs b/core/binary_protocol/src/responses/system/get_stats.rs
index 09923b6a8..3e84bd439 100644
--- a/core/binary_protocol/src/responses/system/get_stats.rs
+++ b/core/binary_protocol/src/responses/system/get_stats.rs
@@ -49,12 +49,12 @@ impl CacheMetricEntry {
/// [os_version_len:4][os_version:N]
/// [kernel_version_len:4][kernel_version:N]
/// [iggy_server_version_len:4][iggy_server_version:N]
-/// [iggy_server_semver:4]?
+/// [iggy_server_semver:4]
/// [cache_metrics_count:4]
/// For each: [stream_id:4][topic_id:4][partition_id:4][hits:8][misses:8][hit_ratio:f32]
-/// [threads_count:4]?
-/// [free_disk_space:8]?
-/// [total_disk_space:8]?
+/// [threads_count:4]
+/// [free_disk_space:8]
+/// [total_disk_space:8]
/// ```
#[derive(Debug, Clone, PartialEq)]
pub struct StatsResponse {
@@ -106,20 +106,18 @@ fn decode_len_prefixed_str(buf: &[u8], pos: usize) -> Result<(String, usize), Wi
impl WireEncode for StatsResponse {
fn encoded_size(&self) -> usize {
- let tail = if self.iggy_server_semver.is_some() {
- 4 + 4 + 8 + 8 // semver + threads_count + free_disk_space + total_disk_space
- } else {
- 0
- };
NUMERIC_HEADER_SIZE
+ (4 + self.hostname.len())
+ (4 + self.os_name.len())
+ (4 + self.os_version.len())
+ (4 + self.kernel_version.len())
+ (4 + self.iggy_server_version.len())
+ + 4 // iggy_server_semver (0 sentinel when absent)
+ 4 // cache_metrics_count
+ self.cache_metrics.len() * CacheMetricEntry::SIZE
- + tail
+ + 4 // threads_count
+ + 8 // free_disk_space
+ + 8 // total_disk_space
}
fn encode(&self, buf: &mut BytesMut) {
@@ -148,9 +146,7 @@ impl WireEncode for StatsResponse {
encode_len_prefixed_str(buf, &self.kernel_version);
encode_len_prefixed_str(buf, &self.iggy_server_version);
- if let Some(semver) = self.iggy_server_semver {
- buf.put_u32_le(semver);
- }
+ buf.put_u32_le(self.iggy_server_semver.unwrap_or(0));
#[allow(clippy::cast_possible_truncation)]
buf.put_u32_le(self.cache_metrics.len() as u32);
@@ -163,12 +159,9 @@ impl WireEncode for StatsResponse {
buf.put_f32_le(entry.hit_ratio);
}
- // Tail fields are only present in the new wire format (semver present).
- if self.iggy_server_semver.is_some() {
- buf.put_u32_le(self.threads_count);
- buf.put_u64_le(self.free_disk_space);
- buf.put_u64_le(self.total_disk_space);
- }
+ buf.put_u32_le(self.threads_count);
+ buf.put_u64_le(self.free_disk_space);
+ buf.put_u64_le(self.total_disk_space);
}
}
@@ -206,17 +199,9 @@ impl WireDecode for StatsResponse {
let (iggy_server_version, next) = decode_len_prefixed_str(buf, pos)?;
pos = next;
- // iggy_server_semver is optional - only present if enough bytes remain
- // before the cache_metrics section. We need at least 8 bytes for
- // semver(4) + cache_count(4), vs 4 bytes for just cache_count(4).
- let remaining = buf.len().saturating_sub(pos);
- let iggy_server_semver = if remaining >= 8 {
- let v = read_u32_le(buf, pos)?;
- pos += 4;
- Some(v)
- } else {
- None
- };
+ let raw_semver = read_u32_le(buf, pos)?;
+ pos += 4;
+ let iggy_server_semver = (raw_semver != 0).then_some(raw_semver);
let cache_count = read_u32_le(buf, pos)? as usize;
pos += 4;
@@ -245,24 +230,14 @@ impl WireDecode for StatsResponse {
});
}
- // Optional tail fields for backwards compatibility with older servers
- let mut threads_count = 0u32;
- if pos + 4 <= buf.len() {
- threads_count = read_u32_le(buf, pos)?;
- pos += 4;
- }
+ let threads_count = read_u32_le(buf, pos)?;
+ pos += 4;
- let mut free_disk_space = 0u64;
- if pos + 8 <= buf.len() {
- free_disk_space = read_u64_le(buf, pos)?;
- pos += 8;
- }
+ let free_disk_space = read_u64_le(buf, pos)?;
+ pos += 8;
- let mut total_disk_space = 0u64;
- if pos + 8 <= buf.len() {
- total_disk_space = read_u64_le(buf, pos)?;
- pos += 8;
- }
+ let total_disk_space = read_u64_le(buf, pos)?;
+ pos += 8;
Ok((
Self {
@@ -378,16 +353,29 @@ mod tests {
let mut stats = sample_stats();
stats.iggy_server_semver = None;
stats.cache_metrics = vec![];
- // Tail fields are not encoded when semver is absent (old wire format).
- stats.threads_count = 0;
- stats.free_disk_space = 0;
- stats.total_disk_space = 0;
+ stats.threads_count = 16;
+ stats.free_disk_space = 107_374_182_400;
+ stats.total_disk_space = 512_110_190_592;
let bytes = stats.to_bytes();
let (decoded, consumed) = StatsResponse::decode(&bytes).unwrap();
assert_eq!(consumed, bytes.len());
assert_eq!(decoded, stats);
}
+ #[test]
+ fn encoded_size_same_with_or_without_semver() {
+ let mut with_semver = sample_stats();
+ let mut without_semver = sample_stats();
+ without_semver.iggy_server_semver = None;
+ assert_eq!(with_semver.encoded_size(), without_semver.encoded_size());
+
+ with_semver.iggy_server_semver = None;
+ let bytes = with_semver.to_bytes();
+ let (decoded, consumed) = StatsResponse::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded.iggy_server_semver, None);
+ }
+
#[test]
fn roundtrip_empty_strings() {
let stats = StatsResponse {