This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-7-http
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 71a23e170c94069368dce15f4ea6e71d7a3b9ca9
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Mar 27 13:50:55 2026 +0100

    fix(server,sdk): harden wire type migration from review findings
    
    The wire type refactor (prior commits) left several issues
    identified during expert review:
    
    - EntryCommand::decode panicked on corrupted WAL entries
      with inflated length fields instead of returning an error
    - partitioning_to_wire panicked via unwrap on malformed
      Partitioning structs reachable through public fields
    - Permission/identifier conversion code was duplicated
      between dispatch.rs and wire_conversions.rs (maintenance
      hazard for wire format divergence)
    - SDK binary impls erased all WireError context into a
      generic InvalidFormat, making decode failures opaque
    - MAX_NAME_LENGTH and MAX_PARTITIONS_COUNT were defined
      3 times across HTTP submodules
    
    Consolidates wire conversions into iggy_common, adds
    decode_response helper with tracing, and moves shared
    constants to lib.rs.
---
 core/common/src/http/consumer_groups/mod.rs        |   2 +-
 core/common/src/http/partitions/mod.rs             |   2 +-
 core/common/src/http/streams/mod.rs                |   2 +-
 core/common/src/http/topics/mod.rs                 |   3 +-
 core/common/src/http/users/login_user.rs           |  26 +++-
 core/common/src/lib.rs                             |   3 +
 core/common/src/traits/binary_impls/cluster.rs     |   5 +-
 .../src/traits/binary_impls/consumer_groups.rs     |  11 +-
 .../src/traits/binary_impls/consumer_offsets.rs    |   5 +-
 core/common/src/traits/binary_impls/messages.rs    |   2 +-
 core/common/src/traits/binary_impls/mod.rs         |  11 ++
 .../traits/binary_impls/personal_access_tokens.rs  |  11 +-
 core/common/src/traits/binary_impls/streams.rs     |  11 +-
 core/common/src/traits/binary_impls/system.rs      |  14 +--
 core/common/src/traits/binary_impls/topics.rs      |  11 +-
 core/common/src/traits/binary_impls/users.rs       |  14 +--
 core/common/src/wire_conversions.rs                |  75 ++++++++++-
 core/server/src/binary/dispatch.rs                 | 137 +--------------------
 .../binary/handlers/users/create_user_handler.rs   |   5 +-
 .../src/binary/handlers/users/get_user_handler.rs  |   5 +-
 core/server/src/http/consumer_groups.rs            |  12 +-
 core/server/src/http/partitions.rs                 |  10 +-
 core/server/src/http/segments.rs                   |   6 +-
 core/server/src/http/streams.rs                    |   8 +-
 core/server/src/http/topics.rs                     |  16 +--
 core/server/src/http/users.rs                      |  14 +--
 core/server/src/shard/execution.rs                 |   3 +-
 core/server/src/state/command.rs                   |   7 ++
 core/server/src/state/models.rs                    |   1 -
 core/server/src/state/system.rs                    |   4 +-
 30 files changed, 194 insertions(+), 242 deletions(-)

diff --git a/core/common/src/http/consumer_groups/mod.rs 
b/core/common/src/http/consumer_groups/mod.rs
index c51892da9..6805f5b61 100644
--- a/core/common/src/http/consumer_groups/mod.rs
+++ b/core/common/src/http/consumer_groups/mod.rs
@@ -19,4 +19,4 @@
 pub mod create_consumer_group;
 pub mod delete_consumer_group;
 
-const MAX_NAME_LENGTH: usize = 255;
+use crate::MAX_NAME_LENGTH;
diff --git a/core/common/src/http/partitions/mod.rs 
b/core/common/src/http/partitions/mod.rs
index e56644529..adfe76df0 100644
--- a/core/common/src/http/partitions/mod.rs
+++ b/core/common/src/http/partitions/mod.rs
@@ -19,4 +19,4 @@
 pub mod create_partitions;
 pub mod delete_partitions;
 
-const MAX_PARTITIONS_COUNT: u32 = 1000;
+use crate::MAX_PARTITIONS_COUNT;
diff --git a/core/common/src/http/streams/mod.rs 
b/core/common/src/http/streams/mod.rs
index 429b4ed2d..f0da8a2b4 100644
--- a/core/common/src/http/streams/mod.rs
+++ b/core/common/src/http/streams/mod.rs
@@ -21,4 +21,4 @@ pub mod delete_stream;
 pub mod purge_stream;
 pub mod update_stream;
 
-const MAX_NAME_LENGTH: usize = 255;
+use crate::MAX_NAME_LENGTH;
diff --git a/core/common/src/http/topics/mod.rs 
b/core/common/src/http/topics/mod.rs
index c1122e723..07a6e250b 100644
--- a/core/common/src/http/topics/mod.rs
+++ b/core/common/src/http/topics/mod.rs
@@ -21,5 +21,4 @@ pub mod delete_topic;
 pub mod purge_topic;
 pub mod update_topic;
 
-const MAX_NAME_LENGTH: usize = 255;
-const MAX_PARTITIONS_COUNT: u32 = 1000;
+use crate::{MAX_NAME_LENGTH, MAX_PARTITIONS_COUNT};
diff --git a/core/common/src/http/users/login_user.rs 
b/core/common/src/http/users/login_user.rs
index ae0757407..9130f9151 100644
--- a/core/common/src/http/users/login_user.rs
+++ b/core/common/src/http/users/login_user.rs
@@ -16,7 +16,10 @@
  * under the License.
  */
 
-use secrecy::SecretString;
+use super::defaults::*;
+use crate::Validatable;
+use crate::error::IggyError;
+use secrecy::{ExposeSecret, SecretString};
 use serde::{Deserialize, Serialize};
 
 #[derive(Debug, Serialize, Deserialize)]
@@ -29,3 +32,24 @@ pub struct LoginUser {
     #[serde(default)]
     pub context: Option<String>,
 }
+
+impl Validatable<IggyError> for LoginUser {
+    fn validate(&self) -> Result<(), IggyError> {
+        if self.username.is_empty()
+            || self.username.len() > MAX_USERNAME_LENGTH
+            || self.username.len() < MIN_USERNAME_LENGTH
+        {
+            return Err(IggyError::InvalidUsername);
+        }
+
+        let password = self.password.expose_secret();
+        if password.is_empty()
+            || password.len() > MAX_PASSWORD_LENGTH
+            || password.len() < MIN_PASSWORD_LENGTH
+        {
+            return Err(IggyError::InvalidPassword);
+        }
+
+        Ok(())
+    }
+}
diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs
index 67ff2dd2d..3e9b20bdf 100644
--- a/core/common/src/lib.rs
+++ b/core/common/src/lib.rs
@@ -134,3 +134,6 @@ pub use utils::text;
 pub use utils::timestamp::*;
 pub use utils::topic_size::MaxTopicSize;
 pub use utils::versioning::SemanticVersion;
+
+pub const MAX_NAME_LENGTH: usize = 255;
+pub const MAX_PARTITIONS_COUNT: u32 = 1000;
diff --git a/core/common/src/traits/binary_impls/cluster.rs 
b/core/common/src/traits/binary_impls/cluster.rs
index 779b6a427..c51f17906 100644
--- a/core/common/src/traits/binary_impls/cluster.rs
+++ b/core/common/src/traits/binary_impls/cluster.rs
@@ -19,7 +19,7 @@
 use crate::traits::binary_auth::fail_if_not_authenticated;
 use crate::wire_conversions::cluster_metadata_from_wire;
 use crate::{BinaryClient, ClusterClient, ClusterMetadata, IggyError};
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::GET_CLUSTER_METADATA_CODE;
 use iggy_binary_protocol::requests::system::GetClusterMetadataRequest;
 use 
iggy_binary_protocol::responses::system::get_cluster_metadata::ClusterMetadataResponse;
@@ -34,8 +34,7 @@ impl<B: BinaryClient> ClusterClient for B {
                 GetClusterMetadataRequest.to_bytes(),
             )
             .await?;
-        let wire_resp = ClusterMetadataResponse::decode_from(&response)
-            .map_err(|_| IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<ClusterMetadataResponse>(&response)?;
         cluster_metadata_from_wire(wire_resp)
     }
 }
diff --git a/core/common/src/traits/binary_impls/consumer_groups.rs 
b/core/common/src/traits/binary_impls/consumer_groups.rs
index 23d78cac3..25e21ab75 100644
--- a/core/common/src/traits/binary_impls/consumer_groups.rs
+++ b/core/common/src/traits/binary_impls/consumer_groups.rs
@@ -22,7 +22,7 @@ use crate::{
     BinaryClient, ConsumerGroup, ConsumerGroupClient, ConsumerGroupDetails, 
Identifier, IggyError,
 };
 use iggy_binary_protocol::WireName;
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CREATE_CONSUMER_GROUP_CODE, DELETE_CONSUMER_GROUP_CODE, 
GET_CONSUMER_GROUP_CODE,
     GET_CONSUMER_GROUPS_CODE, JOIN_CONSUMER_GROUP_CODE, 
LEAVE_CONSUMER_GROUP_CODE,
@@ -60,8 +60,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
         if response.is_empty() {
             return Ok(None);
         }
-        let wire_resp = ConsumerGroupDetailsResponse::decode_from(&response)
-            .map_err(|_| IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<ConsumerGroupDetailsResponse>(&response)?;
         Ok(Some(ConsumerGroupDetails::from(wire_resp)))
     }
 
@@ -86,8 +85,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
         if response.is_empty() {
             return Ok(Vec::new());
         }
-        let wire_resp = GetConsumerGroupsResponse::decode_from(&response)
-            .map_err(|_| IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetConsumerGroupsResponse>(&response)?;
         Ok(consumer_groups_from_wire(wire_resp))
     }
 
@@ -112,8 +110,7 @@ impl<B: BinaryClient> ConsumerGroupClient for B {
                 .to_bytes(),
             )
             .await?;
-        let wire_resp = ConsumerGroupDetailsResponse::decode_from(&response)
-            .map_err(|_| IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<ConsumerGroupDetailsResponse>(&response)?;
         Ok(ConsumerGroupDetails::from(wire_resp))
     }
 
diff --git a/core/common/src/traits/binary_impls/consumer_offsets.rs 
b/core/common/src/traits/binary_impls/consumer_offsets.rs
index 4b0d9720e..af665a513 100644
--- a/core/common/src/traits/binary_impls/consumer_offsets.rs
+++ b/core/common/src/traits/binary_impls/consumer_offsets.rs
@@ -21,7 +21,7 @@ use crate::wire_conversions::{consumer_to_wire, 
identifier_to_wire};
 use crate::{
     BinaryClient, Consumer, ConsumerOffsetClient, ConsumerOffsetInfo, 
Identifier, IggyError,
 };
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     DELETE_CONSUMER_OFFSET_CODE, GET_CONSUMER_OFFSET_CODE, 
STORE_CONSUMER_OFFSET_CODE,
 };
@@ -85,8 +85,7 @@ impl<B: BinaryClient> ConsumerOffsetClient for B {
         if response.is_empty() {
             return Ok(None);
         }
-        let wire_resp =
-            ConsumerOffsetResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<ConsumerOffsetResponse>(&response)?;
         Ok(Some(ConsumerOffsetInfo::from(wire_resp)))
     }
 
diff --git a/core/common/src/traits/binary_impls/messages.rs 
b/core/common/src/traits/binary_impls/messages.rs
index d77d07b61..783e5b693 100644
--- a/core/common/src/traits/binary_impls/messages.rs
+++ b/core/common/src/traits/binary_impls/messages.rs
@@ -71,7 +71,7 @@ impl<B: BinaryClient> MessageClient for B {
         fail_if_not_authenticated(self).await?;
         let wire_stream_id = identifier_to_wire(stream_id)?;
         let wire_topic_id = identifier_to_wire(topic_id)?;
-        let wire_partitioning = partitioning_to_wire(partitioning);
+        let wire_partitioning = partitioning_to_wire(partitioning)?;
         let raw_messages: Vec<RawMessage<'_>> = messages
             .iter()
             .map(|m| RawMessage {
diff --git a/core/common/src/traits/binary_impls/mod.rs 
b/core/common/src/traits/binary_impls/mod.rs
index 3d039130d..cbdd3edb9 100644
--- a/core/common/src/traits/binary_impls/mod.rs
+++ b/core/common/src/traits/binary_impls/mod.rs
@@ -26,3 +26,14 @@ mod streams;
 mod system;
 mod topics;
 mod users;
+
+use crate::IggyError;
+use iggy_binary_protocol::WireDecode;
+
+/// Decode a wire response, logging the error details before converting to 
`IggyError`.
+pub(crate) fn decode_response<T: WireDecode>(response: &[u8]) -> Result<T, 
IggyError> {
+    T::decode_from(response).map_err(|e| {
+        tracing::warn!("failed to decode {}: {e}", std::any::type_name::<T>());
+        IggyError::InvalidFormat
+    })
+}
diff --git a/core/common/src/traits/binary_impls/personal_access_tokens.rs 
b/core/common/src/traits/binary_impls/personal_access_tokens.rs
index 202c60a72..f85f34d36 100644
--- a/core/common/src/traits/binary_impls/personal_access_tokens.rs
+++ b/core/common/src/traits/binary_impls/personal_access_tokens.rs
@@ -24,7 +24,7 @@ use crate::{
     PersonalAccessTokenExpiry, PersonalAccessTokenInfo, RawPersonalAccessToken,
 };
 use iggy_binary_protocol::WireName;
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CREATE_PERSONAL_ACCESS_TOKEN_CODE, DELETE_PERSONAL_ACCESS_TOKEN_CODE,
     GET_PERSONAL_ACCESS_TOKENS_CODE, LOGIN_WITH_PERSONAL_ACCESS_TOKEN_CODE,
@@ -50,8 +50,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
         if response.is_empty() {
             return Ok(Vec::new());
         }
-        let wire_resp = GetPersonalAccessTokensResponse::decode_from(&response)
-            .map_err(|_| IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetPersonalAccessTokensResponse>(&response)?;
         Ok(personal_access_tokens_from_wire(wire_resp))
     }
 
@@ -72,8 +71,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
                 .to_bytes(),
             )
             .await?;
-        let wire_resp = RawPersonalAccessTokenResponse::decode_from(&response)
-            .map_err(|_| IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<RawPersonalAccessTokenResponse>(&response)?;
         Ok(RawPersonalAccessToken::from(wire_resp))
     }
 
@@ -101,8 +99,7 @@ impl<B: BinaryClient> PersonalAccessTokenClient for B {
             .await?;
         self.set_state(ClientState::Authenticated).await;
         self.publish_event(DiagnosticEvent::SignedIn).await;
-        let wire_resp =
-            IdentityResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = super::decode_response::<IdentityResponse>(&response)?;
         Ok(IdentityInfo::from(wire_resp))
     }
 }
diff --git a/core/common/src/traits/binary_impls/streams.rs 
b/core/common/src/traits/binary_impls/streams.rs
index f170e97b1..b64c99261 100644
--- a/core/common/src/traits/binary_impls/streams.rs
+++ b/core/common/src/traits/binary_impls/streams.rs
@@ -20,7 +20,7 @@ use crate::traits::binary_auth::fail_if_not_authenticated;
 use crate::wire_conversions::{identifier_to_wire, streams_from_wire};
 use crate::{BinaryClient, Identifier, IggyError, Stream, StreamClient, 
StreamDetails};
 use iggy_binary_protocol::WireName;
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CREATE_STREAM_CODE, DELETE_STREAM_CODE, GET_STREAM_CODE, GET_STREAMS_CODE, 
PURGE_STREAM_CODE,
     UPDATE_STREAM_CODE,
@@ -46,8 +46,7 @@ impl<B: BinaryClient> StreamClient for B {
         if response.is_empty() {
             return Ok(None);
         }
-        let wire_resp =
-            GetStreamResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetStreamResponse>(&response)?;
         Ok(Some(StreamDetails::from(wire_resp)))
     }
 
@@ -59,8 +58,7 @@ impl<B: BinaryClient> StreamClient for B {
         if response.is_empty() {
             return Ok(Vec::new());
         }
-        let wire_resp =
-            GetStreamsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetStreamsResponse>(&response)?;
         Ok(streams_from_wire(wire_resp))
     }
 
@@ -73,8 +71,7 @@ impl<B: BinaryClient> StreamClient for B {
                 CreateStreamRequest { name: wire_name }.to_bytes(),
             )
             .await?;
-        let wire_resp =
-            GetStreamResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetStreamResponse>(&response)?;
         Ok(StreamDetails::from(wire_resp))
     }
 
diff --git a/core/common/src/traits/binary_impls/system.rs 
b/core/common/src/traits/binary_impls/system.rs
index ebb88bf17..365f2b0b8 100644
--- a/core/common/src/traits/binary_impls/system.rs
+++ b/core/common/src/traits/binary_impls/system.rs
@@ -22,7 +22,7 @@ use crate::{
     BinaryClient, ClientInfo, ClientInfoDetails, IggyDuration, IggyError, 
Snapshot,
     SnapshotCompression, Stats, SystemClient, SystemSnapshotType,
 };
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     GET_CLIENT_CODE, GET_CLIENTS_CODE, GET_ME_CODE, GET_SNAPSHOT_FILE_CODE, 
GET_STATS_CODE,
     PING_CODE,
@@ -41,8 +41,7 @@ impl<B: BinaryClient> SystemClient for B {
         let response = self
             .send_raw_with_response(GET_STATS_CODE, GetStatsRequest.to_bytes())
             .await?;
-        let wire_resp =
-            StatsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = super::decode_response::<StatsResponse>(&response)?;
         Ok(Stats::from(wire_resp))
     }
 
@@ -51,8 +50,7 @@ impl<B: BinaryClient> SystemClient for B {
         let response = self
             .send_raw_with_response(GET_ME_CODE, GetMeRequest.to_bytes())
             .await?;
-        let wire_resp =
-            ClientDetailsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<ClientDetailsResponse>(&response)?;
         Ok(ClientInfoDetails::from(wire_resp))
     }
 
@@ -64,8 +62,7 @@ impl<B: BinaryClient> SystemClient for B {
         if response.is_empty() {
             return Ok(None);
         }
-        let wire_resp =
-            ClientDetailsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<ClientDetailsResponse>(&response)?;
         Ok(Some(ClientInfoDetails::from(wire_resp)))
     }
 
@@ -77,8 +74,7 @@ impl<B: BinaryClient> SystemClient for B {
         if response.is_empty() {
             return Ok(Vec::new());
         }
-        let wire_resp =
-            GetClientsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetClientsResponse>(&response)?;
         Ok(clients_from_wire(wire_resp))
     }
 
diff --git a/core/common/src/traits/binary_impls/topics.rs 
b/core/common/src/traits/binary_impls/topics.rs
index 8e6547ddf..9304ed54c 100644
--- a/core/common/src/traits/binary_impls/topics.rs
+++ b/core/common/src/traits/binary_impls/topics.rs
@@ -23,7 +23,7 @@ use crate::{
     TopicClient, TopicDetails,
 };
 use iggy_binary_protocol::WireName;
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CREATE_TOPIC_CODE, DELETE_TOPIC_CODE, GET_TOPIC_CODE, GET_TOPICS_CODE, 
PURGE_TOPIC_CODE,
     UPDATE_TOPIC_CODE,
@@ -58,8 +58,7 @@ impl<B: BinaryClient> TopicClient for B {
         if response.is_empty() {
             return Ok(None);
         }
-        let wire_resp =
-            GetTopicResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = super::decode_response::<GetTopicResponse>(&response)?;
         Ok(Some(TopicDetails::from(wire_resp)))
     }
 
@@ -78,8 +77,7 @@ impl<B: BinaryClient> TopicClient for B {
         if response.is_empty() {
             return Ok(Vec::new());
         }
-        let wire_resp =
-            GetTopicsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<GetTopicsResponse>(&response)?;
         Ok(topics_from_wire(wire_resp))
     }
 
@@ -111,8 +109,7 @@ impl<B: BinaryClient> TopicClient for B {
                 .to_bytes(),
             )
             .await?;
-        let wire_resp =
-            GetTopicResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = super::decode_response::<GetTopicResponse>(&response)?;
         Ok(TopicDetails::from(wire_resp))
     }
 
diff --git a/core/common/src/traits/binary_impls/users.rs 
b/core/common/src/traits/binary_impls/users.rs
index bf2f06fd6..24a877a4c 100644
--- a/core/common/src/traits/binary_impls/users.rs
+++ b/core/common/src/traits/binary_impls/users.rs
@@ -23,7 +23,7 @@ use crate::{
     UserClient, UserInfo, UserInfoDetails, UserStatus,
 };
 use iggy_binary_protocol::WireName;
-use iggy_binary_protocol::codec::{WireDecode, WireEncode};
+use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::codes::{
     CHANGE_PASSWORD_CODE, CREATE_USER_CODE, DELETE_USER_CODE, GET_USER_CODE, 
GET_USERS_CODE,
     LOGIN_USER_CODE, LOGOUT_USER_CODE, UPDATE_PERMISSIONS_CODE, 
UPDATE_USER_CODE,
@@ -49,8 +49,7 @@ impl<B: BinaryClient> UserClient for B {
         if response.is_empty() {
             return Ok(None);
         }
-        let wire_resp =
-            UserDetailsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<UserDetailsResponse>(&response)?;
         UserInfoDetails::try_from(wire_resp).map(Some)
     }
 
@@ -62,8 +61,7 @@ impl<B: BinaryClient> UserClient for B {
         if response.is_empty() {
             return Ok(Vec::new());
         }
-        let wire_resp =
-            GetUsersResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = super::decode_response::<GetUsersResponse>(&response)?;
         users_from_wire(wire_resp)
     }
 
@@ -89,8 +87,7 @@ impl<B: BinaryClient> UserClient for B {
                 .to_bytes(),
             )
             .await?;
-        let wire_resp =
-            UserDetailsResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = 
super::decode_response::<UserDetailsResponse>(&response)?;
         UserInfoDetails::try_from(wire_resp)
     }
 
@@ -187,8 +184,7 @@ impl<B: BinaryClient> UserClient for B {
             .await?;
         self.set_state(ClientState::Authenticated).await;
         self.publish_event(DiagnosticEvent::SignedIn).await;
-        let wire_resp =
-            IdentityResponse::decode_from(&response).map_err(|_| 
IggyError::InvalidFormat)?;
+        let wire_resp = super::decode_response::<IdentityResponse>(&response)?;
         Ok(IdentityInfo::from(wire_resp))
     }
 
diff --git a/core/common/src/wire_conversions.rs 
b/core/common/src/wire_conversions.rs
index c5a48916c..a1bb47556 100644
--- a/core/common/src/wire_conversions.rs
+++ b/core/common/src/wire_conversions.rs
@@ -64,6 +64,9 @@ use 
iggy_binary_protocol::responses::users::user_response::UserResponse;
 use iggy_binary_protocol::responses::users::{GetUsersResponse, 
UserDetailsResponse};
 use std::collections::{BTreeMap, HashMap};
 
+/// Sentinel value in the wire protocol indicating no authenticated user.
+const WIRE_NO_USER_ID: u32 = u32::MAX;
+
 // ---------------------------------------------------------------------------
 // Streams
 // ---------------------------------------------------------------------------
@@ -240,7 +243,7 @@ impl From<ConsumerGroupInfoResponse> for ConsumerGroupInfo {
 impl From<ClientResponse> for ClientInfo {
     fn from(w: ClientResponse) -> Self {
         let user_id = match w.user_id {
-            u32::MAX => None,
+            WIRE_NO_USER_ID => None,
             id => Some(id),
         };
         let transport = match w.transport {
@@ -535,16 +538,20 @@ pub fn polling_strategy_to_wire(
 /// Convert a domain `Partitioning` to `WirePartitioning`.
 pub fn partitioning_to_wire(
     partitioning: &crate::Partitioning,
-) -> iggy_binary_protocol::primitives::partitioning::WirePartitioning {
+) -> Result<iggy_binary_protocol::primitives::partitioning::WirePartitioning, 
IggyError> {
     use iggy_binary_protocol::primitives::partitioning::WirePartitioning;
     match partitioning.kind {
-        crate::PartitioningKind::Balanced => WirePartitioning::Balanced,
+        crate::PartitioningKind::Balanced => Ok(WirePartitioning::Balanced),
         crate::PartitioningKind::PartitionId => {
-            let id = 
u32::from_le_bytes(partitioning.value[..4].try_into().unwrap());
-            WirePartitioning::PartitionId(id)
+            let bytes: [u8; 4] = partitioning
+                .value
+                .get(..4)
+                .and_then(|s| s.try_into().ok())
+                .ok_or(IggyError::InvalidCommand)?;
+            Ok(WirePartitioning::PartitionId(u32::from_le_bytes(bytes)))
         }
         crate::PartitioningKind::MessagesKey => {
-            WirePartitioning::MessagesKey(partitioning.value.clone())
+            Ok(WirePartitioning::MessagesKey(partitioning.value.clone()))
         }
     }
 }
@@ -626,6 +633,62 @@ impl From<WirePermissions> for Permissions {
     }
 }
 
+/// Convert `&WirePermissions` to domain `Permissions` without consuming the 
input.
+pub fn wire_permissions_to_permissions(wp: &WirePermissions) -> Permissions {
+    let streams = if wp.streams.is_empty() {
+        None
+    } else {
+        let mut map = BTreeMap::new();
+        for ws in &wp.streams {
+            let topics = if ws.topics.is_empty() {
+                None
+            } else {
+                let mut tmap = BTreeMap::new();
+                for wt in &ws.topics {
+                    tmap.insert(
+                        wt.topic_id as usize,
+                        TopicPermissions {
+                            manage_topic: wt.manage_topic,
+                            read_topic: wt.read_topic,
+                            poll_messages: wt.poll_messages,
+                            send_messages: wt.send_messages,
+                        },
+                    );
+                }
+                Some(tmap)
+            };
+            map.insert(
+                ws.stream_id as usize,
+                StreamPermissions {
+                    manage_stream: ws.manage_stream,
+                    read_stream: ws.read_stream,
+                    manage_topics: ws.manage_topics,
+                    read_topics: ws.read_topics,
+                    poll_messages: ws.poll_messages,
+                    send_messages: ws.send_messages,
+                    topics,
+                },
+            );
+        }
+        Some(map)
+    };
+    Permissions {
+        global: GlobalPermissions {
+            manage_servers: wp.global.manage_servers,
+            read_servers: wp.global.read_servers,
+            manage_users: wp.global.manage_users,
+            read_users: wp.global.read_users,
+            manage_streams: wp.global.manage_streams,
+            read_streams: wp.global.read_streams,
+            manage_topics: wp.global.manage_topics,
+            read_topics: wp.global.read_topics,
+            poll_messages: wp.global.poll_messages,
+            send_messages: wp.global.send_messages,
+        },
+        streams,
+    }
+}
+
 // ---------------------------------------------------------------------------
 // Permissions (domain -> wire)
 // ---------------------------------------------------------------------------
diff --git a/core/server/src/binary/dispatch.rs 
b/core/server/src/binary/dispatch.rs
index b5e485783..9138483e1 100644
--- a/core/server/src/binary/dispatch.rs
+++ b/core/server/src/binary/dispatch.rs
@@ -23,9 +23,6 @@ use bytes::BytesMut;
 use iggy_binary_protocol::RequestFrame;
 use iggy_binary_protocol::codec::WireDecode;
 use iggy_binary_protocol::codes::*;
-use iggy_binary_protocol::primitives::permissions::{
-    WireGlobalPermissions, WirePermissions, WireStreamPermissions, 
WireTopicPermissions,
-};
 use iggy_binary_protocol::requests::consumer_groups::*;
 use iggy_binary_protocol::requests::consumer_offsets::*;
 use iggy_binary_protocol::requests::messages::*;
@@ -37,10 +34,8 @@ use iggy_binary_protocol::requests::system::*;
 use iggy_binary_protocol::requests::topics::*;
 use iggy_binary_protocol::requests::users::*;
 use iggy_common::{
-    Consumer, ConsumerKind, GlobalPermissions, Identifier, IggyError, 
Permissions, PollingKind,
-    PollingStrategy, SenderKind, StreamPermissions, TopicPermissions,
+    Consumer, ConsumerKind, Identifier, IggyError, PollingKind, 
PollingStrategy, SenderKind,
 };
-use std::collections::BTreeMap;
 use std::rc::Rc;
 use tracing::{error, warn};
 
@@ -102,18 +97,6 @@ pub fn wire_id_to_identifier(
     }
 }
 
-/// Convert a domain `Identifier` to `WireIdentifier`.
-pub fn identifier_to_wire_id(
-    id: &Identifier,
-) -> Result<iggy_binary_protocol::WireIdentifier, IggyError> {
-    if let Ok(value) = id.get_u32_value() {
-        Ok(iggy_binary_protocol::WireIdentifier::numeric(value))
-    } else {
-        let name = id.get_string_value()?;
-        iggy_binary_protocol::WireIdentifier::named(name).map_err(|_| 
IggyError::InvalidIdentifier)
-    }
-}
-
 /// Convert a `WireConsumer` to the domain `Consumer`.
 pub fn wire_consumer_to_consumer(
     wire: &iggy_binary_protocol::WireConsumer,
@@ -133,124 +116,6 @@ pub fn wire_polling_to_strategy(
     })
 }
 
-fn wire_topic_to_domain(wt: &WireTopicPermissions) -> TopicPermissions {
-    TopicPermissions {
-        manage_topic: wt.manage_topic,
-        read_topic: wt.read_topic,
-        poll_messages: wt.poll_messages,
-        send_messages: wt.send_messages,
-    }
-}
-
-fn wire_stream_to_domain(ws: &WireStreamPermissions) -> StreamPermissions {
-    let topics = if ws.topics.is_empty() {
-        None
-    } else {
-        let mut map = BTreeMap::new();
-        for wt in &ws.topics {
-            map.insert(wt.topic_id as usize, wire_topic_to_domain(wt));
-        }
-        Some(map)
-    };
-    StreamPermissions {
-        manage_stream: ws.manage_stream,
-        read_stream: ws.read_stream,
-        manage_topics: ws.manage_topics,
-        read_topics: ws.read_topics,
-        poll_messages: ws.poll_messages,
-        send_messages: ws.send_messages,
-        topics,
-    }
-}
-
-/// Convert `WirePermissions` to the domain `Permissions`.
-pub fn wire_permissions_to_permissions(wp: &WirePermissions) -> Permissions {
-    let streams = if wp.streams.is_empty() {
-        None
-    } else {
-        let mut map = BTreeMap::new();
-        for ws in &wp.streams {
-            map.insert(ws.stream_id as usize, wire_stream_to_domain(ws));
-        }
-        Some(map)
-    };
-    Permissions {
-        global: GlobalPermissions {
-            manage_servers: wp.global.manage_servers,
-            read_servers: wp.global.read_servers,
-            manage_users: wp.global.manage_users,
-            read_users: wp.global.read_users,
-            manage_streams: wp.global.manage_streams,
-            read_streams: wp.global.read_streams,
-            manage_topics: wp.global.manage_topics,
-            read_topics: wp.global.read_topics,
-            poll_messages: wp.global.poll_messages,
-            send_messages: wp.global.send_messages,
-        },
-        streams,
-    }
-}
-
-fn domain_topic_to_wire(topic_id: usize, tp: &TopicPermissions) -> 
WireTopicPermissions {
-    WireTopicPermissions {
-        topic_id: topic_id as u32,
-        manage_topic: tp.manage_topic,
-        read_topic: tp.read_topic,
-        poll_messages: tp.poll_messages,
-        send_messages: tp.send_messages,
-    }
-}
-
-fn domain_stream_to_wire(stream_id: usize, sp: &StreamPermissions) -> 
WireStreamPermissions {
-    let topics: Vec<WireTopicPermissions> = sp
-        .topics
-        .as_ref()
-        .map(|map| {
-            map.iter()
-                .map(|(&tid, tp)| domain_topic_to_wire(tid, tp))
-                .collect()
-        })
-        .unwrap_or_default();
-    WireStreamPermissions {
-        stream_id: stream_id as u32,
-        manage_stream: sp.manage_stream,
-        read_stream: sp.read_stream,
-        manage_topics: sp.manage_topics,
-        read_topics: sp.read_topics,
-        poll_messages: sp.poll_messages,
-        send_messages: sp.send_messages,
-        topics,
-    }
-}
-
-/// Convert domain `Permissions` to `WirePermissions`.
-pub fn domain_permissions_to_wire(perms: &Permissions) -> WirePermissions {
-    let streams: Vec<WireStreamPermissions> = perms
-        .streams
-        .as_ref()
-        .map(|map| {
-            map.iter()
-                .map(|(&sid, sp)| domain_stream_to_wire(sid, sp))
-                .collect()
-        })
-        .unwrap_or_default();
-    WirePermissions {
-        global: WireGlobalPermissions {
-            manage_servers: perms.global.manage_servers,
-            read_servers: perms.global.read_servers,
-            manage_users: perms.global.manage_users,
-            read_users: perms.global.read_users,
-            manage_streams: perms.global.manage_streams,
-            read_streams: perms.global.read_streams,
-            manage_topics: perms.global.manage_topics,
-            read_topics: perms.global.read_topics,
-            poll_messages: perms.global.poll_messages,
-            send_messages: perms.global.send_messages,
-        },
-        streams,
-    }
-}
-
 /// Maximum payload size for control-plane commands (non-SendMessages).
 /// Prevents OOM from malicious clients sending `length = u32::MAX`.
 /// SendMessages has its own size validation via `total_payload_size` checks.
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs 
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 909e69410..04aaf8a2e 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::binary::dispatch::{HandlerResult, domain_permissions_to_wire};
+use crate::binary::dispatch::HandlerResult;
 use crate::shard::IggyShard;
 use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
@@ -25,6 +25,7 @@ use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::requests::users::CreateUserRequest;
 use iggy_binary_protocol::responses::users::{UserDetailsResponse, 
UserResponse};
+use iggy_common::wire_conversions::permissions_to_wire;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
@@ -58,7 +59,7 @@ pub async fn handle_create_user(
                     username: WireName::new(&user.username)
                         .map_err(|_| IggyError::InvalidCommand)?,
                 },
-                permissions: 
user.permissions.as_ref().map(domain_permissions_to_wire),
+                permissions: 
user.permissions.as_ref().map(permissions_to_wire),
             };
             sender.send_ok_response(&response.to_bytes()).await?;
         }
diff --git a/core/server/src/binary/handlers/users/get_user_handler.rs 
b/core/server/src/binary/handlers/users/get_user_handler.rs
index 66251e46c..f7415c8f5 100644
--- a/core/server/src/binary/handlers/users/get_user_handler.rs
+++ b/core/server/src/binary/handlers/users/get_user_handler.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use crate::binary::dispatch::{HandlerResult, domain_permissions_to_wire, 
wire_id_to_identifier};
+use crate::binary::dispatch::{HandlerResult, wire_id_to_identifier};
 use crate::shard::IggyShard;
 use crate::streaming::session::Session;
 use iggy_binary_protocol::WireName;
@@ -25,6 +25,7 @@ use iggy_binary_protocol::requests::users::GetUserRequest;
 use iggy_binary_protocol::responses::users::{UserDetailsResponse, 
UserResponse};
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
+use iggy_common::wire_conversions::permissions_to_wire;
 use std::rc::Rc;
 use tracing::debug;
 
@@ -55,7 +56,7 @@ pub async fn handle_get_user(
             username: WireName::new(user.username.as_ref())
                 .map_err(|_| IggyError::InvalidCommand)?,
         },
-        permissions: 
user.permissions.as_deref().map(domain_permissions_to_wire),
+        permissions: user.permissions.as_deref().map(permissions_to_wire),
     };
     sender.send_ok_response(&response.to_bytes()).await?;
     Ok(HandlerResult::Finished)
diff --git a/core/server/src/http/consumer_groups.rs 
b/core/server/src/http/consumer_groups.rs
index 35ae2a03f..fe98ff56b 100644
--- a/core/server/src/http/consumer_groups.rs
+++ b/core/server/src/http/consumer_groups.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::identifier_to_wire_id;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
 use crate::http::mapper;
@@ -36,6 +35,7 @@ use iggy_binary_protocol::requests::consumer_groups::{
 use iggy_common::Identifier;
 use iggy_common::Validatable;
 use iggy_common::create_consumer_group::CreateConsumerGroup;
+use iggy_common::wire_conversions::identifier_to_wire;
 use iggy_common::{ConsumerGroup, ConsumerGroupDetails, IggyError};
 use std::sync::Arc;
 use tracing::instrument;
@@ -123,8 +123,8 @@ async fn create_consumer_group(
     let topic = shard.resolve_topic(&command.stream_id, &command.topic_id)?;
 
     let wire_command = WireCreateConsumerGroup {
-        stream_id: identifier_to_wire_id(&command.stream_id)?,
-        topic_id: identifier_to_wire_id(&command.topic_id)?,
+        stream_id: identifier_to_wire(&command.stream_id)?,
+        topic_id: identifier_to_wire(&command.topic_id)?,
         name: WireName::new(&command.name).map_err(|_| 
IggyError::InvalidConsumerGroupName)?,
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::CreateConsumerGroupRequest {
@@ -160,9 +160,9 @@ async fn delete_consumer_group(
     let group_id = Identifier::from_str_value(&group_id)?;
 
     let wire_command = WireDeleteConsumerGroup {
-        stream_id: identifier_to_wire_id(&stream_id)?,
-        topic_id: identifier_to_wire_id(&topic_id)?,
-        group_id: identifier_to_wire_id(&group_id)?,
+        stream_id: identifier_to_wire(&stream_id)?,
+        topic_id: identifier_to_wire(&topic_id)?,
+        group_id: identifier_to_wire(&group_id)?,
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::DeleteConsumerGroupRequest {
         user_id: identity.user_id,
diff --git a/core/server/src/http/partitions.rs 
b/core/server/src/http/partitions.rs
index ff696099c..b7883c55e 100644
--- a/core/server/src/http/partitions.rs
+++ b/core/server/src/http/partitions.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::identifier_to_wire_id;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
 use crate::http::shared::AppState;
@@ -34,6 +33,7 @@ use iggy_common::Identifier;
 use iggy_common::Validatable;
 use iggy_common::create_partitions::CreatePartitions;
 use iggy_common::delete_partitions::DeletePartitions;
+use iggy_common::wire_conversions::identifier_to_wire;
 use std::sync::Arc;
 use tracing::instrument;
 
@@ -59,8 +59,8 @@ async fn create_partitions(
     command.validate()?;
 
     let wire_command = WireCreatePartitions {
-        stream_id: identifier_to_wire_id(&command.stream_id)?,
-        topic_id: identifier_to_wire_id(&command.topic_id)?,
+        stream_id: identifier_to_wire(&command.stream_id)?,
+        topic_id: identifier_to_wire(&command.topic_id)?,
         partitions_count: command.partitions_count,
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::CreatePartitionsRequest {
@@ -88,8 +88,8 @@ async fn delete_partitions(
     query.validate()?;
 
     let wire_command = WireDeletePartitions {
-        stream_id: identifier_to_wire_id(&query.stream_id)?,
-        topic_id: identifier_to_wire_id(&query.topic_id)?,
+        stream_id: identifier_to_wire(&query.stream_id)?,
+        topic_id: identifier_to_wire(&query.topic_id)?,
         partitions_count: query.partitions_count,
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::DeletePartitionsRequest {
diff --git a/core/server/src/http/segments.rs b/core/server/src/http/segments.rs
index b2b8f9b52..c06f119eb 100644
--- a/core/server/src/http/segments.rs
+++ b/core/server/src/http/segments.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::identifier_to_wire_id;
 use crate::http::COMPONENT;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
@@ -33,6 +32,7 @@ use iggy_common::Identifier;
 use iggy_common::Validatable;
 use iggy_common::delete_segments::DeleteSegments;
 use iggy_common::sharding::IggyNamespace;
+use iggy_common::wire_conversions::identifier_to_wire;
 use send_wrapper::SendWrapper;
 use std::sync::Arc;
 use tracing::instrument;
@@ -99,8 +99,8 @@ async fn delete_segments(
     }
 
     let wire_command = DeleteSegmentsRequest {
-        stream_id: identifier_to_wire_id(&query.stream_id)?,
-        topic_id: identifier_to_wire_id(&query.topic_id)?,
+        stream_id: identifier_to_wire(&query.stream_id)?,
+        topic_id: identifier_to_wire(&query.topic_id)?,
         partition_id: query.partition_id,
         segments_count,
     };
diff --git a/core/server/src/http/streams.rs b/core/server/src/http/streams.rs
index 3ca89f346..101e7270b 100644
--- a/core/server/src/http/streams.rs
+++ b/core/server/src/http/streams.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::identifier_to_wire_id;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
 use crate::http::shared::AppState;
@@ -35,6 +34,7 @@ use iggy_common::Identifier;
 use iggy_common::Validatable;
 use iggy_common::create_stream::CreateStream;
 use iggy_common::update_stream::UpdateStream;
+use iggy_common::wire_conversions::identifier_to_wire;
 use iggy_common::{IggyError, Stream, StreamDetails};
 use std::sync::Arc;
 use tracing::instrument;
@@ -139,7 +139,7 @@ async fn update_stream(
     command.validate()?;
 
     let wire_command = WireUpdateStream {
-        stream_id: identifier_to_wire_id(&command.stream_id)?,
+        stream_id: identifier_to_wire(&command.stream_id)?,
         name: WireName::new(&command.name).map_err(|_| 
IggyError::InvalidStreamName)?,
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::UpdateStreamRequest {
@@ -166,7 +166,7 @@ async fn delete_stream(
     let request = 
ShardRequest::control_plane(ShardRequestPayload::DeleteStreamRequest {
         user_id: identity.user_id,
         command: WireDeleteStream {
-            stream_id: identifier_to_wire_id(&stream_id)?,
+            stream_id: identifier_to_wire(&stream_id)?,
         },
     });
 
@@ -189,7 +189,7 @@ async fn purge_stream(
     let request = 
ShardRequest::control_plane(ShardRequestPayload::PurgeStreamRequest {
         user_id: identity.user_id,
         command: WirePurgeStream {
-            stream_id: identifier_to_wire_id(&stream_id)?,
+            stream_id: identifier_to_wire(&stream_id)?,
         },
     });
 
diff --git a/core/server/src/http/topics.rs b/core/server/src/http/topics.rs
index b72fc4ad3..40961259f 100644
--- a/core/server/src/http/topics.rs
+++ b/core/server/src/http/topics.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::identifier_to_wire_id;
 use crate::http::COMPONENT;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
@@ -37,6 +36,7 @@ use iggy_common::Identifier;
 use iggy_common::Validatable;
 use iggy_common::create_topic::CreateTopic;
 use iggy_common::update_topic::UpdateTopic;
+use iggy_common::wire_conversions::identifier_to_wire;
 use iggy_common::{IggyError, Topic, TopicDetails};
 use std::sync::Arc;
 use tracing::instrument;
@@ -151,7 +151,7 @@ async fn create_topic(
         .ok_or(CustomError::ResourceNotFound)?;
 
     let wire_command = WireCreateTopic {
-        stream_id: identifier_to_wire_id(&command.stream_id)?,
+        stream_id: identifier_to_wire(&command.stream_id)?,
         partitions_count: command.partitions_count,
         compression_algorithm: command.compression_algorithm.as_code(),
         message_expiry: command.message_expiry.into(),
@@ -193,8 +193,8 @@ async fn update_topic(
     command.validate()?;
 
     let wire_command = WireUpdateTopic {
-        stream_id: identifier_to_wire_id(&command.stream_id)?,
-        topic_id: identifier_to_wire_id(&command.topic_id)?,
+        stream_id: identifier_to_wire(&command.stream_id)?,
+        topic_id: identifier_to_wire(&command.topic_id)?,
         compression_algorithm: command.compression_algorithm.as_code(),
         message_expiry: command.message_expiry.into(),
         max_topic_size: command.max_topic_size.into(),
@@ -226,8 +226,8 @@ async fn delete_topic(
     let request = 
ShardRequest::control_plane(ShardRequestPayload::DeleteTopicRequest {
         user_id: identity.user_id,
         command: WireDeleteTopic {
-            stream_id: identifier_to_wire_id(&stream_id)?,
-            topic_id: identifier_to_wire_id(&topic_id)?,
+            stream_id: identifier_to_wire(&stream_id)?,
+            topic_id: identifier_to_wire(&topic_id)?,
         },
     });
 
@@ -251,8 +251,8 @@ async fn purge_topic(
     let request = 
ShardRequest::control_plane(ShardRequestPayload::PurgeTopicRequest {
         user_id: identity.user_id,
         command: WirePurgeTopic {
-            stream_id: identifier_to_wire_id(&stream_id)?,
-            topic_id: identifier_to_wire_id(&topic_id)?,
+            stream_id: identifier_to_wire(&stream_id)?,
+            topic_id: identifier_to_wire(&topic_id)?,
         },
     });
 
diff --git a/core/server/src/http/users.rs b/core/server/src/http/users.rs
index 4dcc7f73d..71927a1e3 100644
--- a/core/server/src/http/users.rs
+++ b/core/server/src/http/users.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::{domain_permissions_to_wire, 
identifier_to_wire_id};
 use crate::http::COMPONENT;
 use crate::http::error::CustomError;
 use crate::http::jwt::json_web_token::Identity;
@@ -46,6 +45,7 @@ use iggy_common::Identifier;
 use iggy_common::IdentityInfo;
 use iggy_common::Validatable;
 use iggy_common::login_user::LoginUser;
+use iggy_common::wire_conversions::{identifier_to_wire, permissions_to_wire};
 use iggy_common::{IggyError, UserInfo, UserInfoDetails};
 use secrecy::ExposeSecret;
 use send_wrapper::SendWrapper;
@@ -124,7 +124,7 @@ async fn create_user(
         username: WireName::new(&command.username).map_err(|_| 
IggyError::InvalidUsername)?,
         password: command.password.expose_secret().to_string(),
         status: command.status.as_code(),
-        permissions: 
command.permissions.as_ref().map(domain_permissions_to_wire),
+        permissions: command.permissions.as_ref().map(permissions_to_wire),
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::CreateUserRequest {
         user_id: identity.user_id,
@@ -153,7 +153,7 @@ async fn update_user(
     command.validate()?;
 
     let wire_command = WireUpdateUser {
-        user_id: identifier_to_wire_id(&command.user_id)?,
+        user_id: identifier_to_wire(&command.user_id)?,
         username: command
             .username
             .as_deref()
@@ -186,8 +186,8 @@ async fn update_permissions(
     command.validate()?;
 
     let wire_command = WireUpdatePermissions {
-        user_id: identifier_to_wire_id(&command.user_id)?,
-        permissions: 
command.permissions.as_ref().map(domain_permissions_to_wire),
+        user_id: identifier_to_wire(&command.user_id)?,
+        permissions: command.permissions.as_ref().map(permissions_to_wire),
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::UpdatePermissionsRequest {
         user_id: identity.user_id,
@@ -213,7 +213,7 @@ async fn change_password(
     command.validate()?;
 
     let wire_command = WireChangePassword {
-        user_id: identifier_to_wire_id(&command.user_id)?,
+        user_id: identifier_to_wire(&command.user_id)?,
         current_password: command.current_password.expose_secret().to_string(),
         new_password: command.new_password.expose_secret().to_string(),
     };
@@ -239,7 +239,7 @@ async fn delete_user(
     let user_id = Identifier::from_str_value(&user_id)?;
 
     let wire_command = WireDeleteUser {
-        user_id: identifier_to_wire_id(&user_id)?,
+        user_id: identifier_to_wire(&user_id)?,
     };
     let request = 
ShardRequest::control_plane(ShardRequestPayload::DeleteUserRequest {
         user_id: identity.user_id,
diff --git a/core/server/src/shard/execution.rs 
b/core/server/src/shard/execution.rs
index 26c73cfc2..14664aa9b 100644
--- a/core/server/src/shard/execution.rs
+++ b/core/server/src/shard/execution.rs
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use crate::binary::dispatch::{wire_id_to_identifier, 
wire_permissions_to_permissions};
+use crate::binary::dispatch::wire_id_to_identifier;
 use crate::streaming::users::user::User;
 use crate::streaming::utils::crypto;
 use crate::{
@@ -39,6 +39,7 @@ use crate::{
 use iggy_binary_protocol::requests::{
     consumer_groups::*, partitions::*, personal_access_tokens::*, streams::*, 
topics::*, users::*,
 };
+use iggy_common::wire_conversions::wire_permissions_to_permissions;
 use iggy_common::{
     CompressionAlgorithm, Identifier, IggyError, IggyExpiry, MaxTopicSize, 
PersonalAccessToken,
     UserStatus,
diff --git a/core/server/src/state/command.rs b/core/server/src/state/command.rs
index 4d544e6e8..d2a2f890c 100644
--- a/core/server/src/state/command.rs
+++ b/core/server/src/state/command.rs
@@ -163,6 +163,13 @@ impl WireDecode for EntryCommand {
         }
         let code = u32::from_le_bytes(buf[0..4].try_into().unwrap());
         let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as 
usize;
+        if buf.len() < 8 + length {
+            return Err(WireError::UnexpectedEof {
+                offset: 8,
+                need: length,
+                have: buf.len() - 8,
+            });
+        }
         let payload = &buf[8..8 + length];
         let consumed = 8 + length;
         let cmd = match code {
diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs
index 7d885b29d..86af7917c 100644
--- a/core/server/src/state/models.rs
+++ b/core/server/src/state/models.rs
@@ -106,7 +106,6 @@ impl Display for CreatePersonalAccessTokenWithHash {
     }
 }
 
-// Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | 
inner_bytes
 // Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | 
inner_bytes
 
 impl WireEncode for CreateStreamWithId {
diff --git a/core/server/src/state/system.rs b/core/server/src/state/system.rs
index 784f6df7a..678e5e2dc 100644
--- a/core/server/src/state/system.rs
+++ b/core/server/src/state/system.rs
@@ -16,7 +16,6 @@
  * under the License.
  */
 
-use crate::binary::dispatch::{domain_permissions_to_wire, 
wire_permissions_to_permissions};
 use crate::bootstrap::create_root_user;
 use crate::state::file::FileState;
 use crate::state::models::CreateUserWithId;
@@ -32,6 +31,7 @@ use iggy_common::IggyTimestamp;
 use iggy_common::MaxTopicSize;
 use iggy_common::PersonalAccessToken;
 use iggy_common::defaults::DEFAULT_ROOT_USER_ID;
+use iggy_common::wire_conversions::{permissions_to_wire, 
wire_permissions_to_permissions};
 use iggy_common::{Permissions, UserStatus};
 use std::collections::BTreeMap;
 use std::fmt::Display;
@@ -153,7 +153,7 @@ impl SystemState {
                     .expect("root username must be valid"),
                 password: root.password.clone(),
                 status: root.status.as_code(),
-                permissions: 
root.permissions.as_ref().map(domain_permissions_to_wire),
+                permissions: 
root.permissions.as_ref().map(permissions_to_wire),
             };
             state
                 .apply(0, &EntryCommand::CreateUser(CreateUserWithId {

Reply via email to