This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-7-http
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 92b607110d4217c9d9fc8a6ee4e05370b057d044
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Mar 27 14:52:46 2026 +0100

    fix(server,sdk): harden wire type migration
    
    The wire type refactor (prior commits) left several issues
    identified during 4-expert review:
    
    - WithId WireDecode impls panicked on corrupted WAL entries
      with inflated length fields instead of returning errors.
      All 5 wrapper types now bounds-check before slicing.
    - Binary handlers lost domain validation when command structs
      were replaced with wire types. Username (3-50), password
      (3-100), PAT name (3-30), and partition count constraints
      were only enforced on the HTTP path. Restored in 6 handlers.
    - Topic wire conversion silently degraded unknown compression
      codes to None via unwrap_or. Changed From to TryFrom with
      error propagation through SDK callers.
    - Duplicate 54-line permissions conversion (owned From impl
      + borrowing free function) collapsed to single delegation.
---
 core/common/src/traits/binary_impls/streams.rs     |   4 +-
 core/common/src/traits/binary_impls/topics.rs      |   6 +-
 core/common/src/wire_conversions.rs                | 104 +++++++--------------
 .../partitions/delete_partitions_handler.rs        |   4 +
 .../create_personal_access_token_handler.rs        |  10 ++
 .../handlers/users/change_password_handler.rs      |  10 ++
 .../binary/handlers/users/create_user_handler.rs   |  12 +++
 .../binary/handlers/users/login_user_handler.rs    |  12 +++
 .../binary/handlers/users/update_user_handler.rs   |   8 ++
 core/server/src/state/models.rs                    |  93 ++++++++++++++++--
 10 files changed, 179 insertions(+), 84 deletions(-)

diff --git a/core/common/src/traits/binary_impls/streams.rs 
b/core/common/src/traits/binary_impls/streams.rs
index b64c99261..bbfb015d3 100644
--- a/core/common/src/traits/binary_impls/streams.rs
+++ b/core/common/src/traits/binary_impls/streams.rs
@@ -47,7 +47,7 @@ impl<B: BinaryClient> StreamClient for B {
             return Ok(None);
         }
         let wire_resp = 
super::decode_response::<GetStreamResponse>(&response)?;
-        Ok(Some(StreamDetails::from(wire_resp)))
+        Ok(Some(StreamDetails::try_from(wire_resp)?))
     }
 
     async fn get_streams(&self) -> Result<Vec<Stream>, IggyError> {
@@ -72,7 +72,7 @@ impl<B: BinaryClient> StreamClient for B {
             )
             .await?;
         let wire_resp = 
super::decode_response::<GetStreamResponse>(&response)?;
-        Ok(StreamDetails::from(wire_resp))
+        Ok(StreamDetails::try_from(wire_resp)?)
     }
 
     async fn update_stream(&self, stream_id: &Identifier, name: &str) -> 
Result<(), IggyError> {
diff --git a/core/common/src/traits/binary_impls/topics.rs 
b/core/common/src/traits/binary_impls/topics.rs
index 9304ed54c..4c5c4ddbb 100644
--- a/core/common/src/traits/binary_impls/topics.rs
+++ b/core/common/src/traits/binary_impls/topics.rs
@@ -59,7 +59,7 @@ impl<B: BinaryClient> TopicClient for B {
             return Ok(None);
         }
         let wire_resp = super::decode_response::<GetTopicResponse>(&response)?;
-        Ok(Some(TopicDetails::from(wire_resp)))
+        Ok(Some(TopicDetails::try_from(wire_resp)?))
     }
 
     async fn get_topics(&self, stream_id: &Identifier) -> Result<Vec<Topic>, 
IggyError> {
@@ -78,7 +78,7 @@ impl<B: BinaryClient> TopicClient for B {
             return Ok(Vec::new());
         }
         let wire_resp = 
super::decode_response::<GetTopicsResponse>(&response)?;
-        Ok(topics_from_wire(wire_resp))
+        Ok(topics_from_wire(wire_resp)?)
     }
 
     async fn create_topic(
@@ -110,7 +110,7 @@ impl<B: BinaryClient> TopicClient for B {
             )
             .await?;
         let wire_resp = super::decode_response::<GetTopicResponse>(&response)?;
-        Ok(TopicDetails::from(wire_resp))
+        Ok(TopicDetails::try_from(wire_resp)?)
     }
 
     async fn update_topic(
diff --git a/core/common/src/wire_conversions.rs 
b/core/common/src/wire_conversions.rs
index a1bb47556..eaa728cdb 100644
--- a/core/common/src/wire_conversions.rs
+++ b/core/common/src/wire_conversions.rs
@@ -84,11 +84,17 @@ impl From<StreamResponse> for Stream {
     }
 }
 
-impl From<GetStreamResponse> for StreamDetails {
-    fn from(w: GetStreamResponse) -> Self {
-        let mut topics: Vec<Topic> = 
w.topics.into_iter().map(Topic::from).collect();
+impl TryFrom<GetStreamResponse> for StreamDetails {
+    type Error = IggyError;
+
+    fn try_from(w: GetStreamResponse) -> Result<Self, Self::Error> {
+        let mut topics: Vec<Topic> = w
+            .topics
+            .into_iter()
+            .map(Topic::try_from)
+            .collect::<Result<_, _>>()?;
         topics.sort_by_key(|t| t.id);
-        Self {
+        Ok(Self {
             id: w.stream.id,
             created_at: w.stream.created_at.into(),
             name: w.stream.name.to_string(),
@@ -96,7 +102,7 @@ impl From<GetStreamResponse> for StreamDetails {
             messages_count: w.stream.messages_count,
             topics_count: w.stream.topics_count,
             topics,
-        }
+        })
     }
 }
 
@@ -110,14 +116,16 @@ pub fn streams_from_wire(w: GetStreamsResponse) -> 
Vec<Stream> {
 // Topics
 // ---------------------------------------------------------------------------
 
-impl From<TopicHeader> for Topic {
-    fn from(w: TopicHeader) -> Self {
+impl TryFrom<TopicHeader> for Topic {
+    type Error = IggyError;
+
+    fn try_from(w: TopicHeader) -> Result<Self, Self::Error> {
         let message_expiry = match w.message_expiry {
             0 => IggyExpiry::NeverExpire,
             v => v.into(),
         };
         let max_topic_size: MaxTopicSize = w.max_topic_size.into();
-        Self {
+        Ok(Self {
             id: w.id,
             created_at: w.created_at.into(),
             name: w.name.to_string(),
@@ -125,11 +133,10 @@ impl From<TopicHeader> for Topic {
             size: IggyByteSize::from(w.size_bytes),
             messages_count: w.messages_count,
             message_expiry,
-            compression_algorithm: 
CompressionAlgorithm::from_code(w.compression_algorithm)
-                .unwrap_or(CompressionAlgorithm::None),
+            compression_algorithm: 
CompressionAlgorithm::from_code(w.compression_algorithm)?,
             max_topic_size,
             replication_factor: w.replication_factor,
-        }
+        })
     }
 }
 
@@ -146,13 +153,15 @@ impl From<PartitionResponse> for Partition {
     }
 }
 
-impl From<GetTopicResponse> for TopicDetails {
-    fn from(w: GetTopicResponse) -> Self {
-        let topic = Topic::from(w.topic);
+impl TryFrom<GetTopicResponse> for TopicDetails {
+    type Error = IggyError;
+
+    fn try_from(w: GetTopicResponse) -> Result<Self, Self::Error> {
+        let topic = Topic::try_from(w.topic)?;
         let mut partitions: Vec<Partition> =
             w.partitions.into_iter().map(Partition::from).collect();
         partitions.sort_by_key(|p| p.id);
-        Self {
+        Ok(Self {
             id: topic.id,
             created_at: topic.created_at,
             name: topic.name,
@@ -164,14 +173,18 @@ impl From<GetTopicResponse> for TopicDetails {
             replication_factor: topic.replication_factor,
             partitions_count: topic.partitions_count,
             partitions,
-        }
+        })
     }
 }
 
-pub fn topics_from_wire(w: GetTopicsResponse) -> Vec<Topic> {
-    let mut topics: Vec<Topic> = 
w.topics.into_iter().map(Topic::from).collect();
+pub fn topics_from_wire(w: GetTopicsResponse) -> Result<Vec<Topic>, IggyError> 
{
+    let mut topics: Vec<Topic> = w
+        .topics
+        .into_iter()
+        .map(Topic::try_from)
+        .collect::<Result<_, _>>()?;
     topics.sort_by_key(|t| t.id);
-    topics
+    Ok(topics)
 }
 
 // ---------------------------------------------------------------------------
@@ -635,58 +648,7 @@ impl From<WirePermissions> for Permissions {
 
 /// Convert `&WirePermissions` to domain `Permissions` without consuming the 
input.
 pub fn wire_permissions_to_permissions(wp: &WirePermissions) -> Permissions {
-    let streams = if wp.streams.is_empty() {
-        None
-    } else {
-        let mut map = BTreeMap::new();
-        for ws in &wp.streams {
-            let topics = if ws.topics.is_empty() {
-                None
-            } else {
-                let mut tmap = BTreeMap::new();
-                for wt in &ws.topics {
-                    tmap.insert(
-                        wt.topic_id as usize,
-                        TopicPermissions {
-                            manage_topic: wt.manage_topic,
-                            read_topic: wt.read_topic,
-                            poll_messages: wt.poll_messages,
-                            send_messages: wt.send_messages,
-                        },
-                    );
-                }
-                Some(tmap)
-            };
-            map.insert(
-                ws.stream_id as usize,
-                StreamPermissions {
-                    manage_stream: ws.manage_stream,
-                    read_stream: ws.read_stream,
-                    manage_topics: ws.manage_topics,
-                    read_topics: ws.read_topics,
-                    poll_messages: ws.poll_messages,
-                    send_messages: ws.send_messages,
-                    topics,
-                },
-            );
-        }
-        Some(map)
-    };
-    Permissions {
-        global: GlobalPermissions {
-            manage_servers: wp.global.manage_servers,
-            read_servers: wp.global.read_servers,
-            manage_users: wp.global.manage_users,
-            read_users: wp.global.read_users,
-            manage_streams: wp.global.manage_streams,
-            read_streams: wp.global.read_streams,
-            manage_topics: wp.global.manage_topics,
-            read_topics: wp.global.read_topics,
-            poll_messages: wp.global.poll_messages,
-            send_messages: wp.global.send_messages,
-        },
-        streams,
-    }
+    Permissions::from(wp.clone())
 }
 
 // ---------------------------------------------------------------------------
diff --git 
a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs 
b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
index f925d958b..14fcd577e 100644
--- a/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
+++ b/core/server/src/binary/handlers/partitions/delete_partitions_handler.rs
@@ -39,6 +39,10 @@ pub async fn handle_delete_partitions(
     );
     shard.ensure_authenticated(session)?;
 
+    if req.partitions_count == 0 {
+        return Err(IggyError::TooManyPartitions);
+    }
+
     let request = 
ShardRequest::control_plane(ShardRequestPayload::DeletePartitionsRequest {
         user_id: session.get_user_id(),
         command: req,
diff --git 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
index 263872da2..6bfc77fa0 100644
--- 
a/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
+++ 
b/core/server/src/binary/handlers/personal_access_tokens/create_personal_access_token_handler.rs
@@ -25,6 +25,9 @@ use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
 use 
iggy_binary_protocol::requests::personal_access_tokens::CreatePersonalAccessTokenRequest;
 use 
iggy_binary_protocol::responses::personal_access_tokens::RawPersonalAccessTokenResponse;
+use iggy_common::defaults::{
+    MAX_PERSONAL_ACCESS_TOKEN_NAME_LENGTH, 
MIN_PERSONAL_ACCESS_TOKEN_NAME_LENGTH,
+};
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
@@ -42,6 +45,13 @@ pub async fn handle_create_personal_access_token(
     );
     shard.ensure_authenticated(session)?;
 
+    let name_len = req.name.as_str().len();
+    if 
!(MIN_PERSONAL_ACCESS_TOKEN_NAME_LENGTH..=MAX_PERSONAL_ACCESS_TOKEN_NAME_LENGTH)
+        .contains(&name_len)
+    {
+        return Err(IggyError::InvalidPersonalAccessTokenName);
+    }
+
     let request =
         
ShardRequest::control_plane(ShardRequestPayload::CreatePersonalAccessTokenRequest
 {
             user_id: session.get_user_id(),
diff --git a/core/server/src/binary/handlers/users/change_password_handler.rs 
b/core/server/src/binary/handlers/users/change_password_handler.rs
index 17fe0d071..860b0d82e 100644
--- a/core/server/src/binary/handlers/users/change_password_handler.rs
+++ b/core/server/src/binary/handlers/users/change_password_handler.rs
@@ -22,6 +22,7 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
 use crate::streaming::session::Session;
 use iggy_binary_protocol::requests::users::ChangePasswordRequest;
+use iggy_common::defaults::{MAX_PASSWORD_LENGTH, MIN_PASSWORD_LENGTH};
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
@@ -39,6 +40,15 @@ pub async fn handle_change_password(
     );
     shard.ensure_authenticated(session)?;
 
+    let current_len = req.current_password.len();
+    if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&current_len) {
+        return Err(IggyError::InvalidPassword);
+    }
+    let new_len = req.new_password.len();
+    if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&new_len) {
+        return Err(IggyError::InvalidPassword);
+    }
+
     let request = 
ShardRequest::control_plane(ShardRequestPayload::ChangePasswordRequest {
         user_id: session.get_user_id(),
         command: req,
diff --git a/core/server/src/binary/handlers/users/create_user_handler.rs 
b/core/server/src/binary/handlers/users/create_user_handler.rs
index 04aaf8a2e..7532523a4 100644
--- a/core/server/src/binary/handlers/users/create_user_handler.rs
+++ b/core/server/src/binary/handlers/users/create_user_handler.rs
@@ -25,6 +25,9 @@ use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::codec::WireEncode;
 use iggy_binary_protocol::requests::users::CreateUserRequest;
 use iggy_binary_protocol::responses::users::{UserDetailsResponse, 
UserResponse};
+use iggy_common::defaults::{
+    MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, 
MIN_USERNAME_LENGTH,
+};
 use iggy_common::wire_conversions::permissions_to_wire;
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
@@ -44,6 +47,15 @@ pub async fn handle_create_user(
     shard.ensure_authenticated(session)?;
     shard.metadata.perm_create_user(session.get_user_id())?;
 
+    let username_len = req.username.as_str().len();
+    if !(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username_len) {
+        return Err(IggyError::InvalidUsername);
+    }
+    let password_len = req.password.len();
+    if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&password_len) {
+        return Err(IggyError::InvalidPassword);
+    }
+
     let request = 
ShardRequest::control_plane(ShardRequestPayload::CreateUserRequest {
         user_id: session.get_user_id(),
         command: req,
diff --git a/core/server/src/binary/handlers/users/login_user_handler.rs 
b/core/server/src/binary/handlers/users/login_user_handler.rs
index b3d603552..aa0924427 100644
--- a/core/server/src/binary/handlers/users/login_user_handler.rs
+++ b/core/server/src/binary/handlers/users/login_user_handler.rs
@@ -26,6 +26,9 @@ use iggy_binary_protocol::requests::users::LoginUserRequest;
 use iggy_binary_protocol::responses::users::IdentityResponse;
 use iggy_common::IggyError;
 use iggy_common::SenderKind;
+use iggy_common::defaults::{
+    MAX_PASSWORD_LENGTH, MAX_USERNAME_LENGTH, MIN_PASSWORD_LENGTH, 
MIN_USERNAME_LENGTH,
+};
 use std::rc::Rc;
 use tracing::{debug, info, instrument, warn};
 
@@ -42,6 +45,15 @@ pub async fn handle_login_user(
     }
 
     let username = req.username.as_str();
+    let username_len = username.len();
+    if !(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username_len) {
+        return Err(IggyError::InvalidUsername);
+    }
+    let password_len = req.password.len();
+    if !(MIN_PASSWORD_LENGTH..=MAX_PASSWORD_LENGTH).contains(&password_len) {
+        return Err(IggyError::InvalidPassword);
+    }
+
     debug!("session: {session}, command: login_user, username: {username}");
 
     info!("Logging in user: {username} ...");
diff --git a/core/server/src/binary/handlers/users/update_user_handler.rs 
b/core/server/src/binary/handlers/users/update_user_handler.rs
index 014bc9730..23b2e51cd 100644
--- a/core/server/src/binary/handlers/users/update_user_handler.rs
+++ b/core/server/src/binary/handlers/users/update_user_handler.rs
@@ -22,6 +22,7 @@ use crate::shard::transmission::frame::ShardResponse;
 use crate::shard::transmission::message::{ShardRequest, ShardRequestPayload};
 use crate::streaming::session::Session;
 use iggy_binary_protocol::requests::users::UpdateUserRequest;
+use iggy_common::defaults::{MAX_USERNAME_LENGTH, MIN_USERNAME_LENGTH};
 use iggy_common::{IggyError, SenderKind};
 use std::rc::Rc;
 use tracing::{debug, instrument};
@@ -40,6 +41,13 @@ pub async fn handle_update_user(
     shard.ensure_authenticated(session)?;
     shard.metadata.perm_update_user(session.get_user_id())?;
 
+    if let Some(ref username) = req.username {
+        let username_len = username.as_str().len();
+        if 
!(MIN_USERNAME_LENGTH..=MAX_USERNAME_LENGTH).contains(&username_len) {
+            return Err(IggyError::InvalidUsername);
+        }
+    }
+
     let request = 
ShardRequest::control_plane(ShardRequestPayload::UpdateUserRequest {
         user_id: session.get_user_id(),
         command: req,
diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs
index 86af7917c..619e1a82e 100644
--- a/core/server/src/state/models.rs
+++ b/core/server/src/state/models.rs
@@ -131,8 +131,22 @@ impl WireDecode for CreateStreamWithId {
         }
         let stream_id = u32::from_le_bytes(buf[0..4].try_into().unwrap());
         let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) 
as usize;
-        let (command, _) = CreateStreamRequest::decode(&buf[8..8 + 
command_length])?;
-        Ok((Self { stream_id, command }, 8 + command_length))
+        let total = 8usize.checked_add(command_length).ok_or(
+            iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 4,
+                need: command_length,
+                have: buf.len() - 8,
+            },
+        )?;
+        if buf.len() < total {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 8,
+                need: command_length,
+                have: buf.len() - 8,
+            });
+        }
+        let (command, _) = CreateStreamRequest::decode(&buf[8..total])?;
+        Ok((Self { stream_id, command }, total))
     }
 }
 
@@ -159,8 +173,22 @@ impl WireDecode for CreateTopicWithId {
         }
         let topic_id = u32::from_le_bytes(buf[0..4].try_into().unwrap());
         let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) 
as usize;
-        let (command, _) = CreateTopicRequest::decode(&buf[8..8 + 
command_length])?;
-        Ok((Self { topic_id, command }, 8 + command_length))
+        let total = 8usize.checked_add(command_length).ok_or(
+            iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 4,
+                need: command_length,
+                have: buf.len() - 8,
+            },
+        )?;
+        if buf.len() < total {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 8,
+                need: command_length,
+                have: buf.len() - 8,
+            });
+        }
+        let (command, _) = CreateTopicRequest::decode(&buf[8..total])?;
+        Ok((Self { topic_id, command }, total))
     }
 }
 
@@ -187,8 +215,22 @@ impl WireDecode for CreateConsumerGroupWithId {
         }
         let group_id = u32::from_le_bytes(buf[0..4].try_into().unwrap());
         let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) 
as usize;
-        let (command, _) = CreateConsumerGroupRequest::decode(&buf[8..8 + 
command_length])?;
-        Ok((Self { group_id, command }, 8 + command_length))
+        let total = 8usize.checked_add(command_length).ok_or(
+            iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 4,
+                need: command_length,
+                have: buf.len() - 8,
+            },
+        )?;
+        if buf.len() < total {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 8,
+                need: command_length,
+                have: buf.len() - 8,
+            });
+        }
+        let (command, _) = CreateConsumerGroupRequest::decode(&buf[8..total])?;
+        Ok((Self { group_id, command }, total))
     }
 }
 
@@ -215,8 +257,22 @@ impl WireDecode for CreateUserWithId {
         }
         let user_id = u32::from_le_bytes(buf[0..4].try_into().unwrap());
         let command_length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) 
as usize;
-        let (command, _) = CreateUserRequest::decode(&buf[8..8 + 
command_length])?;
-        Ok((Self { user_id, command }, 8 + command_length))
+        let total = 8usize.checked_add(command_length).ok_or(
+            iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 4,
+                need: command_length,
+                have: buf.len() - 8,
+            },
+        )?;
+        if buf.len() < total {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: 8,
+                need: command_length,
+                have: buf.len() - 8,
+            });
+        }
+        let (command, _) = CreateUserRequest::decode(&buf[8..total])?;
+        Ok((Self { user_id, command }, total))
     }
 }
 
@@ -244,12 +300,33 @@ impl WireDecode for CreatePersonalAccessTokenWithHash {
         }
         let hash_length = u32::from_le_bytes(buf[0..4].try_into().unwrap()) as 
usize;
         let mut pos = 4;
+        if buf.len() < pos + hash_length {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: pos,
+                need: hash_length,
+                have: buf.len() - pos,
+            });
+        }
         let hash = std::str::from_utf8(&buf[pos..pos + hash_length])
             .map_err(|_| iggy_binary_protocol::WireError::InvalidUtf8 { 
offset: pos })?
             .to_string();
         pos += hash_length;
+        if buf.len() < pos + 4 {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: pos,
+                need: 4,
+                have: buf.len() - pos,
+            });
+        }
         let command_length = u32::from_le_bytes(buf[pos..pos + 
4].try_into().unwrap()) as usize;
         pos += 4;
+        if buf.len() < pos + command_length {
+            return Err(iggy_binary_protocol::WireError::UnexpectedEof {
+                offset: pos,
+                need: command_length,
+                have: buf.len() - pos,
+            });
+        }
         let (command, _) =
             CreatePersonalAccessTokenRequest::decode(&buf[pos..pos + 
command_length])?;
         pos += command_length;

Reply via email to