This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new c19a600a8 feat(rust): add sans-IO frame codec and command dispatch
table (#2967)
c19a600a8 is described below
commit c19a600a85ef027d5d1c9c770f377486d89ba948
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 23 12:19:02 2026 +0100
feat(rust): add sans-IO frame codec and command dispatch table (#2967)
---
core/binary_protocol/src/codec.rs | 58 +++
core/binary_protocol/src/codes.rs | 54 +--
core/binary_protocol/src/consensus/operation.rs | 89 ++--
core/binary_protocol/src/dispatch.rs | 450 +++++++++++++++++++++
core/binary_protocol/src/error.rs | 4 +-
core/binary_protocol/src/framing.rs | 289 +++++++++++++
core/binary_protocol/src/lib.rs | 28 +-
core/binary_protocol/src/message_view.rs | 11 +-
core/binary_protocol/src/primitives/identifier.rs | 19 +-
.../binary_protocol/src/primitives/partitioning.rs | 25 +-
.../src/requests/users/create_user.rs | 5 +-
.../src/requests/users/update_permissions.rs | 5 +-
.../src/responses/clients/client_response.rs | 2 +-
.../src/responses/clients/get_client.rs | 7 +-
.../consumer_groups/get_consumer_group.rs | 15 +-
.../src/responses/streams/get_stream.rs | 5 +-
.../src/responses/system/get_cluster_metadata.rs | 244 +++++++++++
.../src/{frame.rs => responses/system/get_me.rs} | 20 +-
.../src/responses/system/get_snapshot.rs | 71 ++++
.../src/responses/system/get_stats.rs | 7 +-
core/binary_protocol/src/responses/system/mod.rs | 6 +
.../src/responses/topics/get_topic.rs | 5 +-
22 files changed, 1261 insertions(+), 158 deletions(-)
diff --git a/core/binary_protocol/src/codec.rs
b/core/binary_protocol/src/codec.rs
index 1ef155847..801ba787c 100644
--- a/core/binary_protocol/src/codec.rs
+++ b/core/binary_protocol/src/codec.rs
@@ -75,6 +75,32 @@ pub fn read_u8(buf: &[u8], offset: usize) -> Result<u8,
WireError> {
})
}
+/// Helper to read a `u16` LE from `buf` at `offset`.
+///
+/// # Errors
+/// Returns `WireError::UnexpectedEof` if fewer than 2 bytes remain.
+#[allow(clippy::missing_panics_doc)]
+#[inline]
+pub fn read_u16_le(buf: &[u8], offset: usize) -> Result<u16, WireError> {
+ let end = offset
+ .checked_add(2)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 2,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ let slice = buf
+ .get(offset..end)
+ .ok_or_else(|| WireError::UnexpectedEof {
+ offset,
+ need: 2,
+ have: buf.len().saturating_sub(offset),
+ })?;
+ Ok(u16::from_le_bytes(
+ slice.try_into().expect("slice is exactly 2 bytes"),
+ ))
+}
+
/// Helper to read a `u32` LE from `buf` at `offset`.
///
/// # Errors
@@ -224,3 +250,35 @@ pub fn read_bytes(buf: &[u8], offset: usize, len: usize)
-> Result<&[u8], WireEr
have: buf.len().saturating_sub(offset),
})
}
+
+/// Cap a pre-allocation hint so a bogus wire count cannot cause OOM.
+/// The actual count is validated by the decode loop - this only limits
+/// the upfront allocation.
+#[inline]
+#[must_use]
+pub fn capped_capacity(count: usize, remaining: usize, min_item_size: usize)
-> usize {
+ if min_item_size == 0 {
+ return count;
+ }
+ count.min(remaining / min_item_size)
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn capped_capacity_limits_allocation() {
+ assert_eq!(capped_capacity(1_000_000, 100, 10), 10);
+ assert_eq!(capped_capacity(5, 100, 10), 5);
+ assert_eq!(capped_capacity(10, 100, 10), 10);
+ assert_eq!(capped_capacity(11, 100, 10), 10);
+ assert_eq!(capped_capacity(0, 100, 10), 0);
+ assert_eq!(capped_capacity(100, 0, 10), 0);
+ }
+
+ #[test]
+ fn capped_capacity_zero_item_size_returns_count() {
+ assert_eq!(capped_capacity(1_000_000, 100, 0), 1_000_000);
+ }
+}
diff --git a/core/binary_protocol/src/codes.rs
b/core/binary_protocol/src/codes.rs
index f20209172..a088b71fb 100644
--- a/core/binary_protocol/src/codes.rs
+++ b/core/binary_protocol/src/codes.rs
@@ -87,58 +87,14 @@ pub const DELETE_CONSUMER_GROUP_CODE: u32 = 603;
pub const JOIN_CONSUMER_GROUP_CODE: u32 = 604;
pub const LEAVE_CONSUMER_GROUP_CODE: u32 = 605;
+/// Lookup the human-readable name for a command code.
+///
/// # Errors
/// Returns `WireError::UnknownCommand` if the code is not recognized.
pub const fn command_name(code: u32) -> Result<&'static str, WireError> {
- match code {
- PING_CODE => Ok("ping"),
- GET_STATS_CODE => Ok("stats"),
- GET_SNAPSHOT_FILE_CODE => Ok("snapshot"),
- GET_CLUSTER_METADATA_CODE => Ok("cluster.metadata"),
- GET_ME_CODE => Ok("me"),
- GET_CLIENT_CODE => Ok("client.get"),
- GET_CLIENTS_CODE => Ok("client.list"),
- GET_USER_CODE => Ok("user.get"),
- GET_USERS_CODE => Ok("user.list"),
- CREATE_USER_CODE => Ok("user.create"),
- DELETE_USER_CODE => Ok("user.delete"),
- UPDATE_USER_CODE => Ok("user.update"),
- UPDATE_PERMISSIONS_CODE => Ok("user.permissions"),
- CHANGE_PASSWORD_CODE => Ok("user.password"),
- LOGIN_USER_CODE => Ok("user.login"),
- LOGOUT_USER_CODE => Ok("user.logout"),
- GET_PERSONAL_ACCESS_TOKENS_CODE => Ok("personal_access_token.list"),
- CREATE_PERSONAL_ACCESS_TOKEN_CODE =>
Ok("personal_access_token.create"),
- DELETE_PERSONAL_ACCESS_TOKEN_CODE =>
Ok("personal_access_token.delete"),
- LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE =>
Ok("personal_access_token.login"),
- POLL_MESSAGES_CODE => Ok("message.poll"),
- SEND_MESSAGES_CODE => Ok("message.send"),
- FLUSH_UNSAVED_BUFFER_CODE => Ok("message.flush_unsaved_buffer"),
- GET_CONSUMER_OFFSET_CODE => Ok("consumer_offset.get"),
- STORE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.store"),
- DELETE_CONSUMER_OFFSET_CODE => Ok("consumer_offset.delete"),
- GET_STREAM_CODE => Ok("stream.get"),
- GET_STREAMS_CODE => Ok("stream.list"),
- CREATE_STREAM_CODE => Ok("stream.create"),
- DELETE_STREAM_CODE => Ok("stream.delete"),
- UPDATE_STREAM_CODE => Ok("stream.update"),
- PURGE_STREAM_CODE => Ok("stream.purge"),
- GET_TOPIC_CODE => Ok("topic.get"),
- GET_TOPICS_CODE => Ok("topic.list"),
- CREATE_TOPIC_CODE => Ok("topic.create"),
- DELETE_TOPIC_CODE => Ok("topic.delete"),
- UPDATE_TOPIC_CODE => Ok("topic.update"),
- PURGE_TOPIC_CODE => Ok("topic.purge"),
- CREATE_PARTITIONS_CODE => Ok("partition.create"),
- DELETE_PARTITIONS_CODE => Ok("partition.delete"),
- DELETE_SEGMENTS_CODE => Ok("segment.delete"),
- GET_CONSUMER_GROUP_CODE => Ok("consumer_group.get"),
- GET_CONSUMER_GROUPS_CODE => Ok("consumer_group.list"),
- CREATE_CONSUMER_GROUP_CODE => Ok("consumer_group.create"),
- DELETE_CONSUMER_GROUP_CODE => Ok("consumer_group.delete"),
- JOIN_CONSUMER_GROUP_CODE => Ok("consumer_group.join"),
- LEAVE_CONSUMER_GROUP_CODE => Ok("consumer_group.leave"),
- _ => Err(WireError::UnknownCommand(code)),
+ match crate::dispatch::lookup_command(code) {
+ Some(meta) => Ok(meta.name),
+ None => Err(WireError::UnknownCommand(code)),
}
}
diff --git a/core/binary_protocol/src/consensus/operation.rs
b/core/binary_protocol/src/consensus/operation.rs
index 8bf2b5016..c17733deb 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -95,64 +95,48 @@ impl Operation {
}
/// Bidirectional mapping: `Operation` -> client command code.
+ ///
+ /// Delegates to the dispatch table as the single source of truth.
#[must_use]
pub const fn to_command_code(&self) -> Option<u32> {
- use crate::codes;
match self {
Self::Reserved => None,
- Self::CreateStream => Some(codes::CREATE_STREAM_CODE),
- Self::UpdateStream => Some(codes::UPDATE_STREAM_CODE),
- Self::DeleteStream => Some(codes::DELETE_STREAM_CODE),
- Self::PurgeStream => Some(codes::PURGE_STREAM_CODE),
- Self::CreateTopic => Some(codes::CREATE_TOPIC_CODE),
- Self::UpdateTopic => Some(codes::UPDATE_TOPIC_CODE),
- Self::DeleteTopic => Some(codes::DELETE_TOPIC_CODE),
- Self::PurgeTopic => Some(codes::PURGE_TOPIC_CODE),
- Self::CreatePartitions => Some(codes::CREATE_PARTITIONS_CODE),
- Self::DeletePartitions => Some(codes::DELETE_PARTITIONS_CODE),
- Self::DeleteSegments => Some(codes::DELETE_SEGMENTS_CODE),
- Self::CreateConsumerGroup =>
Some(codes::CREATE_CONSUMER_GROUP_CODE),
- Self::DeleteConsumerGroup =>
Some(codes::DELETE_CONSUMER_GROUP_CODE),
- Self::CreateUser => Some(codes::CREATE_USER_CODE),
- Self::UpdateUser => Some(codes::UPDATE_USER_CODE),
- Self::DeleteUser => Some(codes::DELETE_USER_CODE),
- Self::ChangePassword => Some(codes::CHANGE_PASSWORD_CODE),
- Self::UpdatePermissions => Some(codes::UPDATE_PERMISSIONS_CODE),
- Self::CreatePersonalAccessToken =>
Some(codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE),
- Self::DeletePersonalAccessToken =>
Some(codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE),
- Self::SendMessages => Some(codes::SEND_MESSAGES_CODE),
- Self::StoreConsumerOffset =>
Some(codes::STORE_CONSUMER_OFFSET_CODE),
+ Self::CreateStream
+ | Self::UpdateStream
+ | Self::DeleteStream
+ | Self::PurgeStream
+ | Self::CreateTopic
+ | Self::UpdateTopic
+ | Self::DeleteTopic
+ | Self::PurgeTopic
+ | Self::CreatePartitions
+ | Self::DeletePartitions
+ | Self::DeleteSegments
+ | Self::CreateConsumerGroup
+ | Self::DeleteConsumerGroup
+ | Self::CreateUser
+ | Self::UpdateUser
+ | Self::DeleteUser
+ | Self::ChangePassword
+ | Self::UpdatePermissions
+ | Self::CreatePersonalAccessToken
+ | Self::DeletePersonalAccessToken
+ | Self::SendMessages
+ | Self::StoreConsumerOffset => match
crate::dispatch::lookup_by_operation(*self) {
+ Some(meta) => Some(meta.code),
+ None => None,
+ },
}
}
/// Bidirectional mapping: client command code -> `Operation`.
+ ///
+ /// Delegates to the dispatch table as the single source of truth.
#[must_use]
pub const fn from_command_code(code: u32) -> Option<Self> {
- use crate::codes;
- match code {
- codes::CREATE_STREAM_CODE => Some(Self::CreateStream),
- codes::UPDATE_STREAM_CODE => Some(Self::UpdateStream),
- codes::DELETE_STREAM_CODE => Some(Self::DeleteStream),
- codes::PURGE_STREAM_CODE => Some(Self::PurgeStream),
- codes::CREATE_TOPIC_CODE => Some(Self::CreateTopic),
- codes::UPDATE_TOPIC_CODE => Some(Self::UpdateTopic),
- codes::DELETE_TOPIC_CODE => Some(Self::DeleteTopic),
- codes::PURGE_TOPIC_CODE => Some(Self::PurgeTopic),
- codes::CREATE_PARTITIONS_CODE => Some(Self::CreatePartitions),
- codes::DELETE_PARTITIONS_CODE => Some(Self::DeletePartitions),
- codes::DELETE_SEGMENTS_CODE => Some(Self::DeleteSegments),
- codes::CREATE_CONSUMER_GROUP_CODE =>
Some(Self::CreateConsumerGroup),
- codes::DELETE_CONSUMER_GROUP_CODE =>
Some(Self::DeleteConsumerGroup),
- codes::CREATE_USER_CODE => Some(Self::CreateUser),
- codes::UPDATE_USER_CODE => Some(Self::UpdateUser),
- codes::DELETE_USER_CODE => Some(Self::DeleteUser),
- codes::CHANGE_PASSWORD_CODE => Some(Self::ChangePassword),
- codes::UPDATE_PERMISSIONS_CODE => Some(Self::UpdatePermissions),
- codes::CREATE_PERSONAL_ACCESS_TOKEN_CODE =>
Some(Self::CreatePersonalAccessToken),
- codes::DELETE_PERSONAL_ACCESS_TOKEN_CODE =>
Some(Self::DeletePersonalAccessToken),
- codes::SEND_MESSAGES_CODE => Some(Self::SendMessages),
- codes::STORE_CONSUMER_OFFSET_CODE =>
Some(Self::StoreConsumerOffset),
- _ => None,
+ match crate::dispatch::lookup_command(code) {
+ Some(meta) => meta.operation,
+ None => None,
}
}
}
@@ -204,10 +188,11 @@ mod tests {
#[test]
fn read_only_commands_have_no_operation() {
- assert!(Operation::from_command_code(crate::PING_CODE).is_none());
- assert!(Operation::from_command_code(crate::GET_STATS_CODE).is_none());
-
assert!(Operation::from_command_code(crate::GET_STREAM_CODE).is_none());
-
assert!(Operation::from_command_code(crate::POLL_MESSAGES_CODE).is_none());
+ use crate::codes::{GET_STATS_CODE, GET_STREAM_CODE, PING_CODE,
POLL_MESSAGES_CODE};
+ assert!(Operation::from_command_code(PING_CODE).is_none());
+ assert!(Operation::from_command_code(GET_STATS_CODE).is_none());
+ assert!(Operation::from_command_code(GET_STREAM_CODE).is_none());
+ assert!(Operation::from_command_code(POLL_MESSAGES_CODE).is_none());
}
#[test]
diff --git a/core/binary_protocol/src/dispatch.rs
b/core/binary_protocol/src/dispatch.rs
new file mode 100644
index 000000000..fcd1dcb69
--- /dev/null
+++ b/core/binary_protocol/src/dispatch.rs
@@ -0,0 +1,450 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Command dispatch table mapping codes and operations to metadata.
+//!
+//! This is the protocol's identity registry. Every command has an entry
+//! with its numeric code, human-readable name, and optional VSR operation.
+//!
+//! Two lookup paths:
+//! - `lookup_command(code)`: current framing reads `[length][code][payload]`,
looks up by code
+//! - `lookup_by_operation(op)`: future VSR framing reads 256-byte header,
looks up by operation
+
+#[allow(clippy::wildcard_imports)]
+use crate::codes::*;
+use crate::consensus::Operation;
+
+/// Metadata for a single protocol command.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub struct CommandMeta {
+ pub code: u32,
+ pub name: &'static str,
+ /// VSR operation for replicated commands. `None` for non-replicated
+ /// commands that bypass consensus.
+ pub operation: Option<Operation>,
+}
+
+impl CommandMeta {
+ /// Returns `true` if this command is replicated through VSR consensus.
+ #[must_use]
+ pub const fn is_replicated(&self) -> bool {
+ self.operation.is_some()
+ }
+
+ const fn new(code: u32, name: &'static str, operation: Option<Operation>)
-> Self {
+ Self {
+ code,
+ name,
+ operation,
+ }
+ }
+
+ const fn non_replicated(code: u32, name: &'static str) -> Self {
+ Self::new(code, name, None)
+ }
+
+ const fn replicated(code: u32, name: &'static str, op: Operation) -> Self {
+ Self::new(code, name, Some(op))
+ }
+}
+
+/// All known command metadata entries.
+pub const COMMAND_TABLE: &[CommandMeta] = &[
+ // System
+ CommandMeta::non_replicated(PING_CODE, "ping"),
+ CommandMeta::non_replicated(GET_STATS_CODE, "stats"),
+ CommandMeta::non_replicated(GET_SNAPSHOT_FILE_CODE, "snapshot"),
+ CommandMeta::non_replicated(GET_CLUSTER_METADATA_CODE, "cluster.metadata"),
+ CommandMeta::non_replicated(GET_ME_CODE, "me"),
+ CommandMeta::non_replicated(GET_CLIENT_CODE, "client.get"),
+ CommandMeta::non_replicated(GET_CLIENTS_CODE, "client.list"),
+ // Users
+ CommandMeta::non_replicated(GET_USER_CODE, "user.get"),
+ CommandMeta::non_replicated(GET_USERS_CODE, "user.list"),
+ CommandMeta::replicated(CREATE_USER_CODE, "user.create",
Operation::CreateUser),
+ CommandMeta::replicated(DELETE_USER_CODE, "user.delete",
Operation::DeleteUser),
+ CommandMeta::replicated(UPDATE_USER_CODE, "user.update",
Operation::UpdateUser),
+ CommandMeta::replicated(
+ UPDATE_PERMISSIONS_CODE,
+ "user.permissions",
+ Operation::UpdatePermissions,
+ ),
+ CommandMeta::replicated(
+ CHANGE_PASSWORD_CODE,
+ "user.password",
+ Operation::ChangePassword,
+ ),
+ CommandMeta::non_replicated(LOGIN_USER_CODE, "user.login"),
+ CommandMeta::non_replicated(LOGOUT_USER_CODE, "user.logout"),
+ // Personal Access Tokens
+ CommandMeta::non_replicated(
+ GET_PERSONAL_ACCESS_TOKENS_CODE,
+ "personal_access_token.list",
+ ),
+ CommandMeta::replicated(
+ CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+ "personal_access_token.create",
+ Operation::CreatePersonalAccessToken,
+ ),
+ CommandMeta::replicated(
+ DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+ "personal_access_token.delete",
+ Operation::DeletePersonalAccessToken,
+ ),
+ CommandMeta::non_replicated(
+ LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+ "personal_access_token.login",
+ ),
+ // Messages
+ CommandMeta::non_replicated(POLL_MESSAGES_CODE, "message.poll"),
+ CommandMeta::replicated(SEND_MESSAGES_CODE, "message.send",
Operation::SendMessages),
+ CommandMeta::non_replicated(FLUSH_UNSAVED_BUFFER_CODE,
"message.flush_unsaved_buffer"),
+ // Consumer Offsets
+ CommandMeta::non_replicated(GET_CONSUMER_OFFSET_CODE,
"consumer_offset.get"),
+ CommandMeta::replicated(
+ STORE_CONSUMER_OFFSET_CODE,
+ "consumer_offset.store",
+ Operation::StoreConsumerOffset,
+ ),
+ CommandMeta::non_replicated(DELETE_CONSUMER_OFFSET_CODE,
"consumer_offset.delete"),
+ // Streams
+ CommandMeta::non_replicated(GET_STREAM_CODE, "stream.get"),
+ CommandMeta::non_replicated(GET_STREAMS_CODE, "stream.list"),
+ CommandMeta::replicated(CREATE_STREAM_CODE, "stream.create",
Operation::CreateStream),
+ CommandMeta::replicated(DELETE_STREAM_CODE, "stream.delete",
Operation::DeleteStream),
+ CommandMeta::replicated(UPDATE_STREAM_CODE, "stream.update",
Operation::UpdateStream),
+ CommandMeta::replicated(PURGE_STREAM_CODE, "stream.purge",
Operation::PurgeStream),
+ // Topics
+ CommandMeta::non_replicated(GET_TOPIC_CODE, "topic.get"),
+ CommandMeta::non_replicated(GET_TOPICS_CODE, "topic.list"),
+ CommandMeta::replicated(CREATE_TOPIC_CODE, "topic.create",
Operation::CreateTopic),
+ CommandMeta::replicated(DELETE_TOPIC_CODE, "topic.delete",
Operation::DeleteTopic),
+ CommandMeta::replicated(UPDATE_TOPIC_CODE, "topic.update",
Operation::UpdateTopic),
+ CommandMeta::replicated(PURGE_TOPIC_CODE, "topic.purge",
Operation::PurgeTopic),
+ // Partitions
+ CommandMeta::replicated(
+ CREATE_PARTITIONS_CODE,
+ "partition.create",
+ Operation::CreatePartitions,
+ ),
+ CommandMeta::replicated(
+ DELETE_PARTITIONS_CODE,
+ "partition.delete",
+ Operation::DeletePartitions,
+ ),
+ // Segments
+ CommandMeta::replicated(
+ DELETE_SEGMENTS_CODE,
+ "segment.delete",
+ Operation::DeleteSegments,
+ ),
+ // Consumer Groups
+ CommandMeta::non_replicated(GET_CONSUMER_GROUP_CODE, "consumer_group.get"),
+ CommandMeta::non_replicated(GET_CONSUMER_GROUPS_CODE,
"consumer_group.list"),
+ CommandMeta::replicated(
+ CREATE_CONSUMER_GROUP_CODE,
+ "consumer_group.create",
+ Operation::CreateConsumerGroup,
+ ),
+ CommandMeta::replicated(
+ DELETE_CONSUMER_GROUP_CODE,
+ "consumer_group.delete",
+ Operation::DeleteConsumerGroup,
+ ),
+ CommandMeta::non_replicated(JOIN_CONSUMER_GROUP_CODE,
"consumer_group.join"),
+ CommandMeta::non_replicated(LEAVE_CONSUMER_GROUP_CODE,
"consumer_group.leave"),
+];
+
+/// Lookup command metadata by command code.
+///
+/// Uses a `match` (compiled to a jump table / binary search) for O(1) lookup
+/// instead of linear scan. The match maps code -> table index, keeping
+/// `COMMAND_TABLE` as the single source of truth for all metadata.
+#[must_use]
+pub const fn lookup_command(code: u32) -> Option<&'static CommandMeta> {
+ // Indices must match the order of entries in COMMAND_TABLE above.
+ let idx = match code {
+ PING_CODE => 0,
+ GET_STATS_CODE => 1,
+ GET_SNAPSHOT_FILE_CODE => 2,
+ GET_CLUSTER_METADATA_CODE => 3,
+ GET_ME_CODE => 4,
+ GET_CLIENT_CODE => 5,
+ GET_CLIENTS_CODE => 6,
+ GET_USER_CODE => 7,
+ GET_USERS_CODE => 8,
+ CREATE_USER_CODE => 9,
+ DELETE_USER_CODE => 10,
+ UPDATE_USER_CODE => 11,
+ UPDATE_PERMISSIONS_CODE => 12,
+ CHANGE_PASSWORD_CODE => 13,
+ LOGIN_USER_CODE => 14,
+ LOGOUT_USER_CODE => 15,
+ GET_PERSONAL_ACCESS_TOKENS_CODE => 16,
+ CREATE_PERSONAL_ACCESS_TOKEN_CODE => 17,
+ DELETE_PERSONAL_ACCESS_TOKEN_CODE => 18,
+ LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE => 19,
+ POLL_MESSAGES_CODE => 20,
+ SEND_MESSAGES_CODE => 21,
+ FLUSH_UNSAVED_BUFFER_CODE => 22,
+ GET_CONSUMER_OFFSET_CODE => 23,
+ STORE_CONSUMER_OFFSET_CODE => 24,
+ DELETE_CONSUMER_OFFSET_CODE => 25,
+ GET_STREAM_CODE => 26,
+ GET_STREAMS_CODE => 27,
+ CREATE_STREAM_CODE => 28,
+ DELETE_STREAM_CODE => 29,
+ UPDATE_STREAM_CODE => 30,
+ PURGE_STREAM_CODE => 31,
+ GET_TOPIC_CODE => 32,
+ GET_TOPICS_CODE => 33,
+ CREATE_TOPIC_CODE => 34,
+ DELETE_TOPIC_CODE => 35,
+ UPDATE_TOPIC_CODE => 36,
+ PURGE_TOPIC_CODE => 37,
+ CREATE_PARTITIONS_CODE => 38,
+ DELETE_PARTITIONS_CODE => 39,
+ DELETE_SEGMENTS_CODE => 40,
+ GET_CONSUMER_GROUP_CODE => 41,
+ GET_CONSUMER_GROUPS_CODE => 42,
+ CREATE_CONSUMER_GROUP_CODE => 43,
+ DELETE_CONSUMER_GROUP_CODE => 44,
+ JOIN_CONSUMER_GROUP_CODE => 45,
+ LEAVE_CONSUMER_GROUP_CODE => 46,
+ _ => return None,
+ };
+ Some(&COMMAND_TABLE[idx])
+}
+
+/// Lookup command metadata by VSR operation.
+///
+/// Returns `None` for `Operation::Reserved` and for non-replicated commands
+/// that have no operation mapping.
+#[must_use]
+pub const fn lookup_by_operation(op: Operation) -> Option<&'static
CommandMeta> {
+ // Indices must match the order of entries in COMMAND_TABLE above.
+ let idx = match op {
+ Operation::CreateStream => 28,
+ Operation::UpdateStream => 30,
+ Operation::DeleteStream => 29,
+ Operation::PurgeStream => 31,
+ Operation::CreateTopic => 34,
+ Operation::UpdateTopic => 36,
+ Operation::DeleteTopic => 35,
+ Operation::PurgeTopic => 37,
+ Operation::CreatePartitions => 38,
+ Operation::DeletePartitions => 39,
+ Operation::DeleteSegments => 40,
+ Operation::CreateConsumerGroup => 43,
+ Operation::DeleteConsumerGroup => 44,
+ Operation::CreateUser => 9,
+ Operation::UpdateUser => 11,
+ Operation::DeleteUser => 10,
+ Operation::ChangePassword => 13,
+ Operation::UpdatePermissions => 12,
+ Operation::CreatePersonalAccessToken => 17,
+ Operation::DeletePersonalAccessToken => 18,
+ Operation::SendMessages => 21,
+ Operation::StoreConsumerOffset => 24,
+ Operation::Reserved => return None,
+ };
+ Some(&COMMAND_TABLE[idx])
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn every_code_has_entry() {
+ let all_codes = [
+ PING_CODE,
+ GET_STATS_CODE,
+ GET_SNAPSHOT_FILE_CODE,
+ GET_CLUSTER_METADATA_CODE,
+ GET_ME_CODE,
+ GET_CLIENT_CODE,
+ GET_CLIENTS_CODE,
+ GET_USER_CODE,
+ GET_USERS_CODE,
+ CREATE_USER_CODE,
+ DELETE_USER_CODE,
+ UPDATE_USER_CODE,
+ UPDATE_PERMISSIONS_CODE,
+ CHANGE_PASSWORD_CODE,
+ LOGIN_USER_CODE,
+ LOGOUT_USER_CODE,
+ GET_PERSONAL_ACCESS_TOKENS_CODE,
+ CREATE_PERSONAL_ACCESS_TOKEN_CODE,
+ DELETE_PERSONAL_ACCESS_TOKEN_CODE,
+ LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
+ POLL_MESSAGES_CODE,
+ SEND_MESSAGES_CODE,
+ FLUSH_UNSAVED_BUFFER_CODE,
+ GET_CONSUMER_OFFSET_CODE,
+ STORE_CONSUMER_OFFSET_CODE,
+ DELETE_CONSUMER_OFFSET_CODE,
+ GET_STREAM_CODE,
+ GET_STREAMS_CODE,
+ CREATE_STREAM_CODE,
+ DELETE_STREAM_CODE,
+ UPDATE_STREAM_CODE,
+ PURGE_STREAM_CODE,
+ GET_TOPIC_CODE,
+ GET_TOPICS_CODE,
+ CREATE_TOPIC_CODE,
+ DELETE_TOPIC_CODE,
+ UPDATE_TOPIC_CODE,
+ PURGE_TOPIC_CODE,
+ CREATE_PARTITIONS_CODE,
+ DELETE_PARTITIONS_CODE,
+ DELETE_SEGMENTS_CODE,
+ GET_CONSUMER_GROUP_CODE,
+ GET_CONSUMER_GROUPS_CODE,
+ CREATE_CONSUMER_GROUP_CODE,
+ DELETE_CONSUMER_GROUP_CODE,
+ JOIN_CONSUMER_GROUP_CODE,
+ LEAVE_CONSUMER_GROUP_CODE,
+ ];
+ for code in all_codes {
+ assert!(
+ lookup_command(code).is_some(),
+ "missing dispatch entry for code {code}"
+ );
+ }
+ }
+
+ #[test]
+ fn no_duplicate_codes_in_table() {
+ let mut seen = std::collections::HashSet::new();
+ for entry in COMMAND_TABLE {
+ assert!(
+ seen.insert(entry.code),
+ "duplicate code {} ({}) in COMMAND_TABLE",
+ entry.code,
+ entry.name
+ );
+ }
+ }
+
+ #[test]
+ fn unknown_code_returns_none() {
+ assert!(lookup_command(9999).is_none());
+ }
+
+ #[test]
+ fn names_are_non_empty() {
+ for entry in COMMAND_TABLE {
+ assert!(!entry.name.is_empty(), "empty name for code {}",
entry.code);
+ }
+ }
+
+ #[test]
+ fn lookup_by_operation_roundtrips_with_lookup_command() {
+ let replicated_ops = [
+ Operation::CreateStream,
+ Operation::UpdateStream,
+ Operation::DeleteStream,
+ Operation::PurgeStream,
+ Operation::CreateTopic,
+ Operation::UpdateTopic,
+ Operation::DeleteTopic,
+ Operation::PurgeTopic,
+ Operation::CreatePartitions,
+ Operation::DeletePartitions,
+ Operation::DeleteSegments,
+ Operation::CreateConsumerGroup,
+ Operation::DeleteConsumerGroup,
+ Operation::CreateUser,
+ Operation::UpdateUser,
+ Operation::DeleteUser,
+ Operation::ChangePassword,
+ Operation::UpdatePermissions,
+ Operation::CreatePersonalAccessToken,
+ Operation::DeletePersonalAccessToken,
+ Operation::SendMessages,
+ Operation::StoreConsumerOffset,
+ ];
+ for op in replicated_ops {
+ let meta = lookup_by_operation(op)
+ .unwrap_or_else(|| panic!("no dispatch entry for operation
{op:?}"));
+
+ let by_code = lookup_command(meta.code)
+ .unwrap_or_else(|| panic!("no dispatch entry for code {}",
meta.code));
+
+ assert_eq!(
+ meta.code, by_code.code,
+ "lookup_by_operation and lookup disagree for {op:?}"
+ );
+ }
+ }
+
+ #[test]
+ fn reserved_operation_returns_none() {
+ assert!(lookup_by_operation(Operation::Reserved).is_none());
+ }
+
+ #[test]
+ fn no_duplicate_operations_in_table() {
+ let mut seen = std::collections::HashSet::new();
+ for entry in COMMAND_TABLE {
+ if let Some(op) = entry.operation {
+ assert!(
+ seen.insert(op as u8),
+ "duplicate operation {:?} ({}) in COMMAND_TABLE",
+ op,
+ entry.name
+ );
+ }
+ }
+ }
+
+ /// Verify that the match indices in `lookup_command` point to entries
+ /// whose `.code` field actually matches the looked-up code.
+ /// Catches table reordering that would silently break the match.
+ #[test]
+ fn lookup_command_indices_match_table_codes() {
+ for (i, entry) in COMMAND_TABLE.iter().enumerate() {
+ let looked_up = lookup_command(entry.code)
+ .unwrap_or_else(|| panic!("lookup_command({}) returned None",
entry.code));
+ assert_eq!(
+ looked_up.code, entry.code,
+ "lookup_command({}) returned entry at wrong index \
+ (expected table[{i}].code={}, got code={})",
+ entry.code, entry.code, looked_up.code
+ );
+ }
+ }
+
+ /// Verify that `lookup_by_operation` indices point to entries whose
+ /// `.operation` field matches the looked-up operation.
+ #[test]
+ fn lookup_by_operation_indices_match_table_ops() {
+ for entry in COMMAND_TABLE {
+ if let Some(op) = entry.operation {
+ let looked_up = lookup_by_operation(op)
+ .unwrap_or_else(|| panic!("lookup_by_operation({op:?})
returned None"));
+ assert_eq!(
+ looked_up.operation,
+ Some(op),
+ "lookup_by_operation({op:?}) returned entry with wrong
operation: {:?}",
+ looked_up.operation
+ );
+ }
+ }
+ }
+}
diff --git a/core/binary_protocol/src/error.rs
b/core/binary_protocol/src/error.rs
index f4f53ea56..82d5cb2b5 100644
--- a/core/binary_protocol/src/error.rs
+++ b/core/binary_protocol/src/error.rs
@@ -15,6 +15,8 @@
// specific language governing permissions and limitations
// under the License.
+use std::borrow::Cow;
+
/// Protocol-local error type for wire format encode/decode failures.
///
/// Intentionally decoupled from `IggyError` to keep the protocol crate
@@ -46,5 +48,5 @@ pub enum WireError {
PayloadTooLarge { size: usize, max: usize },
#[error("validation failed: {0}")]
- Validation(String),
+ Validation(Cow<'static, str>),
}
diff --git a/core/binary_protocol/src/framing.rs
b/core/binary_protocol/src/framing.rs
new file mode 100644
index 000000000..981ead667
--- /dev/null
+++ b/core/binary_protocol/src/framing.rs
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Sans-IO frame codec for the Iggy binary protocol.
+//!
+//! Encodes and decodes complete request/response frames without any I/O.
+//! The transport layer (TCP, QUIC, WebSocket) reads bytes into a buffer,
+//! then hands the buffer to these types for zero-copy parsing.
+//!
+//! When VSR consensus replaces this framing, the transport layer will
+//! switch to `consensus::header::GenericHeader` (256-byte fixed header)
+//! while the command payload codec stays the same.
+
+use crate::codec::{read_bytes, read_u32_le};
+use crate::error::WireError;
+use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
+use std::num::NonZeroU32;
+
+/// Status code for a successful response.
+pub const STATUS_OK: u32 = 0;
+
+/// Decoded request frame. Borrows the payload from the input buffer.
+///
+/// Wire format: `[length:4 LE][code:4 LE][payload:N]`
+/// where `length` = 4 (code size) + N (payload size).
+#[derive(Debug)]
+pub struct RequestFrame<'a> {
+ pub code: u32,
+ pub payload: &'a [u8],
+}
+
+impl<'a> RequestFrame<'a> {
+ /// Size of the frame header: `[length:4][code:4]`.
+ pub const HEADER_SIZE: usize = 8;
+
+ /// Decode a request frame from a complete buffer.
+ ///
+ /// # Errors
+ /// Returns `WireError::UnexpectedEof` if the buffer is too short.
+ pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> {
+ let length = read_u32_le(buf, 0)? as usize;
+ if length < 4 {
+ return Err(WireError::Validation(Cow::Borrowed(
+ "request frame length must be at least 4 (code size)",
+ )));
+ }
+ let code = read_u32_le(buf, 4)?;
+ let payload_len = length - 4;
+ let payload = read_bytes(buf, Self::HEADER_SIZE, payload_len)?;
+ let total = Self::HEADER_SIZE + payload_len;
+ Ok((Self { code, payload }, total))
+ }
+
+ /// Encode a request frame into `out`.
+ ///
+ /// Writes `[length:4 LE][code:4 LE][payload]` where length includes
+ /// the 4-byte code field.
+ ///
+ /// # Errors
+ /// Returns `WireError::PayloadTooLarge` if payload exceeds u32 capacity.
+ pub fn encode(code: u32, payload: &[u8], out: &mut BytesMut) -> Result<(),
WireError> {
+ let length = payload
+ .len()
+ .checked_add(4)
+ .and_then(|n| u32::try_from(n).ok())
+ .ok_or(WireError::PayloadTooLarge {
+ size: payload.len(),
+ max: u32::MAX as usize - 4,
+ })?;
+ out.reserve(Self::HEADER_SIZE + payload.len());
+ out.put_u32_le(length);
+ out.put_u32_le(code);
+ out.put_slice(payload);
+ Ok(())
+ }
+
+ /// Total encoded size for a given payload length.
+ ///
+ /// Returns `None` if `HEADER_SIZE + payload_len` overflows `usize`.
+ #[must_use]
+ pub const fn encoded_size(payload_len: usize) -> Option<usize> {
+ Self::HEADER_SIZE.checked_add(payload_len)
+ }
+}
+
+/// Decoded response frame. Borrows the payload from the input buffer.
+///
+/// Wire format: `[status:4 LE][length:4 LE][payload:N]`
+/// where `status` = 0 for success, non-zero for error code.
+#[derive(Debug)]
+pub struct ResponseFrame<'a> {
+ pub status: u32,
+ pub payload: &'a [u8],
+}
+
+impl<'a> ResponseFrame<'a> {
+ /// Size of the frame header: `[status:4][length:4]`.
+ pub const HEADER_SIZE: usize = 8;
+
+ /// Decode a response frame from a complete buffer.
+ ///
+ /// # Errors
+ /// Returns `WireError::UnexpectedEof` if the buffer is too short.
+ pub fn decode(buf: &'a [u8]) -> Result<(Self, usize), WireError> {
+ let status = read_u32_le(buf, 0)?;
+ let length = read_u32_le(buf, 4)? as usize;
+ let payload = read_bytes(buf, Self::HEADER_SIZE, length)?;
+ let total = Self::HEADER_SIZE + length;
+ Ok((Self { status, payload }, total))
+ }
+
+ /// Encode a successful response with payload.
+ ///
+ /// # Errors
+ /// Returns `WireError::PayloadTooLarge` if payload exceeds u32 capacity.
+ pub fn encode_ok(payload: &[u8], out: &mut BytesMut) -> Result<(),
WireError> {
+ let length = u32::try_from(payload.len()).map_err(|_|
WireError::PayloadTooLarge {
+ size: payload.len(),
+ max: u32::MAX as usize,
+ })?;
+ out.reserve(Self::HEADER_SIZE + payload.len());
+ out.put_u32_le(STATUS_OK);
+ out.put_u32_le(length);
+ out.put_slice(payload);
+ Ok(())
+ }
+
+ /// Encode an error response (status code, empty payload).
+ pub fn encode_error(status: NonZeroU32, out: &mut BytesMut) {
+ out.reserve(Self::HEADER_SIZE);
+ out.put_u32_le(status.get());
+ out.put_u32_le(0);
+ }
+
+ /// Returns `true` if this is a success response.
+ #[must_use]
+ pub const fn is_ok(&self) -> bool {
+ self.status == STATUS_OK
+ }
+
+ /// Total encoded size for a given payload length.
+ ///
+ /// Returns `None` if `HEADER_SIZE + payload_len` overflows `usize`.
+ #[must_use]
+ pub const fn encoded_size(payload_len: usize) -> Option<usize> {
+ Self::HEADER_SIZE.checked_add(payload_len)
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn request_roundtrip() {
+ let payload = b"hello world";
+ let mut buf =
BytesMut::with_capacity(RequestFrame::encoded_size(payload.len()).unwrap());
+ RequestFrame::encode(42, payload, &mut buf).unwrap();
+
+ let (frame, consumed) = RequestFrame::decode(&buf).unwrap();
+ assert_eq!(consumed, buf.len());
+ assert_eq!(frame.code, 42);
+ assert_eq!(frame.payload, payload);
+ }
+
+ #[test]
+ fn request_empty_payload() {
+ let mut buf = BytesMut::with_capacity(RequestFrame::HEADER_SIZE);
+ RequestFrame::encode(1, &[], &mut buf).unwrap();
+
+ let (frame, consumed) = RequestFrame::decode(&buf).unwrap();
+ assert_eq!(consumed, 8);
+ assert_eq!(frame.code, 1);
+ assert!(frame.payload.is_empty());
+ }
+
+ #[test]
+ fn request_length_field_includes_code() {
+ let payload = b"test";
+ let mut buf = BytesMut::new();
+ RequestFrame::encode(99, payload, &mut buf).unwrap();
+
+ let length = u32::from_le_bytes(buf[0..4].try_into().unwrap());
+ assert_eq!(length, 4 + 4); // code(4) + payload(4)
+ }
+
+ #[test]
+ fn request_truncated_header() {
+ let buf = [0u8; 7]; // less than HEADER_SIZE
+ assert!(RequestFrame::decode(&buf).is_err());
+ }
+
+ #[test]
+ fn request_truncated_payload() {
+ let mut buf = BytesMut::new();
+ buf.put_u32_le(104); // length = 104 (code + 100 bytes payload)
+ buf.put_u32_le(1); // code
+ buf.put_slice(&[0u8; 50]); // only 50 of 100 bytes
+ assert!(RequestFrame::decode(&buf).is_err());
+ }
+
+ #[test]
+ fn request_length_too_small() {
+ let mut buf = BytesMut::new();
+ buf.put_u32_le(3); // length < 4 (must include code)
+ buf.put_u32_le(1);
+ assert!(RequestFrame::decode(&buf).is_err());
+ }
+
+ #[test]
+ fn request_encoded_size() {
+ assert_eq!(RequestFrame::encoded_size(0), Some(8));
+ assert_eq!(RequestFrame::encoded_size(100), Some(108));
+ assert_eq!(RequestFrame::encoded_size(usize::MAX), None);
+ }
+
+ #[test]
+ fn response_ok_roundtrip() {
+ let payload = b"response data";
+ let mut buf =
BytesMut::with_capacity(ResponseFrame::encoded_size(payload.len()).unwrap());
+ ResponseFrame::encode_ok(payload, &mut buf).unwrap();
+
+ let (frame, consumed) = ResponseFrame::decode(&buf).unwrap();
+ assert_eq!(consumed, buf.len());
+ assert!(frame.is_ok());
+ assert_eq!(frame.status, 0);
+ assert_eq!(frame.payload, payload);
+ }
+
+ #[test]
+ fn response_ok_empty_payload() {
+ let mut buf = BytesMut::new();
+ ResponseFrame::encode_ok(&[], &mut buf).unwrap();
+
+ let (frame, consumed) = ResponseFrame::decode(&buf).unwrap();
+ assert_eq!(consumed, 8);
+ assert!(frame.is_ok());
+ assert!(frame.payload.is_empty());
+ }
+
+ #[test]
+ fn response_error_roundtrip() {
+ let mut buf = BytesMut::new();
+ ResponseFrame::encode_error(NonZeroU32::new(1001).unwrap(), &mut buf);
+
+ let (frame, consumed) = ResponseFrame::decode(&buf).unwrap();
+ assert_eq!(consumed, 8);
+ assert!(!frame.is_ok());
+ assert_eq!(frame.status, 1001);
+ assert!(frame.payload.is_empty());
+ }
+
+ #[test]
+ fn response_truncated_header() {
+ let buf = [0u8; 7];
+ assert!(ResponseFrame::decode(&buf).is_err());
+ }
+
+ #[test]
+ fn response_truncated_payload() {
+ let mut buf = BytesMut::new();
+ buf.put_u32_le(0); // status OK
+ buf.put_u32_le(100); // length = 100
+ buf.put_slice(&[0u8; 50]); // only 50 bytes
+ assert!(ResponseFrame::decode(&buf).is_err());
+ }
+
+ #[test]
+ fn response_encoded_size() {
+ assert_eq!(ResponseFrame::encoded_size(0), Some(8));
+ assert_eq!(ResponseFrame::encoded_size(256), Some(264));
+ assert_eq!(ResponseFrame::encoded_size(usize::MAX), None);
+ }
+}
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 9610abf7e..7ed4fdad0 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -41,12 +41,25 @@
//!
//! All multi-byte integers are little-endian. Strings are length-prefixed
//! (u8 length for names, u32 length for longer strings).
+//!
+//! # VSR consensus framing
+//!
+//! All consensus headers are 256 bytes with `#[repr(C)]` layout.
+//! Deserialization is zero-copy via `bytemuck`. The [`Message`] type
+//! wraps a `Bytes` buffer with typed header access.
+//!
+//! - Client-facing: [`RequestHeader`], [`ReplyHeader`]
+//! - Replication: [`PrepareHeader`], [`PrepareOkHeader`], [`CommitHeader`]
+//! - View change: [`StartViewChangeHeader`], [`DoViewChangeHeader`],
+//! [`StartViewHeader`]
+//! - Dispatch: [`GenericHeader`] for type-erased initial parsing
pub mod codec;
pub mod codes;
pub mod consensus;
+pub mod dispatch;
pub mod error;
-pub mod frame;
+pub mod framing;
pub mod message_layout;
pub mod message_view;
pub mod primitives;
@@ -54,10 +67,17 @@ pub mod requests;
pub mod responses;
pub use codec::{WireDecode, WireEncode};
-pub use codes::*;
+pub use consensus::{
+ Command2, CommitHeader, ConsensusError, ConsensusHeader,
DoViewChangeHeader, GenericHeader,
+ HEADER_SIZE, Operation, PrepareHeader, PrepareOkHeader, ReplyHeader,
RequestHeader,
+ StartViewChangeHeader, StartViewHeader, message::Message,
+};
+pub use dispatch::{COMMAND_TABLE, CommandMeta, lookup_by_operation,
lookup_command};
pub use error::WireError;
-pub use frame::*;
-pub use message_layout::*;
+pub use framing::{RequestFrame, ResponseFrame, STATUS_OK};
+pub use message_view::{
+ WireMessageIterator, WireMessageIteratorMut, WireMessageView,
WireMessageViewMut,
+};
pub use primitives::consumer::WireConsumer;
pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier,
WireName};
pub use primitives::partitioning::{MAX_MESSAGES_KEY_LENGTH, WirePartitioning};
diff --git a/core/binary_protocol/src/message_view.rs
b/core/binary_protocol/src/message_view.rs
index c291d9a7e..3159dac5b 100644
--- a/core/binary_protocol/src/message_view.rs
+++ b/core/binary_protocol/src/message_view.rs
@@ -26,6 +26,7 @@ use crate::message_layout::{
MSG_PAYLOAD_LEN_OFFSET, MSG_TIMESTAMP_OFFSET, MSG_USER_HEADERS_LEN_OFFSET,
WIRE_MESSAGE_HEADER_SIZE,
};
+use std::borrow::Cow;
// Private helpers for infallible reads on validated buffers
@@ -72,7 +73,9 @@ fn validate_frame(buf: &[u8]) -> Result<(usize, usize,
usize), WireError> {
let total = WIRE_MESSAGE_HEADER_SIZE
.checked_add(payload_len)
.and_then(|s| s.checked_add(user_headers_len))
- .ok_or_else(|| WireError::Validation("message frame size
overflow".to_string()))?;
+ .ok_or(WireError::Validation(Cow::Borrowed(
+ "message frame size overflow",
+ )))?;
if buf.len() < total {
return Err(WireError::UnexpectedEof {
@@ -371,9 +374,9 @@ impl<'a> WireMessageIteratorMut<'a> {
.and_then(|s| s.checked_add(user_headers_len))
else {
self.remaining = 0;
- return Some(Err(WireError::Validation(
- "message frame size overflow".to_string(),
- )));
+ return Some(Err(WireError::Validation(Cow::Borrowed(
+ "message frame size overflow",
+ ))));
};
(total, rest.len())
};
diff --git a/core/binary_protocol/src/primitives/identifier.rs
b/core/binary_protocol/src/primitives/identifier.rs
index af25c189c..f28614eba 100644
--- a/core/binary_protocol/src/primitives/identifier.rs
+++ b/core/binary_protocol/src/primitives/identifier.rs
@@ -18,6 +18,7 @@
use crate::WireError;
use crate::codec::{WireDecode, WireEncode, read_bytes, read_str, read_u8};
use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
use std::ops::Deref;
// WireName
@@ -40,10 +41,10 @@ impl WireName {
pub fn new(s: impl Into<String>) -> Result<Self, WireError> {
let s = s.into();
if s.is_empty() || s.len() > MAX_WIRE_NAME_LENGTH {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got {}",
s.len()
- )));
+ ))));
}
Ok(Self(s))
}
@@ -102,9 +103,9 @@ impl WireDecode for WireName {
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
let name_len = read_u8(buf, 0)? as usize;
if name_len == 0 || name_len > MAX_WIRE_NAME_LENGTH {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"wire name must be 1-{MAX_WIRE_NAME_LENGTH} bytes, got
{name_len}"
- )));
+ ))));
}
let name = read_str(buf, 1, name_len)?;
Ok((Self(name), 1 + name_len))
@@ -199,9 +200,9 @@ impl WireDecode for WireIdentifier {
match kind {
KIND_NUMERIC => {
if length != NUMERIC_VALUE_LEN as usize {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"numeric identifier must be {NUMERIC_VALUE_LEN} bytes,
got {length}"
- )));
+ ))));
}
let id = u32::from_le_bytes(
value
@@ -212,9 +213,9 @@ impl WireDecode for WireIdentifier {
}
KIND_STRING => {
if length == 0 {
- return Err(WireError::Validation(
- "string identifier cannot be empty".to_string(),
- ));
+ return Err(WireError::Validation(Cow::Borrowed(
+ "string identifier cannot be empty",
+ )));
}
let s =
std::str::from_utf8(value).map_err(|_|
WireError::InvalidUtf8 { offset: 2 })?;
diff --git a/core/binary_protocol/src/primitives/partitioning.rs
b/core/binary_protocol/src/primitives/partitioning.rs
index 4bfbf5f58..6e3b277de 100644
--- a/core/binary_protocol/src/primitives/partitioning.rs
+++ b/core/binary_protocol/src/primitives/partitioning.rs
@@ -18,6 +18,7 @@
use crate::WireError;
use crate::codec::{WireDecode, WireEncode, read_bytes, read_u8, read_u32_le};
use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
const KIND_BALANCED: u8 = 1;
const KIND_PARTITION_ID: u8 = 2;
@@ -49,15 +50,15 @@ impl WirePartitioning {
/// Returns `WireError::Validation` if `key` is empty or exceeds 255 bytes.
pub fn messages_key(key: Vec<u8>) -> Result<Self, WireError> {
if key.is_empty() {
- return Err(WireError::Validation(
- "messages_key partitioning cannot have empty key".to_string(),
- ));
+ return Err(WireError::Validation(Cow::Borrowed(
+ "messages_key partitioning cannot have empty key",
+ )));
}
if key.len() > MAX_MESSAGES_KEY_LENGTH {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"messages_key length {} exceeds maximum
{MAX_MESSAGES_KEY_LENGTH}",
key.len()
- )));
+ ))));
}
Ok(Self::MessagesKey(key))
}
@@ -107,26 +108,26 @@ impl WireDecode for WirePartitioning {
match kind {
KIND_BALANCED => {
if length != 0 {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"balanced partitioning must have length 0, got
{length}"
- )));
+ ))));
}
Ok((Self::Balanced, 2))
}
KIND_PARTITION_ID => {
if length != 4 {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"partition_id partitioning must have length 4, got
{length}"
- )));
+ ))));
}
let id = read_u32_le(buf, 2)?;
Ok((Self::PartitionId(id), 6))
}
KIND_MESSAGES_KEY => {
if length == 0 {
- return Err(WireError::Validation(
- "messages_key partitioning cannot have empty
key".to_string(),
- ));
+ return Err(WireError::Validation(Cow::Borrowed(
+ "messages_key partitioning cannot have empty key",
+ )));
}
let key = read_bytes(buf, 2, length)?;
Ok((Self::MessagesKey(key.to_vec()), 2 + length))
diff --git a/core/binary_protocol/src/requests/users/create_user.rs
b/core/binary_protocol/src/requests/users/create_user.rs
index 4348426e9..632e077d9 100644
--- a/core/binary_protocol/src/requests/users/create_user.rs
+++ b/core/binary_protocol/src/requests/users/create_user.rs
@@ -20,6 +20,7 @@ use crate::codec::{WireDecode, WireEncode, read_str, read_u8,
read_u32_le};
use crate::primitives::identifier::WireName;
use crate::primitives::permissions::WirePermissions;
use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
/// `CreateUser` request.
///
@@ -85,9 +86,9 @@ impl WireDecode for CreateUserRequest {
let permissions = if has_permissions == 1 && perm_len > 0 {
let (perms, consumed) = WirePermissions::decode(&buf[pos..])?;
if consumed != perm_len {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"permissions length mismatch: header says {perm_len},
decoded {consumed}"
- )));
+ ))));
}
pos += consumed;
Some(perms)
diff --git a/core/binary_protocol/src/requests/users/update_permissions.rs
b/core/binary_protocol/src/requests/users/update_permissions.rs
index 6b5bcbbc4..16de85737 100644
--- a/core/binary_protocol/src/requests/users/update_permissions.rs
+++ b/core/binary_protocol/src/requests/users/update_permissions.rs
@@ -20,6 +20,7 @@ use crate::WireIdentifier;
use crate::codec::{WireDecode, WireEncode, read_u8, read_u32_le};
use crate::primitives::permissions::WirePermissions;
use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
/// `UpdatePermissions` request.
///
@@ -67,9 +68,9 @@ impl WireDecode for UpdatePermissionsRequest {
let permissions = if has_permissions == 1 && perm_len > 0 {
let (perms, consumed) = WirePermissions::decode(&buf[pos..])?;
if consumed != perm_len {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"permissions length mismatch: header says {perm_len},
decoded {consumed}"
- )));
+ ))));
}
pos += consumed;
Some(perms)
diff --git a/core/binary_protocol/src/responses/clients/client_response.rs
b/core/binary_protocol/src/responses/clients/client_response.rs
index 73e871402..8de029353 100644
--- a/core/binary_protocol/src/responses/clients/client_response.rs
+++ b/core/binary_protocol/src/responses/clients/client_response.rs
@@ -33,7 +33,7 @@ pub struct ConsumerGroupInfoResponse {
}
impl ConsumerGroupInfoResponse {
- const SIZE: usize = 12;
+ pub(crate) const SIZE: usize = 12;
}
impl WireEncode for ConsumerGroupInfoResponse {
diff --git a/core/binary_protocol/src/responses/clients/get_client.rs
b/core/binary_protocol/src/responses/clients/get_client.rs
index f2fe4c2f0..3fa4ffedc 100644
--- a/core/binary_protocol/src/responses/clients/get_client.rs
+++ b/core/binary_protocol/src/responses/clients/get_client.rs
@@ -56,7 +56,12 @@ impl WireDecode for ClientDetailsResponse {
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
let (client, mut pos) = ClientResponse::decode(buf)?;
let count = client.consumer_groups_count as usize;
- let mut consumer_groups = Vec::with_capacity(count);
+ let remaining = buf.len().saturating_sub(pos);
+ let mut consumer_groups =
Vec::with_capacity(crate::codec::capped_capacity(
+ count,
+ remaining,
+ ConsumerGroupInfoResponse::SIZE,
+ ));
for _ in 0..count {
let (group, consumed) =
ConsumerGroupInfoResponse::decode(&buf[pos..])?;
pos += consumed;
diff --git
a/core/binary_protocol/src/responses/consumer_groups/get_consumer_group.rs
b/core/binary_protocol/src/responses/consumer_groups/get_consumer_group.rs
index 39ecab2d1..551cf0da1 100644
--- a/core/binary_protocol/src/responses/consumer_groups/get_consumer_group.rs
+++ b/core/binary_protocol/src/responses/consumer_groups/get_consumer_group.rs
@@ -51,7 +51,12 @@ impl WireDecode for ConsumerGroupMemberResponse {
fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
let id = read_u32_le(buf, 0)?;
let partitions_count = read_u32_le(buf, 4)?;
- let mut partitions = Vec::with_capacity(partitions_count as usize);
+ let remaining = buf.len().saturating_sub(8);
+ let mut partitions = Vec::with_capacity(crate::codec::capped_capacity(
+ partitions_count as usize,
+ remaining,
+ 4,
+ ));
let mut offset = 8;
for _ in 0..partitions_count {
let partition_id = read_u32_le(buf, offset)?;
@@ -207,4 +212,12 @@ mod tests {
);
}
}
+
+ #[test]
+ fn bogus_partition_count_does_not_oom() {
+ let mut buf = BytesMut::new();
+ buf.put_u32_le(1); // id
+ buf.put_u32_le(u32::MAX); // partitions_count
+ assert!(ConsumerGroupMemberResponse::decode(&buf).is_err());
+ }
}
diff --git a/core/binary_protocol/src/responses/streams/get_stream.rs
b/core/binary_protocol/src/responses/streams/get_stream.rs
index 3470e3743..25aae5177 100644
--- a/core/binary_protocol/src/responses/streams/get_stream.rs
+++ b/core/binary_protocol/src/responses/streams/get_stream.rs
@@ -20,6 +20,7 @@ use crate::codec::{WireDecode, WireEncode, read_u8,
read_u32_le, read_u64_le};
use crate::primitives::identifier::WireName;
use crate::responses::streams::StreamResponse;
use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
/// Topic header within a `GetStream` response.
///
@@ -141,11 +142,11 @@ impl WireDecode for GetStreamResponse {
topics.push(topic);
}
if topics.len() != stream.topics_count as usize {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"stream.topics_count={} but decoded {} topics",
stream.topics_count,
topics.len()
- )));
+ ))));
}
Ok((Self { stream, topics }, pos))
}
diff --git a/core/binary_protocol/src/responses/system/get_cluster_metadata.rs
b/core/binary_protocol/src/responses/system/get_cluster_metadata.rs
new file mode 100644
index 000000000..0b343e644
--- /dev/null
+++ b/core/binary_protocol/src/responses/system/get_cluster_metadata.rs
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_str, read_u8, read_u16_le,
read_u32_le};
+use bytes::{BufMut, BytesMut};
+
+/// `GetClusterMetadata` response: cluster name and list of nodes.
+///
+/// Wire format:
+/// ```text
+/// [name_len:4 LE][name:N][nodes_count:4 LE][ClusterNodeResponse]*
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClusterMetadataResponse {
+ pub name: String,
+ pub nodes: Vec<ClusterNodeResponse>,
+}
+
+/// A single node within the cluster metadata.
+///
+/// Wire format:
+/// ```text
+/// [name_len:4 LE][name:N][ip_len:4 LE][ip:N]
+/// [tcp:2 LE][quic:2 LE][http:2 LE][websocket:2 LE]
+/// [role:1][status:1]
+/// ```
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct ClusterNodeResponse {
+ pub name: String,
+ pub ip: String,
+ pub tcp_port: u16,
+ pub quic_port: u16,
+ pub http_port: u16,
+ pub websocket_port: u16,
+ pub role: u8,
+ pub status: u8,
+}
+
+const NODE_FIXED_SIZE: usize = 4 + 4 + 8 + 1 + 1; // name_len + ip_len + ports
+ role + status
+
+impl WireEncode for ClusterNodeResponse {
+ fn encoded_size(&self) -> usize {
+ NODE_FIXED_SIZE + self.name.len() + self.ip.len()
+ }
+
+ #[allow(clippy::cast_possible_truncation)]
+ fn encode(&self, buf: &mut BytesMut) {
+ buf.put_u32_le(self.name.len() as u32);
+ buf.put_slice(self.name.as_bytes());
+ buf.put_u32_le(self.ip.len() as u32);
+ buf.put_slice(self.ip.as_bytes());
+ buf.put_u16_le(self.tcp_port);
+ buf.put_u16_le(self.quic_port);
+ buf.put_u16_le(self.http_port);
+ buf.put_u16_le(self.websocket_port);
+ buf.put_u8(self.role);
+ buf.put_u8(self.status);
+ }
+}
+
+impl WireDecode for ClusterNodeResponse {
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+ let name_len = read_u32_le(buf, 0)? as usize;
+ let name = read_str(buf, 4, name_len)?;
+ let pos = 4 + name_len;
+
+ let ip_len = read_u32_le(buf, pos)? as usize;
+ let ip = read_str(buf, pos + 4, ip_len)?;
+ let pos = pos + 4 + ip_len;
+
+ let tcp_port = read_u16_le(buf, pos)?;
+ let quic_port = read_u16_le(buf, pos + 2)?;
+ let http_port = read_u16_le(buf, pos + 4)?;
+ let websocket_port = read_u16_le(buf, pos + 6)?;
+ let role = read_u8(buf, pos + 8)?;
+ let status = read_u8(buf, pos + 9)?;
+
+ Ok((
+ Self {
+ name,
+ ip,
+ tcp_port,
+ quic_port,
+ http_port,
+ websocket_port,
+ role,
+ status,
+ },
+ pos + 10,
+ ))
+ }
+}
+
+impl WireEncode for ClusterMetadataResponse {
+ fn encoded_size(&self) -> usize {
+ 4 + self.name.len()
+ + 4
+ + self
+ .nodes
+ .iter()
+ .map(WireEncode::encoded_size)
+ .sum::<usize>()
+ }
+
+ #[allow(clippy::cast_possible_truncation)]
+ fn encode(&self, buf: &mut BytesMut) {
+ buf.put_u32_le(self.name.len() as u32);
+ buf.put_slice(self.name.as_bytes());
+ buf.put_u32_le(self.nodes.len() as u32);
+ for node in &self.nodes {
+ node.encode(buf);
+ }
+ }
+}
+
+impl WireDecode for ClusterMetadataResponse {
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+ let name_len = read_u32_le(buf, 0)? as usize;
+ let name = read_str(buf, 4, name_len)?;
+ let mut pos = 4 + name_len;
+
+ let nodes_count = read_u32_le(buf, pos)? as usize;
+ pos += 4;
+
+ let remaining = buf.len().saturating_sub(pos);
+ let mut nodes = Vec::with_capacity(crate::codec::capped_capacity(
+ nodes_count,
+ remaining,
+ NODE_FIXED_SIZE,
+ ));
+ for _ in 0..nodes_count {
+ let (node, consumed) = ClusterNodeResponse::decode(&buf[pos..])?;
+ pos += consumed;
+ nodes.push(node);
+ }
+
+ Ok((Self { name, nodes }, pos))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ fn sample_node(name: &str, ip: &str) -> ClusterNodeResponse {
+ ClusterNodeResponse {
+ name: name.to_string(),
+ ip: ip.to_string(),
+ tcp_port: 8090,
+ quic_port: 8091,
+ http_port: 3000,
+ websocket_port: 3001,
+ role: 1,
+ status: 1,
+ }
+ }
+
+ #[test]
+ fn node_roundtrip() {
+ let node = sample_node("node-1", "192.168.1.1");
+ let bytes = node.to_bytes();
+ let (decoded, consumed) = ClusterNodeResponse::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, node);
+ }
+
+ #[test]
+ fn roundtrip_no_nodes() {
+ let resp = ClusterMetadataResponse {
+ name: "test-cluster".to_string(),
+ nodes: vec![],
+ };
+ let bytes = resp.to_bytes();
+ let (decoded, consumed) =
ClusterMetadataResponse::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, resp);
+ }
+
+ #[test]
+ fn roundtrip_with_nodes() {
+ let resp = ClusterMetadataResponse {
+ name: "prod-cluster".to_string(),
+ nodes: vec![
+ sample_node("node-1", "10.0.0.1"),
+ sample_node("node-2", "10.0.0.2"),
+ sample_node("node-3", "10.0.0.3"),
+ ],
+ };
+ let bytes = resp.to_bytes();
+ let (decoded, consumed) =
ClusterMetadataResponse::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, resp);
+ }
+
+ #[test]
+ fn truncated_returns_error() {
+ let resp = ClusterMetadataResponse {
+ name: "c".to_string(),
+ nodes: vec![sample_node("n", "1.2.3.4")],
+ };
+ let bytes = resp.to_bytes();
+ for i in 0..bytes.len() {
+ assert!(
+ ClusterMetadataResponse::decode(&bytes[..i]).is_err(),
+ "expected error for truncation at byte {i}"
+ );
+ }
+ }
+
+ #[test]
+ fn empty_cluster_name() {
+ let resp = ClusterMetadataResponse {
+ name: String::new(),
+ nodes: vec![],
+ };
+ let bytes = resp.to_bytes();
+ let (decoded, _) = ClusterMetadataResponse::decode(&bytes).unwrap();
+ assert_eq!(decoded.name, "");
+ }
+
+ #[test]
+ fn bogus_node_count_does_not_oom() {
+ let mut buf = BytesMut::new();
+ buf.put_u32_le(4);
+ buf.put_slice(b"test");
+ buf.put_u32_le(u32::MAX);
+ assert!(ClusterMetadataResponse::decode(&buf).is_err());
+ }
+}
diff --git a/core/binary_protocol/src/frame.rs
b/core/binary_protocol/src/responses/system/get_me.rs
similarity index 56%
rename from core/binary_protocol/src/frame.rs
rename to core/binary_protocol/src/responses/system/get_me.rs
index 77a5f2b3e..5b2be1091 100644
--- a/core/binary_protocol/src/frame.rs
+++ b/core/binary_protocol/src/responses/system/get_me.rs
@@ -15,18 +15,8 @@
// specific language governing permissions and limitations
// under the License.
-// TODO(hubcio): Legacy framing constants for the current binary protocol.
-// Once VSR consensus is integrated, both client-server and
-// replica-replica traffic will use the unified 256-byte
-// consensus header (`consensus::header::HEADER_SIZE`).
-// These constants will be removed at that point.
-
-/// Request frame: `[length:4 LE][code:4 LE][payload:N]`
-/// `length` = size of code + payload = 4 + N
-pub const REQUEST_HEADER_SIZE: usize = 4;
-
-/// Response frame: `[status:4 LE][length:4 LE][payload:N]`
-pub const RESPONSE_HEADER_SIZE: usize = 8;
-
-/// Status code for a successful response.
-pub const STATUS_OK: u32 = 0;
+/// `GetMe` response: same wire format as `ClientDetailsResponse`.
+///
+/// The server returns the authenticated client's own info using the
+/// same `map_client` serialization as `GetClient`.
+pub type GetMeResponse =
crate::responses::clients::get_client::ClientDetailsResponse;
diff --git a/core/binary_protocol/src/responses/system/get_snapshot.rs
b/core/binary_protocol/src/responses/system/get_snapshot.rs
new file mode 100644
index 000000000..2fcbb9291
--- /dev/null
+++ b/core/binary_protocol/src/responses/system/get_snapshot.rs
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode};
+use bytes::{BufMut, BytesMut};
+
+/// `GetSnapshot` response: raw snapshot data (ZIP archive).
+///
+/// The server produces a ZIP file containing diagnostic data.
+/// The payload is the complete archive with no additional framing.
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct GetSnapshotResponse {
+ pub data: Vec<u8>,
+}
+
+impl WireEncode for GetSnapshotResponse {
+ fn encoded_size(&self) -> usize {
+ self.data.len()
+ }
+
+ fn encode(&self, buf: &mut BytesMut) {
+ buf.put_slice(&self.data);
+ }
+}
+
+impl WireDecode for GetSnapshotResponse {
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+ Ok((Self { data: buf.to_vec() }, buf.len()))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn roundtrip() {
+ let resp = GetSnapshotResponse {
+ data: vec![0x50, 0x4B, 0x03, 0x04, 1, 2, 3, 4],
+ };
+ let bytes = resp.to_bytes();
+ let (decoded, consumed) = GetSnapshotResponse::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, resp);
+ }
+
+ #[test]
+ fn empty_snapshot() {
+ let resp = GetSnapshotResponse { data: vec![] };
+ let bytes = resp.to_bytes();
+ assert!(bytes.is_empty());
+ let (decoded, consumed) = GetSnapshotResponse::decode(&bytes).unwrap();
+ assert_eq!(consumed, 0);
+ assert_eq!(decoded, resp);
+ }
+}
diff --git a/core/binary_protocol/src/responses/system/get_stats.rs
b/core/binary_protocol/src/responses/system/get_stats.rs
index ec9467007..09923b6a8 100644
--- a/core/binary_protocol/src/responses/system/get_stats.rs
+++ b/core/binary_protocol/src/responses/system/get_stats.rs
@@ -221,7 +221,12 @@ impl WireDecode for StatsResponse {
let cache_count = read_u32_le(buf, pos)? as usize;
pos += 4;
- let mut cache_metrics = Vec::with_capacity(cache_count);
+ let remaining = buf.len().saturating_sub(pos);
+ let mut cache_metrics =
Vec::with_capacity(crate::codec::capped_capacity(
+ cache_count,
+ remaining,
+ CacheMetricEntry::SIZE,
+ ));
for _ in 0..cache_count {
let stream_id = read_u32_le(buf, pos)?;
let topic_id = read_u32_le(buf, pos + 4)?;
diff --git a/core/binary_protocol/src/responses/system/mod.rs
b/core/binary_protocol/src/responses/system/mod.rs
index 7394548b6..8b3cafc70 100644
--- a/core/binary_protocol/src/responses/system/mod.rs
+++ b/core/binary_protocol/src/responses/system/mod.rs
@@ -15,9 +15,15 @@
// specific language governing permissions and limitations
// under the License.
+pub mod get_cluster_metadata;
+pub mod get_me;
+pub mod get_snapshot;
pub mod get_stats;
mod ping;
pub use super::EmptyResponse;
+pub use get_cluster_metadata::{ClusterMetadataResponse, ClusterNodeResponse};
+pub use get_me::GetMeResponse;
+pub use get_snapshot::GetSnapshotResponse;
pub use get_stats::StatsResponse;
pub use ping::PingResponse;
diff --git a/core/binary_protocol/src/responses/topics/get_topic.rs
b/core/binary_protocol/src/responses/topics/get_topic.rs
index 80b510a9c..b90e52156 100644
--- a/core/binary_protocol/src/responses/topics/get_topic.rs
+++ b/core/binary_protocol/src/responses/topics/get_topic.rs
@@ -19,6 +19,7 @@ use crate::WireError;
use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
use crate::responses::streams::get_stream::TopicHeader;
use bytes::{BufMut, BytesMut};
+use std::borrow::Cow;
/// Partition details within a `GetTopic` response.
///
@@ -120,11 +121,11 @@ impl WireDecode for GetTopicResponse {
partitions.push(partition);
}
if partitions.len() != topic.partitions_count as usize {
- return Err(WireError::Validation(format!(
+ return Err(WireError::Validation(Cow::Owned(format!(
"topic.partitions_count={} but decoded {} partitions",
topic.partitions_count,
partitions.len()
- )));
+ ))));
}
Ok((Self { topic, partitions }, pos))
}