This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-7-http in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 9a603e12dd36bd5014ad789251c0d0268b6734de Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 27 14:52:46 2026 +0100 fix(server,sdk): harden wire type migration The wire type refactor (prior commits) left several issues identified during 4-expert review: - WithId WireDecode impls panicked on corrupted WAL entries with inflated length fields instead of returning errors. All 5 wrapper types now bounds-check before slicing. - Binary handlers lost domain validation when command structs were replaced with wire types. Username (3-50), password (3-100), PAT name (3-30), and partition count constraints were only enforced on the HTTP path. Restored in 6 handlers. - Topic wire conversion silently degraded unknown compression codes to None via unwrap_or. Changed From to TryFrom with error propagation through SDK callers. - Duplicate 54-line permissions conversion (owned From impl + borrowing free function) collapsed to single delegation. --- core/common/src/traits/binary_impls/streams.rs | 4 +- core/common/src/traits/binary_impls/topics.rs | 6 +- core/common/src/wire_conversions.rs | 104 +++++++-------------- .../partitions/delete_partitions_handler.rs | 4 + .../create_personal_access_token_handler.rs | 10 ++ .../handlers/users/change_password_handler.rs | 10 ++ .../binary/handlers/users/create_user_handler.rs | 12 +++ .../binary/handlers/users/login_user_handler.rs | 12 +++ .../binary/handlers/users/update_user_handler.rs | 8 ++ core/server/src/state/models.rs | 93 ++++++++++++++++-- 10 files changed, 179 insertions(+), 84 deletions(-) diff --git a/core/common/src/traits/binary_impls/streams.rs b/core/common/src/traits/binary_impls/streams.rs index b64c99261..bbfb015d3 100644 --- a/core/common/src/traits/binary_impls/streams.rs +++ b/core/common/src/traits/binary_impls/streams.rs @@ -47,7 +47,7 @@ impl<B: BinaryClient> StreamClient for B { return Ok(None); } let wire_resp = super::decode_response::<GetStreamResponse>(&response)?; - 
Ok(Some(StreamDetails::from(wire_resp))) + Ok(Some(StreamDetails::try_from(wire_resp)?)) } async fn get_streams(&self) -> Result<Vec<Stream>, IggyError> { @@ -72,7 +72,7 @@ impl<B: BinaryClient> StreamClient for B { ) .await?; let wire_resp = super::decode_response::<GetStreamResponse>(&response)?; - Ok(StreamDetails::from(wire_resp)) + Ok(StreamDetails::try_from(wire_resp)?) } async fn update_stream(&self, stream_id: &Identifier, name: &str) -> Result<(), IggyError> { diff --git a/core/common/src/traits/binary_impls/topics.rs b/core/common/src/traits/binary_impls/topics.rs index 9304ed54c..4c5c4ddbb 100644 --- a/core/common/src/traits/binary_impls/topics.rs +++ b/core/common/src/traits/binary_impls/topics.rs @@ -59,7 +59,7 @@ impl<B: BinaryClient> TopicClient for B { return Ok(None); } let wire_resp = super::decode_response::<GetTopicResponse>(&response)?; - Ok(Some(TopicDetails::from(wire_resp))) + Ok(Some(TopicDetails::try_from(wire_resp)?)) } async fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, IggyError> { @@ -78,7 +78,7 @@ impl<B: BinaryClient> TopicClient for B { return Ok(Vec::new()); } let wire_resp = super::decode_response::<GetTopicsResponse>(&response)?; - Ok(topics_from_wire(wire_resp)) + Ok(topics_from_wire(wire_resp)?) } async fn create_topic( @@ -110,7 +110,7 @@ impl<B: BinaryClient> TopicClient for B { ) .await?; let wire_resp = super::decode_response::<GetTopicResponse>(&response)?; - Ok(TopicDetails::from(wire_resp)) + Ok(TopicDetails::try_from(wire_resp)?) 
} async fn update_topic( diff --git a/core/common/src/wire_conversions.rs b/core/common/src/wire_conversions.rs index a1bb47556..eaa728cdb 100644 --- a/core/common/src/wire_conversions.rs +++ b/core/common/src/wire_conversions.rs @@ -84,11 +84,17 @@ impl From<StreamResponse> for Stream { } } -impl From<GetStreamResponse> for StreamDetails { - fn from(w: GetStreamResponse) -> Self { - let mut topics: Vec<Topic> = w.topics.into_iter().map(Topic::from).collect(); +impl TryFrom<GetStreamResponse> for StreamDetails { + type Error = IggyError; + + fn try_from(w: GetStreamResponse) -> Result<Self, Self::Error> { + let mut topics: Vec<Topic> = w + .topics + .into_iter() + .map(Topic::try_from) + .collect::<Result<_, _>>()?; topics.sort_by_key(|t| t.id); - Self { + Ok(Self { id: w.stream.id, created_at: w.stream.created_at.into(), name: w.stream.name.to_string(), @@ -96,7 +102,7 @@ impl From<GetStreamResponse> for StreamDetails { messages_count: w.stream.messages_count, topics_count: w.stream.topics_count, topics, - } + }) } } @@ -110,14 +116,16 @@ pub fn streams_from_wire(w: GetStreamsResponse) -> Vec<Stream> { // Topics // --------------------------------------------------------------------------- -impl From<TopicHeader> for Topic { - fn from(w: TopicHeader) -> Self { +impl TryFrom<TopicHeader> for Topic { + type Error = IggyError; + + fn try_from(w: TopicHeader) -> Result<Self, Self::Error> { let message_expiry = match w.message_expiry { 0 => IggyExpiry::NeverExpire, v => v.into(), }; let max_topic_size: MaxTopicSize = w.max_topic_size.into(); - Self { + Ok(Self { id: w.id, created_at: w.created_at.into(), name: w.name.to_string(), @@ -125,11 +133,10 @@ impl From<TopicHeader> for Topic { size: IggyByteSize::from(w.size_bytes), messages_count: w.messages_count, message_expiry, - compression_algorithm: CompressionAlgorithm::from_code(w.compression_algorithm) - .unwrap_or(CompressionAlgorithm::None), + compression_algorithm: 
CompressionAlgorithm::from_code(w.compression_algorithm)?, max_topic_size, replication_factor: w.replication_factor, - } + }) } } @@ -146,13 +153,15 @@ impl From<PartitionResponse> for Partition { } } -impl From<GetTopicResponse> for TopicDetails { - fn from(w: GetTopicResponse) -> Self { - let topic = Topic::from(w.topic); +impl TryFrom<GetTopicResponse> for TopicDetails { + type Error = IggyError; + + fn try_from(w: GetTopicResponse) -> Result<Self, Self::Error> { + let topic = Topic::try_from(w.topic)?; let mut partitions: Vec<Partition> = w.partitions.into_iter().map(Partition::from).collect(); partitions.sort_by_key(|p| p.id); - Self { + Ok(Self { id: topic.id, created_at: topic.created_at, name: topic.name, @@ -164,14 +173,18 @@ impl From<GetTopicResponse> for TopicDetails { replication_factor: topic.replication_factor, partitions_count: topic.partitions_count, partitions, - } + }) } } -pub fn topics_from_wire(w: GetTopicsResponse) -> Vec<Topic> { - let mut topics: Vec<Topic> = w.topics.into_iter().map(Topic::from).collect(); +pub fn topics_from_wire(w: GetTopicsResponse) -> Result<Vec<Topic>, IggyError> { + let mut topics: Vec<Topic> = w + .topics + .into_iter() + .map(Topic::try_from) + .collect::<Result<_, _>>()?; topics.sort_by_key(|t| t.id); - topics + Ok(topics) } // --------------------------------------------------------------------------- @@ -635,58 +648,7 @@ impl From<WirePermissions> for Permissions { /// Convert `&WirePermissions` to domain `Permissions` without consuming the input. 
pub fn wire_permissions_to_permissions(wp: &WirePermissions) -> Permissions { - let streams = if wp.streams.is_empty() { - None - } else { - let mut map = BTreeMap::new(); - for ws in &wp.streams { - let topics = if ws.topics.is_empty() { - None - } else { - let mut tmap = BTreeMap::new(); - for wt in &ws.topics { - tmap.insert( - wt.topic_id as usize, - TopicPermissions { - manage_topic: wt.manage_topic, - read_topic: wt.read_topic, - poll_messages: wt.poll_messages, - send_messages: wt.send_messages, - }, - ); - } - Some(tmap) - }; - map.insert( - ws.stream_id as usize, - StreamPermissions { - manage_stream: ws.manage_stream, - read_stream: ws.read_stream, - manage_topics: ws.manage_topics, - read_topics: ws.read_topics, - poll_messages: ws.poll_messages, - send_messages: ws.send_messages, - topics, - }, - ); - } - Some(map) - }; - Permissions { - global: GlobalPermissions { - manage_servers: wp.global.manage_servers, - read_servers: wp.global.read_servers, - manage_users: wp.global.manage_users, - read_users: wp.global.read_users, - manage_streams: wp.global.manage_streams, - read_streams: wp.global.read_streams, - manage_topics: wp.global.manage_topics, - read_topics: wp.global.read_topics, - poll_messages: wp.global.poll_messages, - send_messages: wp.global.send_messages, - }, - streams, - } + Permissions::from(wp.clone()) } // --------------------------------------------------------------------------- diff --git a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs index f925d958b..14fcd577e 100644 --- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs +++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs @@ -39,6 +39,10 @@ pub async fn handle_delete_partitions( ); shard.ensure_authenticated(session)?; + if req.partitions_count == 0 { + return Err(IggyError::TooManyPartitions); + } + let request = 
ShardRequest::control_plane(ShardRequestPayload::DeletePartitionsRequest { user_id: session.get_user_id(), command: req, diff --git a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs index 263872da2..6bfc77fa0 100644 --- a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs +++ b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs @@ -25,6 +25,9 @@ use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::requests::personal_access_tokens::CreatePersonalAccessTokenRequest; use iggy_binary_protocol::responses::personal_access_tokens::RawPersonalAccessTokenResponse; +use iggy_common::defaults::{ + MAX_PERSONAL_ACCESS_TOKEN_NAME_LENGTH, MIN_PERSONAL_ACCESS_TOKEN_NAME_LENGTH, +}; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; @@ -42,6 +45,13 @@ pub async fn handle_create_personal_access_token( ); shard.ensure_authenticated(session)?; + let name_len = req.name.as_str().len(); + if !(MIN_PERSONAL_ACCESS_TOKEN_NAME_LENGTH..=MAX_PERSONAL_ACCESS_TOKEN_NAME_LENGTH) + .contains(&name_len) + { + return Err(IggyError::InvalidPersonalAccessTokenName); + } + let request = ShardRequest::control_plane(ShardRequestPayload::CreatePersonalAccessTokenRequest { user_id: session.get_user_id(), diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs b/core/server/src/binary/handlers/users/change_password_handler.rs index 17fe0d071..860b0d82e 100644 --- a/core/server/src/binary/handlers/users/change_password_handler.rs +++ b/core/server/src/binary/handlers/users/change_password_handler.rs @@ -22,6 +22,7 @@ use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload}; use 
crate::streaming::session::Session; use iggy_binary_protocol::requests::users::ChangePasswordRequest; +use iggy_common::defaults::{MAX_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH}; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; @@ -39,6 +40,15 @@ pub async fn handle_change_password( ); shard.ensure_authenticated(session)?; + let current_len = req.current_password.len(); + if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&current_len) { + return Err(IggyError::InvalidPassword); + } + let new_len = req.new_password.len(); + if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&new_len) { + return Err(IggyError::InvalidPassword); + } + let request = ShardRequest::control_plane(ShardRequestPayload::ChangePasswordRequest { user_id: session.get_user_id(), command: req, diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 04aaf8a2e..7532523a4 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -25,6 +25,9 @@ use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::requests::users::CreateUserRequest; use iggy_binary_protocol::responses::users::{UserDetailsResponse, UserResponse}; +use iggy_common::defaults::{ + MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, +}; use iggy_common::wire_conversions::permissions_to_wire; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; @@ -44,6 +47,15 @@ pub async fn handle_create_user( shard.ensure_authenticated(session)?; shard.metadata.perm_create_user(session.get_user_id())?; + let username_len = req.username.as_str().len(); + if !(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username_len) { + return Err(IggyError::InvalidUsername); + } + let password_len = req.password.len(); + if 
!(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&password_len) { + return Err(IggyError::InvalidPassword); + } + let request = ShardRequest::control_plane(ShardRequestPayload::CreateUserRequest { user_id: session.get_user_id(), command: req, diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs b/core/server/src/binary/handlers/users/login_user_handler.rs index b3d603552..aa0924427 100644 --- a/core/server/src/binary/handlers/users/login_user_handler.rs +++ b/core/server/src/binary/handlers/users/login_user_handler.rs @@ -26,6 +26,9 @@ use iggy_binary_protocol::requests::users::LoginUserRequest; use iggy_binary_protocol::responses::users::IdentityResponse; use iggy_common::IggyError; use iggy_common::SenderKind; +use iggy_common::defaults::{ + MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, MIN_USERNAME_LENGTH, +}; use std::rc::Rc; use tracing::{debug, info, instrument, warn}; @@ -42,6 +45,15 @@ pub async fn handle_login_user( } let username = req.username.as_str(); + let username_len = username.len(); + if !(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username_len) { + return Err(IggyError::InvalidUsername); + } + let password_len = req.password.len(); + if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&password_len) { + return Err(IggyError::InvalidPassword); + } + debug!("session: {session}, command: login_user, username: {username}"); info!("Logging in user: {username} ..."); diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs b/core/server/src/binary/handlers/users/update_user_handler.rs index 014bc9730..23b2e51cd 100644 --- a/core/server/src/binary/handlers/users/update_user_handler.rs +++ b/core/server/src/binary/handlers/users/update_user_handler.rs @@ -22,6 +22,7 @@ use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload}; use crate::streaming::session::Session; use 
iggy_binary_protocol::requests::users::UpdateUserRequest; +use iggy_common::defaults::{MAX_USERNAME_LENGTH, MIN_USERNAME_LENGTH}; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; @@ -40,6 +41,13 @@ pub async fn handle_update_user( shard.ensure_authenticated(session)?; shard.metadata.perm_update_user(session.get_user_id())?; + if let Some(ref username) = req.username { + let username_len = username.as_str().len(); + if !(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username_len) { + return Err(IggyError::InvalidUsername); + } + } + let request = ShardRequest::control_plane(ShardRequestPayload::UpdateUserRequest { user_id: session.get_user_id(), command: req, diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs index 86af7917c..619e1a82e 100644 --- a/core/server/src/state/models.rs +++ b/core/server/src/state/models.rs @@ -131,8 +131,22 @@ impl WireDecode for CreateStreamWithId { } let stream_id = u32::from_le_bytes(buf[0..4].try_into().unwrap()); let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; - let (command, _) = CreateStreamRequest::decode(&buf[8..8 + command_length])?; - Ok((Self { stream_id, command }, 8 + command_length)) + let total = 8usize.checked_add(command_length).ok_or( + iggy_binary_protocol::WireError::UnexpectedEof { + offset: 4, + need: command_length, + have: buf.len() - 8, + }, + )?; + if buf.len() < total { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: 8, + need: command_length, + have: buf.len() - 8, + }); + } + let (command, _) = CreateStreamRequest::decode(&buf[8..total])?; + Ok((Self { stream_id, command }, total)) } } @@ -159,8 +173,22 @@ impl WireDecode for CreateTopicWithId { } let topic_id = u32::from_le_bytes(buf[0..4].try_into().unwrap()); let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; - let (command, _) = CreateTopicRequest::decode(&buf[8..8 + command_length])?; - 
Ok((Self { topic_id, command }, 8 + command_length)) + let total = 8usize.checked_add(command_length).ok_or( + iggy_binary_protocol::WireError::UnexpectedEof { + offset: 4, + need: command_length, + have: buf.len() - 8, + }, + )?; + if buf.len() < total { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: 8, + need: command_length, + have: buf.len() - 8, + }); + } + let (command, _) = CreateTopicRequest::decode(&buf[8..total])?; + Ok((Self { topic_id, command }, total)) } } @@ -187,8 +215,22 @@ impl WireDecode for CreateConsumerGroupWithId { } let group_id = u32::from_le_bytes(buf[0..4].try_into().unwrap()); let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; - let (command, _) = CreateConsumerGroupRequest::decode(&buf[8..8 + command_length])?; - Ok((Self { group_id, command }, 8 + command_length)) + let total = 8usize.checked_add(command_length).ok_or( + iggy_binary_protocol::WireError::UnexpectedEof { + offset: 4, + need: command_length, + have: buf.len() - 8, + }, + )?; + if buf.len() < total { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: 8, + need: command_length, + have: buf.len() - 8, + }); + } + let (command, _) = CreateConsumerGroupRequest::decode(&buf[8..total])?; + Ok((Self { group_id, command }, total)) } } @@ -215,8 +257,22 @@ impl WireDecode for CreateUserWithId { } let user_id = u32::from_le_bytes(buf[0..4].try_into().unwrap()); let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; - let (command, _) = CreateUserRequest::decode(&buf[8..8 + command_length])?; - Ok((Self { user_id, command }, 8 + command_length)) + let total = 8usize.checked_add(command_length).ok_or( + iggy_binary_protocol::WireError::UnexpectedEof { + offset: 4, + need: command_length, + have: buf.len() - 8, + }, + )?; + if buf.len() < total { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: 8, + need: command_length, + have: buf.len() - 8, + }); + } + let 
(command, _) = CreateUserRequest::decode(&buf[8..total])?; + Ok((Self { user_id, command }, total)) } } @@ -244,12 +300,33 @@ impl WireDecode for CreatePersonalAccessTokenWithHash { } let hash_length = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as usize; let mut pos = 4; + if buf.len() < pos + hash_length { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: pos, + need: hash_length, + have: buf.len() - pos, + }); + } let hash = std::str::from_utf8(&buf[pos..pos + hash_length]) .map_err(|_| iggy_binary_protocol::WireError::InvalidUtf8 { offset: pos })? .to_string(); pos += hash_length; + if buf.len() < pos + 4 { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: pos, + need: 4, + have: buf.len() - pos, + }); + } let command_length = u32::from_le_bytes(buf[pos..pos + 4].try_into().unwrap()) as usize; pos += 4; + if buf.len() < pos + command_length { + return Err(iggy_binary_protocol::WireError::UnexpectedEof { + offset: pos, + need: command_length, + have: buf.len() - pos, + }); + } let (command, _) = CreatePersonalAccessTokenRequest::decode(&buf[pos..pos + command_length])?; pos += command_length;
