This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-7-http in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 06f091ed500dfa543eca5da74ed01acee7265829 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 27 13:50:55 2026 +0100 fix(server,sdk): harden wire type migration from review findings The wire type refactor (prior commits) left several issues identified during expert review: - EntryCommand::decode panicked on corrupted WAL entries with inflated length fields instead of returning an error - partitioning_to_wire panicked via unwrap on malformed Partitioning structs reachable through public fields - Permission/identifier conversion code was duplicated between dispatch.rs and wire_conversions.rs (maintenance hazard for wire format divergence) - SDK binary impls erased all WireError context into a generic InvalidFormat, making decode failures opaque - MAX_NAME_LENGTH and MAX_PARTITIONS_COUNT were defined 3 times across HTTP submodules Consolidates wire conversions into iggy_common, adds decode_response helper with tracing, and moves shared constants to lib.rs. --- core/common/src/http/consumer_groups/mod.rs | 2 +- core/common/src/http/partitions/mod.rs | 2 +- core/common/src/http/streams/mod.rs | 2 +- core/common/src/http/topics/mod.rs | 3 +- core/common/src/http/users/login_user.rs | 26 +++- core/common/src/lib.rs | 3 + core/common/src/traits/binary_impls/cluster.rs | 5 +- .../src/traits/binary_impls/consumer_groups.rs | 11 +- .../src/traits/binary_impls/consumer_offsets.rs | 5 +- core/common/src/traits/binary_impls/messages.rs | 2 +- core/common/src/traits/binary_impls/mod.rs | 11 ++ .../traits/binary_impls/personal_access_tokens.rs | 11 +- core/common/src/traits/binary_impls/streams.rs | 11 +- core/common/src/traits/binary_impls/system.rs | 14 +-- core/common/src/traits/binary_impls/topics.rs | 11 +- core/common/src/traits/binary_impls/users.rs | 14 +-- core/common/src/wire_conversions.rs | 75 ++++++++++- core/server/src/binary/dispatch.rs | 137 +-------------------- .../binary/handlers/users/create_user_handler.rs | 5 +- .../src/binary/handlers/users/get_user_handler.rs | 5 +- core/server/src/http/consumer_groups.rs | 12 +- core/server/src/http/partitions.rs | 10 +- core/server/src/http/segments.rs | 6 +- core/server/src/http/streams.rs | 8 +- core/server/src/http/topics.rs | 16 +-- core/server/src/http/users.rs | 14 +-- core/server/src/shard/execution.rs | 3 +- core/server/src/state/command.rs | 7 ++ core/server/src/state/models.rs | 1 - core/server/src/state/system.rs | 4 +- 30 files changed, 194 insertions(+), 242 deletions(-) diff --git a/core/common/src/http/consumer_groups/mod.rs b/core/common/src/http/consumer_groups/mod.rs index c51892da9..6805f5b61 100644 --- a/core/common/src/http/consumer_groups/mod.rs +++ b/core/common/src/http/consumer_groups/mod.rs @@ -19,4 +19,4 @@ pub mod create_consumer_group; pub mod delete_consumer_group; -const MAX_NAME_LENGTH: usize = 255; +use crate::MAX_NAME_LENGTH; diff --git a/core/common/src/http/partitions/mod.rs b/core/common/src/http/partitions/mod.rs index e56644529..adfe76df0 100644 --- a/core/common/src/http/partitions/mod.rs +++ b/core/common/src/http/partitions/mod.rs @@ -19,4 +19,4 @@ pub mod create_partitions; pub mod delete_partitions; -const MAX_PARTITIONS_COUNT: u32 = 1000; +use crate::MAX_PARTITIONS_COUNT; diff --git a/core/common/src/http/streams/mod.rs b/core/common/src/http/streams/mod.rs index 429b4ed2d..f0da8a2b4 100644 --- a/core/common/src/http/streams/mod.rs +++ b/core/common/src/http/streams/mod.rs @@ -21,4 +21,4 @@ pub mod delete_stream; pub mod purge_stream; pub mod update_stream; -const MAX_NAME_LENGTH: usize = 255; +use crate::MAX_NAME_LENGTH; diff --git a/core/common/src/http/topics/mod.rs b/core/common/src/http/topics/mod.rs index c1122e723..07a6e250b 100644 --- a/core/common/src/http/topics/mod.rs +++ b/core/common/src/http/topics/mod.rs @@ -21,5 +21,4 @@ pub mod delete_topic; pub mod purge_topic; pub mod update_topic; -const MAX_NAME_LENGTH: usize = 255; -const MAX_PARTITIONS_COUNT: u32 = 1000; +use crate::{MAX_NAME_LENGTH, MAX_PARTITIONS_COUNT}; diff --git a/core/common/src/http/users/login_user.rs b/core/common/src/http/users/login_user.rs index ae0757407..9130f9151 100644 --- a/core/common/src/http/users/login_user.rs +++ b/core/common/src/http/users/login_user.rs @@ -16,7 +16,10 @@ * under the License. */ -use secrecy::SecretString; +use super::defaults::*; +use crate::Validatable; +use crate::error::IggyError; +use secrecy::{ExposeSecret, SecretString}; use serde::{Deserialize, Serialize}; #[derive(Debug, Serialize, Deserialize)] @@ -29,3 +32,24 @@ pub struct LoginUser { #[serde(default)] pub context: Option<String>, } + +impl Validatable<IggyError> for LoginUser { + fn validate(&self) -> Result<(), IggyError> { + if self.username.is_empty() + || self.username.len() > MAX_USERNAME_LENGTH + || self.username.len() < MIN_USERNAME_LENGTH + { + return Err(IggyError::InvalidUsername); + } + + let password = self.password.expose_secret(); + if password.is_empty() + || password.len() > MAX_PASSWORD_LENGTH + || password.len() < MIN_PASSWORD_LENGTH + { + return Err(IggyError::InvalidPassword); + } + + Ok(()) + } +} diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index dca080e70..6ad7b346d 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -132,3 +132,6 @@ pub use utils::text; pub use utils::timestamp::*; pub use utils::topic_size::MaxTopicSize; pub use utils::versioning::SemanticVersion; + +pub const MAX_NAME_LENGTH: usize = 255; +pub const MAX_PARTITIONS_COUNT: u32 = 1000; diff --git a/core/common/src/traits/binary_impls/cluster.rs b/core/common/src/traits/binary_impls/cluster.rs index 779b6a427..c51f17906 100644 --- a/core/common/src/traits/binary_impls/cluster.rs +++ b/core/common/src/traits/binary_impls/cluster.rs @@ -19,7 +19,7 @@ use crate::traits::binary_auth::fail_if_not_authenticated; use crate::wire_conversions::cluster_metadata_from_wire; use crate::{BinaryClient, ClusterClient, ClusterMetadata, IggyError}; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::GET_CLUSTER_METADATA_CODE; use iggy_binary_protocol::requests::system::GetClusterMetadataRequest; use iggy_binary_protocol::responses::system::get_cluster_metadata::ClusterMetadataResponse; @@ -34,8 +34,7 @@ impl<B: BinaryClient> ClusterClient for B { GetClusterMetadataRequest.to_bytes(), ) .await?; - let wire_resp = ClusterMetadataResponse::decode_from(&response) - .map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<ClusterMetadataResponse>(&response)?; cluster_metadata_from_wire(wire_resp) } } diff --git a/core/common/src/traits/binary_impls/consumer_groups.rs b/core/common/src/traits/binary_impls/consumer_groups.rs index 23d78cac3..25e21ab75 100644 --- a/core/common/src/traits/binary_impls/consumer_groups.rs +++ b/core/common/src/traits/binary_impls/consumer_groups.rs @@ -22,7 +22,7 @@ use crate::{ BinaryClient, ConsumerGroup, ConsumerGroupClient, ConsumerGroupDetails, Identifier, IggyError, }; use iggy_binary_protocol::WireName; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CREATE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUP_CODE, GET_CONSUMER_GROUPS_CODE, JOIN_CONSUMER_GROUP_CODE, LEAVE_CONSUMER_GROUP_CODE, @@ -60,8 +60,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B { if response.is_empty() { return Ok(None); } - let wire_resp = ConsumerGroupDetailsResponse::decode_from(&response) - .map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<ConsumerGroupDetailsResponse>(&response)?; Ok(Some(ConsumerGroupDetails::from(wire_resp))) } @@ -86,8 +85,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B { if response.is_empty() { return Ok(Vec::new()); } - let wire_resp = GetConsumerGroupsResponse::decode_from(&response) - .map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetConsumerGroupsResponse>(&response)?; Ok(consumer_groups_from_wire(wire_resp)) } @@ -112,8 +110,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B { .to_bytes(), ) .await?; - let wire_resp = ConsumerGroupDetailsResponse::decode_from(&response) - .map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<ConsumerGroupDetailsResponse>(&response)?; Ok(ConsumerGroupDetails::from(wire_resp)) } diff --git a/core/common/src/traits/binary_impls/consumer_offsets.rs b/core/common/src/traits/binary_impls/consumer_offsets.rs index 4b0d9720e..af665a513 100644 --- a/core/common/src/traits/binary_impls/consumer_offsets.rs +++ b/core/common/src/traits/binary_impls/consumer_offsets.rs @@ -21,7 +21,7 @@ use crate::wire_conversions::{consumer_to_wire, identifier_to_wire}; use crate::{ BinaryClient, Consumer, ConsumerOffsetClient, ConsumerOffsetInfo, Identifier, IggyError, }; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ DELETE_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET_CODE, STORE_CONSUMER_OFFSET_CODE, }; @@ -85,8 +85,7 @@ impl<B: BinaryClient> ConsumerOffsetClient for B { if response.is_empty() { return Ok(None); } - let wire_resp = - ConsumerOffsetResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<ConsumerOffsetResponse>(&response)?; Ok(Some(ConsumerOffsetInfo::from(wire_resp))) } diff --git a/core/common/src/traits/binary_impls/messages.rs b/core/common/src/traits/binary_impls/messages.rs index d77d07b61..783e5b693 100644 --- a/core/common/src/traits/binary_impls/messages.rs +++ b/core/common/src/traits/binary_impls/messages.rs @@ -71,7 +71,7 @@ impl<B: BinaryClient> MessageClient for B { fail_if_not_authenticated(self).await?; let wire_stream_id = identifier_to_wire(stream_id)?; let wire_topic_id = identifier_to_wire(topic_id)?; - let wire_partitioning = partitioning_to_wire(partitioning); + let wire_partitioning = partitioning_to_wire(partitioning)?; let raw_messages: Vec<RawMessage<'_>> = messages .iter() .map(|m| RawMessage { diff --git a/core/common/src/traits/binary_impls/mod.rs b/core/common/src/traits/binary_impls/mod.rs index 3d039130d..cbdd3edb9 100644 --- a/core/common/src/traits/binary_impls/mod.rs +++ b/core/common/src/traits/binary_impls/mod.rs @@ -26,3 +26,14 @@ mod streams; mod system; mod topics; mod users; + +use crate::IggyError; +use iggy_binary_protocol::WireDecode; + +/// Decode a wire response, logging the error details before converting to `IggyError`. +pub(crate) fn decode_response<T: WireDecode>(response: &[u8]) -> Result<T, IggyError> { + T::decode_from(response).map_err(|e| { + tracing::warn!("failed to decode {}: {e}", std::any::type_name::<T>()); + IggyError::InvalidFormat + }) +} diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs b/core/common/src/traits/binary_impls/personal_access_tokens.rs index 202c60a72..f85f34d36 100644 --- a/core/common/src/traits/binary_impls/personal_access_tokens.rs +++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs @@ -24,7 +24,7 @@ use crate::{ PersonalAccessTokenExpiry, PersonalAccessTokenInfo, RawPersonalAccessToken, }; use iggy_binary_protocol::WireName; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE, GET_PERSONAL_ACCESS_TOKENS_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE, @@ -50,8 +50,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B { if response.is_empty() { return Ok(Vec::new()); } - let wire_resp = GetPersonalAccessTokensResponse::decode_from(&response) - .map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetPersonalAccessTokensResponse>(&response)?; Ok(personal_access_tokens_from_wire(wire_resp)) } @@ -72,8 +71,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B { .to_bytes(), ) .await?; - let wire_resp = RawPersonalAccessTokenResponse::decode_from(&response) - .map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<RawPersonalAccessTokenResponse>(&response)?; Ok(RawPersonalAccessToken::from(wire_resp)) } @@ -101,8 +99,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B { .await?; self.set_state(ClientState::Authenticated).await; self.publish_event(DiagnosticEvent::SignedIn).await; - let wire_resp = - IdentityResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<IdentityResponse>(&response)?; Ok(IdentityInfo::from(wire_resp)) } } diff --git a/core/common/src/traits/binary_impls/streams.rs b/core/common/src/traits/binary_impls/streams.rs index f170e97b1..b64c99261 100644 --- a/core/common/src/traits/binary_impls/streams.rs +++ b/core/common/src/traits/binary_impls/streams.rs @@ -20,7 +20,7 @@ use crate::traits::binary_auth::fail_if_not_authenticated; use crate::wire_conversions::{identifier_to_wire, streams_from_wire}; use crate::{BinaryClient, Identifier, IggyError, Stream, StreamClient, StreamDetails}; use iggy_binary_protocol::WireName; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CREATE_STREAM_CODE, DELETE_STREAM_CODE, GET_STREAM_CODE, GET_STREAMS_CODE, PURGE_STREAM_CODE, UPDATE_STREAM_CODE, @@ -46,8 +46,7 @@ impl<B: BinaryClient> StreamClient for B { if response.is_empty() { return Ok(None); } - let wire_resp = - GetStreamResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetStreamResponse>(&response)?; Ok(Some(StreamDetails::from(wire_resp))) } @@ -59,8 +58,7 @@ impl<B: BinaryClient> StreamClient for B { if response.is_empty() { return Ok(Vec::new()); } - let wire_resp = - GetStreamsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetStreamsResponse>(&response)?; Ok(streams_from_wire(wire_resp)) } @@ -73,8 +71,7 @@ impl<B: BinaryClient> StreamClient for B { CreateStreamRequest { name: wire_name }.to_bytes(), ) .await?; - let wire_resp = - GetStreamResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetStreamResponse>(&response)?; Ok(StreamDetails::from(wire_resp)) } diff --git a/core/common/src/traits/binary_impls/system.rs b/core/common/src/traits/binary_impls/system.rs index ebb88bf17..365f2b0b8 100644 --- a/core/common/src/traits/binary_impls/system.rs +++ b/core/common/src/traits/binary_impls/system.rs @@ -22,7 +22,7 @@ use crate::{ BinaryClient, ClientInfo, ClientInfoDetails, IggyDuration, IggyError, Snapshot, SnapshotCompression, Stats, SystemClient, SystemSnapshotType, }; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ GET_CLIENT_CODE, GET_CLIENTS_CODE, GET_ME_CODE, GET_SNAPSHOT_FILE_CODE, GET_STATS_CODE, PING_CODE, @@ -41,8 +41,7 @@ impl<B: BinaryClient> SystemClient for B { let response = self .send_raw_with_response(GET_STATS_CODE, GetStatsRequest.to_bytes()) .await?; - let wire_resp = - StatsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<StatsResponse>(&response)?; Ok(Stats::from(wire_resp)) } @@ -51,8 +50,7 @@ impl<B: BinaryClient> SystemClient for B { let response = self .send_raw_with_response(GET_ME_CODE, GetMeRequest.to_bytes()) .await?; - let wire_resp = - ClientDetailsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<ClientDetailsResponse>(&response)?; Ok(ClientInfoDetails::from(wire_resp)) } @@ -64,8 +62,7 @@ impl<B: BinaryClient> SystemClient for B { if response.is_empty() { return Ok(None); } - let wire_resp = - ClientDetailsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<ClientDetailsResponse>(&response)?; Ok(Some(ClientInfoDetails::from(wire_resp))) } @@ -77,8 +74,7 @@ impl<B: BinaryClient> SystemClient for B { if response.is_empty() { return Ok(Vec::new()); } - let wire_resp = - GetClientsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetClientsResponse>(&response)?; Ok(clients_from_wire(wire_resp)) } diff --git a/core/common/src/traits/binary_impls/topics.rs b/core/common/src/traits/binary_impls/topics.rs index 8e6547ddf..9304ed54c 100644 --- a/core/common/src/traits/binary_impls/topics.rs +++ b/core/common/src/traits/binary_impls/topics.rs @@ -23,7 +23,7 @@ use crate::{ TopicClient, TopicDetails, }; use iggy_binary_protocol::WireName; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CREATE_TOPIC_CODE, DELETE_TOPIC_CODE, GET_TOPIC_CODE, GET_TOPICS_CODE, PURGE_TOPIC_CODE, UPDATE_TOPIC_CODE, @@ -58,8 +58,7 @@ impl<B: BinaryClient> TopicClient for B { if response.is_empty() { return Ok(None); } - let wire_resp = - GetTopicResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetTopicResponse>(&response)?; Ok(Some(TopicDetails::from(wire_resp))) } @@ -78,8 +77,7 @@ impl<B: BinaryClient> TopicClient for B { if response.is_empty() { return Ok(Vec::new()); } - let wire_resp = - GetTopicsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetTopicsResponse>(&response)?; Ok(topics_from_wire(wire_resp)) } @@ -111,8 +109,7 @@ impl<B: BinaryClient> TopicClient for B { .to_bytes(), ) .await?; - let wire_resp = - GetTopicResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetTopicResponse>(&response)?; Ok(TopicDetails::from(wire_resp)) } diff --git a/core/common/src/traits/binary_impls/users.rs b/core/common/src/traits/binary_impls/users.rs index bf2f06fd6..24a877a4c 100644 --- a/core/common/src/traits/binary_impls/users.rs +++ b/core/common/src/traits/binary_impls/users.rs @@ -23,7 +23,7 @@ use crate::{ UserClient, UserInfo, UserInfoDetails, UserStatus, }; use iggy_binary_protocol::WireName; -use iggy_binary_protocol::codec::{WireDecode, WireEncode}; +use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, GET_USERS_CODE, LOGIN_USER_CODE, LOGOUT_USER_CODE, UPDATE_PERMISSIONS_CODE, UPDATE_USER_CODE, @@ -49,8 +49,7 @@ impl<B: BinaryClient> UserClient for B { if response.is_empty() { return Ok(None); } - let wire_resp = - UserDetailsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<UserDetailsResponse>(&response)?; UserInfoDetails::try_from(wire_resp).map(Some) } @@ -62,8 +61,7 @@ impl<B: BinaryClient> UserClient for B { if response.is_empty() { return Ok(Vec::new()); } - let wire_resp = - GetUsersResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<GetUsersResponse>(&response)?; users_from_wire(wire_resp) } @@ -89,8 +87,7 @@ impl<B: BinaryClient> UserClient for B { .to_bytes(), ) .await?; - let wire_resp = - UserDetailsResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<UserDetailsResponse>(&response)?; UserInfoDetails::try_from(wire_resp) } @@ -187,8 +184,7 @@ impl<B: BinaryClient> UserClient for B { .await?; self.set_state(ClientState::Authenticated).await; self.publish_event(DiagnosticEvent::SignedIn).await; - let wire_resp = - IdentityResponse::decode_from(&response).map_err(|_| IggyError::InvalidFormat)?; + let wire_resp = super::decode_response::<IdentityResponse>(&response)?; Ok(IdentityInfo::from(wire_resp)) } diff --git a/core/common/src/wire_conversions.rs b/core/common/src/wire_conversions.rs index c5a48916c..a1bb47556 100644 --- a/core/common/src/wire_conversions.rs +++ b/core/common/src/wire_conversions.rs @@ -64,6 +64,9 @@ use iggy_binary_protocol::responses::users::user_response::UserResponse; use iggy_binary_protocol::responses::users::{GetUsersResponse, UserDetailsResponse}; use std::collections::{BTreeMap, HashMap}; +/// Sentinel value in the wire protocol indicating no authenticated user. +const WIRE_NO_USER_ID: u32 = u32::MAX; + // --------------------------------------------------------------------------- // Streams // --------------------------------------------------------------------------- @@ -240,7 +243,7 @@ impl From<ConsumerGroupInfoResponse> for ConsumerGroupInfo { impl From<ClientResponse> for ClientInfo { fn from(w: ClientResponse) -> Self { let user_id = match w.user_id { - u32::MAX => None, + WIRE_NO_USER_ID => None, id => Some(id), }; let transport = match w.transport { @@ -535,16 +538,20 @@ pub fn polling_strategy_to_wire( /// Convert a domain `Partitioning` to `WirePartitioning`. pub fn partitioning_to_wire( partitioning: &crate::Partitioning, -) -> iggy_binary_protocol::primitives::partitioning::WirePartitioning { +) -> Result<iggy_binary_protocol::primitives::partitioning::WirePartitioning, IggyError> { use iggy_binary_protocol::primitives::partitioning::WirePartitioning; match partitioning.kind { - crate::PartitioningKind::Balanced => WirePartitioning::Balanced, + crate::PartitioningKind::Balanced => Ok(WirePartitioning::Balanced), crate::PartitioningKind::PartitionId => { - let id = u32::from_le_bytes(partitioning.value[..4].try_into().unwrap()); - WirePartitioning::PartitionId(id) + let bytes: [u8; 4] = partitioning + .value + .get(..4) + .and_then(|s| s.try_into().ok()) + .ok_or(IggyError::InvalidCommand)?; + Ok(WirePartitioning::PartitionId(u32::from_le_bytes(bytes))) } crate::PartitioningKind::MessagesKey => { - WirePartitioning::MessagesKey(partitioning.value.clone()) + Ok(WirePartitioning::MessagesKey(partitioning.value.clone())) } } } @@ -626,6 +633,62 @@ impl From<WirePermissions> for Permissions { } } +/// Convert `&WirePermissions` to domain `Permissions` without consuming the input. +pub fn wire_permissions_to_permissions(wp: &WirePermissions) -> Permissions { + let streams = if wp.streams.is_empty() { + None + } else { + let mut map = BTreeMap::new(); + for ws in &wp.streams { + let topics = if ws.topics.is_empty() { + None + } else { + let mut tmap = BTreeMap::new(); + for wt in &ws.topics { + tmap.insert( + wt.topic_id as usize, + TopicPermissions { + manage_topic: wt.manage_topic, + read_topic: wt.read_topic, + poll_messages: wt.poll_messages, + send_messages: wt.send_messages, + }, + ); + } + Some(tmap) + }; + map.insert( + ws.stream_id as usize, + StreamPermissions { + manage_stream: ws.manage_stream, + read_stream: ws.read_stream, + manage_topics: ws.manage_topics, + read_topics: ws.read_topics, + poll_messages: ws.poll_messages, + send_messages: ws.send_messages, + topics, + }, + ); + } + Some(map) + }; + Permissions { + global: GlobalPermissions { + manage_servers: wp.global.manage_servers, + read_servers: wp.global.read_servers, + manage_users: wp.global.manage_users, + read_users: wp.global.read_users, + manage_streams: wp.global.manage_streams, + read_streams: wp.global.read_streams, + manage_topics: wp.global.manage_topics, + read_topics: wp.global.read_topics, + poll_messages: wp.global.poll_messages, + send_messages: wp.global.send_messages, + }, + streams, + } +} + // --------------------------------------------------------------------------- // Permissions (domain -> wire) // --------------------------------------------------------------------------- diff --git a/core/server/src/binary/dispatch.rs b/core/server/src/binary/dispatch.rs index b5e485783..9138483e1 100644 --- a/core/server/src/binary/dispatch.rs +++ b/core/server/src/binary/dispatch.rs @@ -23,9 +23,6 @@ use bytes::BytesMut; use iggy_binary_protocol::RequestFrame; use iggy_binary_protocol::codec::WireDecode; use iggy_binary_protocol::codes::*; -use iggy_binary_protocol::primitives::permissions::{ - WireGlobalPermissions, WirePermissions, WireStreamPermissions, WireTopicPermissions, -}; use iggy_binary_protocol::requests::consumer_groups::*; use iggy_binary_protocol::requests::consumer_offsets::*; use iggy_binary_protocol::requests::messages::*; @@ -37,10 +34,8 @@ use iggy_binary_protocol::requests::system::*; use iggy_binary_protocol::requests::topics::*; use iggy_binary_protocol::requests::users::*; use iggy_common::{ - Consumer, ConsumerKind, GlobalPermissions, Identifier, IggyError, Permissions, PollingKind, - PollingStrategy, SenderKind, StreamPermissions, TopicPermissions, + Consumer, ConsumerKind, Identifier, IggyError, PollingKind, PollingStrategy, SenderKind, }; -use std::collections::BTreeMap; use std::rc::Rc; use tracing::{error, warn}; @@ -102,18 +97,6 @@ pub fn wire_id_to_identifier( } } -/// Convert a domain `Identifier` to `WireIdentifier`. -pub fn identifier_to_wire_id( - id: &Identifier, -) -> Result<iggy_binary_protocol::WireIdentifier, IggyError> { - if let Ok(value) = id.get_u32_value() { - Ok(iggy_binary_protocol::WireIdentifier::numeric(value)) - } else { - let name = id.get_string_value()?; - iggy_binary_protocol::WireIdentifier::named(name).map_err(|_| IggyError::InvalidIdentifier) - } -} - /// Convert a `WireConsumer` to the domain `Consumer`. pub fn wire_consumer_to_consumer( wire: &iggy_binary_protocol::WireConsumer, @@ -133,124 +116,6 @@ pub fn wire_polling_to_strategy( }) } -fn wire_topic_to_domain(wt: &WireTopicPermissions) -> TopicPermissions { - TopicPermissions { - manage_topic: wt.manage_topic, - read_topic: wt.read_topic, - poll_messages: wt.poll_messages, - send_messages: wt.send_messages, - } -} - -fn wire_stream_to_domain(ws: &WireStreamPermissions) -> StreamPermissions { - let topics = if ws.topics.is_empty() { - None - } else { - let mut map = BTreeMap::new(); - for wt in &ws.topics { - map.insert(wt.topic_id as usize, wire_topic_to_domain(wt)); - } - Some(map) - }; - StreamPermissions { - manage_stream: ws.manage_stream, - read_stream: ws.read_stream, - manage_topics: ws.manage_topics, - read_topics: ws.read_topics, - poll_messages: ws.poll_messages, - send_messages: ws.send_messages, - topics, - } -} - -/// Convert `WirePermissions` to the domain `Permissions`. -pub fn wire_permissions_to_permissions(wp: &WirePermissions) -> Permissions { - let streams = if wp.streams.is_empty() { - None - } else { - let mut map = BTreeMap::new(); - for ws in &wp.streams { - map.insert(ws.stream_id as usize, wire_stream_to_domain(ws)); - } - Some(map) - }; - Permissions { - global: GlobalPermissions { - manage_servers: wp.global.manage_servers, - read_servers: wp.global.read_servers, - manage_users: wp.global.manage_users, - read_users: wp.global.read_users, - manage_streams: wp.global.manage_streams, - read_streams: wp.global.read_streams, - manage_topics: wp.global.manage_topics, - read_topics: wp.global.read_topics, - poll_messages: wp.global.poll_messages, - send_messages: wp.global.send_messages, - }, - streams, - } -} - -fn domain_topic_to_wire(topic_id: usize, tp: &TopicPermissions) -> WireTopicPermissions { - WireTopicPermissions { - topic_id: topic_id as u32, - manage_topic: tp.manage_topic, - read_topic: tp.read_topic, - poll_messages: tp.poll_messages, - send_messages: tp.send_messages, - } -} - -fn domain_stream_to_wire(stream_id: usize, sp: &StreamPermissions) -> WireStreamPermissions { - let topics: Vec<WireTopicPermissions> = sp - .topics - .as_ref() - .map(|map| { - map.iter() - .map(|(&tid, tp)| domain_topic_to_wire(tid, tp)) - .collect() - }) - .unwrap_or_default(); - WireStreamPermissions { - stream_id: stream_id as u32, - manage_stream: sp.manage_stream, - read_stream: sp.read_stream, - manage_topics: sp.manage_topics, - read_topics: sp.read_topics, - poll_messages: sp.poll_messages, - send_messages: sp.send_messages, - topics, - } -} - -/// Convert domain `Permissions` to `WirePermissions`. -pub fn domain_permissions_to_wire(perms: &Permissions) -> WirePermissions { - let streams: Vec<WireStreamPermissions> = perms - .streams - .as_ref() - .map(|map| { - map.iter() - .map(|(&sid, sp)| domain_stream_to_wire(sid, sp)) - .collect() - }) - .unwrap_or_default(); - WirePermissions { - global: WireGlobalPermissions { - manage_servers: perms.global.manage_servers, - read_servers: perms.global.read_servers, - manage_users: perms.global.manage_users, - read_users: perms.global.read_users, - manage_streams: perms.global.manage_streams, - read_streams: perms.global.read_streams, - manage_topics: perms.global.manage_topics, - read_topics: perms.global.read_topics, - poll_messages: perms.global.poll_messages, - send_messages: perms.global.send_messages, - }, - streams, - } -} - /// Maximum payload size for control-plane commands (non-SendMessages). /// Prevents OOM from malicious clients sending `length = u32::MAX`. /// SendMessages has its own size validation via `total_payload_size` checks. diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs b/core/server/src/binary/handlers/users/create_user_handler.rs index 909e69410..04aaf8a2e 100644 --- a/core/server/src/binary/handlers/users/create_user_handler.rs +++ b/core/server/src/binary/handlers/users/create_user_handler.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::binary::dispatch::{HandlerResult, domain_permissions_to_wire}; +use crate::binary::dispatch::HandlerResult; use crate::shard::IggyShard; use crate::shard::transmission::frame::ShardResponse; use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload}; @@ -25,6 +25,7 @@ use iggy_binary_protocol::WireName; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::requests::users::CreateUserRequest; use iggy_binary_protocol::responses::users::{UserDetailsResponse, UserResponse}; +use iggy_common::wire_conversions::permissions_to_wire; use iggy_common::{IggyError, SenderKind}; use std::rc::Rc; use tracing::{debug, instrument}; @@ -58,7 +59,7 @@ pub async fn handle_create_user( username: WireName::new(&user.username) .map_err(|_| IggyError::InvalidCommand)?, }, - permissions: user.permissions.as_ref().map(domain_permissions_to_wire), + permissions: user.permissions.as_ref().map(permissions_to_wire), }; sender.send_ok_response(&response.to_bytes()).await?; } diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs b/core/server/src/binary/handlers/users/get_user_handler.rs index 66251e46c..f7415c8f5 100644 --- a/core/server/src/binary/handlers/users/get_user_handler.rs +++ b/core/server/src/binary/handlers/users/get_user_handler.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::binary::dispatch::{HandlerResult, domain_permissions_to_wire, wire_id_to_identifier}; +use crate::binary::dispatch::{HandlerResult, wire_id_to_identifier}; use crate::shard::IggyShard; use crate::streaming::session::Session; use iggy_binary_protocol::WireName; @@ -25,6 +25,7 @@ use iggy_binary_protocol::requests::users::GetUserRequest; use iggy_binary_protocol::responses::users::{UserDetailsResponse, UserResponse}; use iggy_common::IggyError; use iggy_common::SenderKind; +use iggy_common::wire_conversions::permissions_to_wire; use std::rc::Rc; use tracing::debug; @@ -55,7 +56,7 @@ pub async fn handle_get_user( username: WireName::new(user.username.as_ref()) .map_err(|_| IggyError::InvalidCommand)?, }, - permissions: user.permissions.as_deref().map(domain_permissions_to_wire), + permissions: user.permissions.as_deref().map(permissions_to_wire), }; sender.send_ok_response(&response.to_bytes()).await?; Ok(HandlerResult::Finished) diff --git a/core/server/src/http/consumer_groups.rs b/core/server/src/http/consumer_groups.rs index 35ae2a03f..fe98ff56b 100644 --- a/core/server/src/http/consumer_groups.rs +++ b/core/server/src/http/consumer_groups.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::identifier_to_wire_id; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::mapper; @@ -36,6 +35,7 @@ use iggy_binary_protocol::requests::consumer_groups::{ use iggy_common::Identifier; use iggy_common::Validatable; use iggy_common::create_consumer_group::CreateConsumerGroup; +use iggy_common::wire_conversions::identifier_to_wire; use iggy_common::{ConsumerGroup, ConsumerGroupDetails, IggyError}; use std::sync::Arc; use tracing::instrument; @@ -123,8 +123,8 @@ async fn create_consumer_group( let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?; let wire_command = WireCreateConsumerGroup { - stream_id: identifier_to_wire_id(&command.stream_id)?, - topic_id: identifier_to_wire_id(&command.topic_id)?, + stream_id: identifier_to_wire(&command.stream_id)?, + topic_id: identifier_to_wire(&command.topic_id)?, name: WireName::new(&command.name).map_err(|_| IggyError::InvalidConsumerGroupName)?, }; let request = ShardRequest::control_plane(ShardRequestPayload::CreateConsumerGroupRequest { @@ -160,9 +160,9 @@ async fn delete_consumer_group( let group_id = Identifier::from_str_value(&group_id)?; let wire_command = WireDeleteConsumerGroup { - stream_id: identifier_to_wire_id(&stream_id)?, - topic_id: identifier_to_wire_id(&topic_id)?, - group_id: identifier_to_wire_id(&group_id)?, + stream_id: identifier_to_wire(&stream_id)?, + topic_id: identifier_to_wire(&topic_id)?, + group_id: identifier_to_wire(&group_id)?, }; let request = ShardRequest::control_plane(ShardRequestPayload::DeleteConsumerGroupRequest { user_id: identity.user_id, diff --git a/core/server/src/http/partitions.rs b/core/server/src/http/partitions.rs index ff696099c..b7883c55e 100644 --- a/core/server/src/http/partitions.rs +++ b/core/server/src/http/partitions.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::identifier_to_wire_id; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; @@ -34,6 +33,7 @@ use iggy_common::Identifier; use iggy_common::Validatable; use iggy_common::create_partitions::CreatePartitions; use iggy_common::delete_partitions::DeletePartitions; +use iggy_common::wire_conversions::identifier_to_wire; use std::sync::Arc; use tracing::instrument; @@ -59,8 +59,8 @@ async fn create_partitions( command.validate()?; let wire_command = WireCreatePartitions { - stream_id: identifier_to_wire_id(&command.stream_id)?, - topic_id: identifier_to_wire_id(&command.topic_id)?, + stream_id: identifier_to_wire(&command.stream_id)?, + topic_id: identifier_to_wire(&command.topic_id)?, partitions_count: command.partitions_count, }; let request = ShardRequest::control_plane(ShardRequestPayload::CreatePartitionsRequest { @@ -88,8 +88,8 @@ async fn delete_partitions( query.validate()?; let wire_command = WireDeletePartitions { - stream_id: identifier_to_wire_id(&query.stream_id)?, - topic_id: identifier_to_wire_id(&query.topic_id)?, + stream_id: identifier_to_wire(&query.stream_id)?, + topic_id: identifier_to_wire(&query.topic_id)?, partitions_count: query.partitions_count, }; let request = ShardRequest::control_plane(ShardRequestPayload::DeletePartitionsRequest { diff --git a/core/server/src/http/segments.rs b/core/server/src/http/segments.rs index b2b8f9b52..c06f119eb 100644 --- a/core/server/src/http/segments.rs +++ b/core/server/src/http/segments.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::identifier_to_wire_id; use crate::http::COMPONENT; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; @@ -33,6 +32,7 @@ use iggy_common::Identifier; use iggy_common::Validatable; use iggy_common::delete_segments::DeleteSegments; use iggy_common::sharding::IggyNamespace; +use iggy_common::wire_conversions::identifier_to_wire; use send_wrapper::SendWrapper; use std::sync::Arc; use tracing::instrument; @@ -99,8 +99,8 @@ async fn delete_segments( } let wire_command = DeleteSegmentsRequest { - stream_id: identifier_to_wire_id(&query.stream_id)?, - topic_id: identifier_to_wire_id(&query.topic_id)?, + stream_id: identifier_to_wire(&query.stream_id)?, + topic_id: identifier_to_wire(&query.topic_id)?, partition_id: query.partition_id, segments_count, }; diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs index 3ca89f346..101e7270b 100644 --- a/core/server/src/http/streams.rs +++ b/core/server/src/http/streams.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::identifier_to_wire_id; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; use crate::http::shared::AppState; @@ -35,6 +34,7 @@ use iggy_common::Identifier; use iggy_common::Validatable; use iggy_common::create_stream::CreateStream; use iggy_common::update_stream::UpdateStream; +use iggy_common::wire_conversions::identifier_to_wire; use iggy_common::{IggyError, Stream, StreamDetails}; use std::sync::Arc; use tracing::instrument; @@ -139,7 +139,7 @@ async fn update_stream( command.validate()?; let wire_command = WireUpdateStream { - stream_id: identifier_to_wire_id(&command.stream_id)?, + stream_id: identifier_to_wire(&command.stream_id)?, name: WireName::new(&command.name).map_err(|_| IggyError::InvalidStreamName)?, }; let request = ShardRequest::control_plane(ShardRequestPayload::UpdateStreamRequest { @@ -166,7 +166,7 @@ async fn delete_stream( let request = ShardRequest::control_plane(ShardRequestPayload::DeleteStreamRequest { user_id: identity.user_id, command: WireDeleteStream { - stream_id: identifier_to_wire_id(&stream_id)?, + stream_id: identifier_to_wire(&stream_id)?, }, }); @@ -189,7 +189,7 @@ async fn purge_stream( let request = ShardRequest::control_plane(ShardRequestPayload::PurgeStreamRequest { user_id: identity.user_id, command: WirePurgeStream { - stream_id: identifier_to_wire_id(&stream_id)?, + stream_id: identifier_to_wire(&stream_id)?, }, }); diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs index b72fc4ad3..40961259f 100644 --- a/core/server/src/http/topics.rs +++ b/core/server/src/http/topics.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::identifier_to_wire_id; use crate::http::COMPONENT; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; @@ -37,6 +36,7 @@ use iggy_common::Identifier; use iggy_common::Validatable; use iggy_common::create_topic::CreateTopic; use iggy_common::update_topic::UpdateTopic; +use iggy_common::wire_conversions::identifier_to_wire; use iggy_common::{IggyError, Topic, TopicDetails}; use std::sync::Arc; use tracing::instrument; @@ -151,7 +151,7 @@ async fn create_topic( .ok_or(CustomError::ResourceNotFound)?; let wire_command = WireCreateTopic { - stream_id: identifier_to_wire_id(&command.stream_id)?, + stream_id: identifier_to_wire(&command.stream_id)?, partitions_count: command.partitions_count, compression_algorithm: command.compression_algorithm.as_code(), message_expiry: command.message_expiry.into(), @@ -193,8 +193,8 @@ async fn update_topic( command.validate()?; let wire_command = WireUpdateTopic { - stream_id: identifier_to_wire_id(&command.stream_id)?, - topic_id: identifier_to_wire_id(&command.topic_id)?, + stream_id: identifier_to_wire(&command.stream_id)?, + topic_id: identifier_to_wire(&command.topic_id)?, compression_algorithm: command.compression_algorithm.as_code(), message_expiry: command.message_expiry.into(), max_topic_size: command.max_topic_size.into(), @@ -226,8 +226,8 @@ async fn delete_topic( let request = ShardRequest::control_plane(ShardRequestPayload::DeleteTopicRequest { user_id: identity.user_id, command: WireDeleteTopic { - stream_id: identifier_to_wire_id(&stream_id)?, - topic_id: identifier_to_wire_id(&topic_id)?, + stream_id: identifier_to_wire(&stream_id)?, + topic_id: identifier_to_wire(&topic_id)?, }, }); @@ -251,8 +251,8 @@ async fn purge_topic( let request = ShardRequest::control_plane(ShardRequestPayload::PurgeTopicRequest { user_id: identity.user_id, command: WirePurgeTopic { - stream_id: identifier_to_wire_id(&stream_id)?, - topic_id: identifier_to_wire_id(&topic_id)?, + stream_id: identifier_to_wire(&stream_id)?, + topic_id: identifier_to_wire(&topic_id)?, }, }); diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs index 4dcc7f73d..71927a1e3 100644 --- a/core/server/src/http/users.rs +++ b/core/server/src/http/users.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::{domain_permissions_to_wire, identifier_to_wire_id}; use crate::http::COMPONENT; use crate::http::error::CustomError; use crate::http::jwt::json_web_token::Identity; @@ -46,6 +45,7 @@ use iggy_common::Identifier; use iggy_common::IdentityInfo; use iggy_common::Validatable; use iggy_common::login_user::LoginUser; +use iggy_common::wire_conversions::{identifier_to_wire, permissions_to_wire}; use iggy_common::{IggyError, UserInfo, UserInfoDetails}; use secrecy::ExposeSecret; use send_wrapper::SendWrapper; @@ -124,7 +124,7 @@ async fn create_user( username: WireName::new(&command.username).map_err(|_| IggyError::InvalidUsername)?, password: command.password.expose_secret().to_string(), status: command.status.as_code(), - permissions: command.permissions.as_ref().map(domain_permissions_to_wire), + permissions: command.permissions.as_ref().map(permissions_to_wire), }; let request = ShardRequest::control_plane(ShardRequestPayload::CreateUserRequest { user_id: identity.user_id, @@ -153,7 +153,7 @@ async fn update_user( command.validate()?; let wire_command = WireUpdateUser { - user_id: identifier_to_wire_id(&command.user_id)?, + user_id: identifier_to_wire(&command.user_id)?, username: command .username .as_deref() @@ -186,8 +186,8 @@ async fn update_permissions( command.validate()?; let wire_command = WireUpdatePermissions { - user_id: identifier_to_wire_id(&command.user_id)?, - permissions: command.permissions.as_ref().map(domain_permissions_to_wire), + user_id: identifier_to_wire(&command.user_id)?, + permissions: command.permissions.as_ref().map(permissions_to_wire), }; let request = ShardRequest::control_plane(ShardRequestPayload::UpdatePermissionsRequest { user_id: identity.user_id, @@ -213,7 +213,7 @@ async fn change_password( command.validate()?; let wire_command = WireChangePassword { - user_id: identifier_to_wire_id(&command.user_id)?, + user_id: identifier_to_wire(&command.user_id)?, current_password: command.current_password.expose_secret().to_string(), new_password: command.new_password.expose_secret().to_string(), }; @@ -239,7 +239,7 @@ async fn delete_user( let user_id = Identifier::from_str_value(&user_id)?; let wire_command = WireDeleteUser { - user_id: identifier_to_wire_id(&user_id)?, + user_id: identifier_to_wire(&user_id)?, }; let request = ShardRequest::control_plane(ShardRequestPayload::DeleteUserRequest { user_id: identity.user_id, diff --git a/core/server/src/shard/execution.rs b/core/server/src/shard/execution.rs index 26c73cfc2..14664aa9b 100644 --- a/core/server/src/shard/execution.rs +++ b/core/server/src/shard/execution.rs @@ -15,7 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::binary::dispatch::{wire_id_to_identifier, wire_permissions_to_permissions}; +use crate::binary::dispatch::wire_id_to_identifier; use crate::streaming::users::user::User; use crate::streaming::utils::crypto; use crate::{ @@ -39,6 +39,7 @@ use crate::{ use iggy_binary_protocol::requests::{ consumer_groups::*, partitions::*, personal_access_tokens::*, streams::*, topics::*, users::*, }; +use iggy_common::wire_conversions::wire_permissions_to_permissions; use iggy_common::{ CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize, PersonalAccessToken, UserStatus, diff --git a/core/server/src/state/command.rs b/core/server/src/state/command.rs index 4d544e6e8..d2a2f890c 100644 --- a/core/server/src/state/command.rs +++ b/core/server/src/state/command.rs @@ -163,6 +163,13 @@ impl WireDecode for EntryCommand { } let code = u32::from_le_bytes(buf[0..4].try_into().unwrap()); let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; + if buf.len() < 8 + length { + return Err(WireError::UnexpectedEof { + offset: 8, + need: length, + have: buf.len() - 8, + }); + } let payload = &buf[8..8 + length]; let consumed = 8 + length; let cmd = match code { diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs index 7d885b29d..86af7917c 100644 --- a/core/server/src/state/models.rs +++ b/core/server/src/state/models.rs @@ -106,7 +106,6 @@ impl Display for CreatePersonalAccessTokenWithHash { } } -// Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | inner_bytes // Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | inner_bytes impl WireEncode for CreateStreamWithId { diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs index 784f6df7a..678e5e2dc 100644 --- a/core/server/src/state/system.rs +++ b/core/server/src/state/system.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::binary::dispatch::{domain_permissions_to_wire, wire_permissions_to_permissions}; use crate::bootstrap::create_root_user; use crate::state::file::FileState; use crate::state::models::CreateUserWithId; @@ -32,6 +31,7 @@ use iggy_common::IggyTimestamp; use iggy_common::MaxTopicSize; use iggy_common::PersonalAccessToken; use iggy_common::defaults::DEFAULT_ROOT_USER_ID; +use iggy_common::wire_conversions::{permissions_to_wire, wire_permissions_to_permissions}; use iggy_common::{Permissions, UserStatus}; use std::collections::BTreeMap; use std::fmt::Display; @@ -153,7 +153,7 @@ impl SystemState { .expect("root username must be valid"), password: root.password.clone(), status: root.status.as_code(), - permissions: root.permissions.as_ref().map(domain_permissions_to_wire), + permissions: root.permissions.as_ref().map(permissions_to_wire), }; state .apply(0, &EntryCommand::CreateUser(CreateUserWithId {
