This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch refactor-binary-7-http
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit fe7c92d2ca7c2a915c3dec4c61fd11a4e1ec7d69
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Fri Mar 27 11:02:38 2026 +0100

    refactor(server): migrate WAL to WireEncode/WireDecode traits
    
    EntryCommand and StateEntry used BytesSerializable for WAL
    persistence despite the codebase already having WireEncode/
    WireDecode in iggy_binary_protocol. This dual-trait situation
    meant WAL code couldn't benefit from WireEncode's zero-alloc
    encode() path.
    
    EntryCommand now implements both WireEncode and WireDecode.
    StateEntry implements WireEncode only - from_bytes had zero
    callers since load_entries() uses hand-rolled async cursor
    reads for incremental validation and encryption interleaving.
---
 core/integration/tests/state/file.rs |   2 +-
 core/server/src/state/command.rs     | 255 ++++++++++++++++++++---------------
 core/server/src/state/entry.rs       |  70 +++-------
 core/server/src/state/file.rs        |  13 +-
 4 files changed, 179 insertions(+), 161 deletions(-)

diff --git a/core/integration/tests/state/file.rs b/core/integration/tests/state/file.rs
index 944f9a0e1..ca3c16235 100644
--- a/core/integration/tests/state/file.rs
+++ b/core/integration/tests/state/file.rs
@@ -18,7 +18,7 @@
 
 use crate::state::StateSetup;
 use bytes::Bytes;
-use iggy::prelude::BytesSerializable;
+use iggy_binary_protocol::WireEncode;
 use iggy_binary_protocol::WireName;
 use iggy_binary_protocol::requests::{streams::CreateStreamRequest, users::CreateUserRequest};
 use server::state::command::EntryCommand;
diff --git a/core/server/src/state/command.rs b/core/server/src/state/command.rs
index ccb19c05f..4d544e6e8 100644
--- a/core/server/src/state/command.rs
+++ b/core/server/src/state/command.rs
@@ -20,7 +20,7 @@ use crate::state::models::{
     CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash, CreateStreamWithId,
     CreateTopicWithId, CreateUserWithId,
 };
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use bytes::{BufMut, BytesMut};
 use iggy_binary_protocol::codes::{
     CHANGE_PASSWORD_CODE, CREATE_CONSUMER_GROUP_CODE, CREATE_PARTITIONS_CODE,
     CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_STREAM_CODE, CREATE_TOPIC_CODE, CREATE_USER_CODE,
@@ -40,9 +40,7 @@ use iggy_binary_protocol::requests::{
         ChangePasswordRequest, DeleteUserRequest, UpdatePermissionsRequest, UpdateUserRequest,
     },
 };
-use iggy_binary_protocol::{WireDecode, WireEncode};
-use iggy_common::BytesSerializable;
-use iggy_common::IggyError;
+use iggy_binary_protocol::{WireDecode, WireEncode, WireError};
 use std::fmt::{Display, Formatter};
 
 #[derive(Debug)]
@@ -69,119 +67,160 @@ pub enum EntryCommand {
     DeletePersonalAccessToken(DeletePersonalAccessTokenRequest),
 }
 
-fn wire_error_to_iggy(e: iggy_binary_protocol::WireError) -> IggyError {
-    tracing::warn!("wire decode error during WAL replay: {e}");
-    IggyError::InvalidCommand
-}
+impl WireEncode for EntryCommand {
+    fn encoded_size(&self) -> usize {
+        let inner_size = match self {
+            EntryCommand::CreateStream(cmd) => cmd.encoded_size(),
+            EntryCommand::UpdateStream(cmd) => cmd.encoded_size(),
+            EntryCommand::DeleteStream(cmd) => cmd.encoded_size(),
+            EntryCommand::PurgeStream(cmd) => cmd.encoded_size(),
+            EntryCommand::CreateTopic(cmd) => cmd.encoded_size(),
+            EntryCommand::UpdateTopic(cmd) => cmd.encoded_size(),
+            EntryCommand::DeleteTopic(cmd) => cmd.encoded_size(),
+            EntryCommand::PurgeTopic(cmd) => cmd.encoded_size(),
+            EntryCommand::CreatePartitions(cmd) => cmd.encoded_size(),
+            EntryCommand::DeletePartitions(cmd) => cmd.encoded_size(),
+            EntryCommand::DeleteSegments(cmd) => cmd.encoded_size(),
+            EntryCommand::CreateConsumerGroup(cmd) => cmd.encoded_size(),
+            EntryCommand::DeleteConsumerGroup(cmd) => cmd.encoded_size(),
+            EntryCommand::CreateUser(cmd) => cmd.encoded_size(),
+            EntryCommand::UpdateUser(cmd) => cmd.encoded_size(),
+            EntryCommand::DeleteUser(cmd) => cmd.encoded_size(),
+            EntryCommand::ChangePassword(cmd) => cmd.encoded_size(),
+            EntryCommand::UpdatePermissions(cmd) => cmd.encoded_size(),
+            EntryCommand::CreatePersonalAccessToken(cmd) => cmd.encoded_size(),
+            EntryCommand::DeletePersonalAccessToken(cmd) => cmd.encoded_size(),
+        };
+        4 + 4 + inner_size
+    }
 
-impl BytesSerializable for EntryCommand {
-    fn to_bytes(&self) -> Bytes {
-        let (code, command) = match self {
-            EntryCommand::CreateStream(cmd) => (CREATE_STREAM_CODE, cmd.to_bytes()),
-            EntryCommand::UpdateStream(cmd) => (UPDATE_STREAM_CODE, cmd.to_bytes()),
-            EntryCommand::DeleteStream(cmd) => (DELETE_STREAM_CODE, cmd.to_bytes()),
-            EntryCommand::PurgeStream(cmd) => (PURGE_STREAM_CODE, cmd.to_bytes()),
-            EntryCommand::CreateTopic(cmd) => (CREATE_TOPIC_CODE, cmd.to_bytes()),
-            EntryCommand::UpdateTopic(cmd) => (UPDATE_TOPIC_CODE, cmd.to_bytes()),
-            EntryCommand::DeleteTopic(cmd) => (DELETE_TOPIC_CODE, cmd.to_bytes()),
-            EntryCommand::PurgeTopic(cmd) => (PURGE_TOPIC_CODE, cmd.to_bytes()),
-            EntryCommand::CreatePartitions(cmd) => (CREATE_PARTITIONS_CODE, cmd.to_bytes()),
-            EntryCommand::DeletePartitions(cmd) => (DELETE_PARTITIONS_CODE, cmd.to_bytes()),
-            EntryCommand::DeleteSegments(cmd) => (DELETE_SEGMENTS_CODE, cmd.to_bytes()),
-            EntryCommand::CreateConsumerGroup(cmd) => (CREATE_CONSUMER_GROUP_CODE, cmd.to_bytes()),
-            EntryCommand::DeleteConsumerGroup(cmd) => (DELETE_CONSUMER_GROUP_CODE, cmd.to_bytes()),
-            EntryCommand::CreateUser(cmd) => (CREATE_USER_CODE, cmd.to_bytes()),
-            EntryCommand::UpdateUser(cmd) => (UPDATE_USER_CODE, cmd.to_bytes()),
-            EntryCommand::DeleteUser(cmd) => (DELETE_USER_CODE, cmd.to_bytes()),
-            EntryCommand::ChangePassword(cmd) => (CHANGE_PASSWORD_CODE, cmd.to_bytes()),
-            EntryCommand::UpdatePermissions(cmd) => (UPDATE_PERMISSIONS_CODE, cmd.to_bytes()),
+    fn encode(&self, buf: &mut BytesMut) {
+        let (code, inner_size) = match self {
+            EntryCommand::CreateStream(cmd) => (CREATE_STREAM_CODE, cmd.encoded_size()),
+            EntryCommand::UpdateStream(cmd) => (UPDATE_STREAM_CODE, cmd.encoded_size()),
+            EntryCommand::DeleteStream(cmd) => (DELETE_STREAM_CODE, cmd.encoded_size()),
+            EntryCommand::PurgeStream(cmd) => (PURGE_STREAM_CODE, cmd.encoded_size()),
+            EntryCommand::CreateTopic(cmd) => (CREATE_TOPIC_CODE, cmd.encoded_size()),
+            EntryCommand::UpdateTopic(cmd) => (UPDATE_TOPIC_CODE, cmd.encoded_size()),
+            EntryCommand::DeleteTopic(cmd) => (DELETE_TOPIC_CODE, cmd.encoded_size()),
+            EntryCommand::PurgeTopic(cmd) => (PURGE_TOPIC_CODE, cmd.encoded_size()),
+            EntryCommand::CreatePartitions(cmd) => (CREATE_PARTITIONS_CODE, cmd.encoded_size()),
+            EntryCommand::DeletePartitions(cmd) => (DELETE_PARTITIONS_CODE, cmd.encoded_size()),
+            EntryCommand::DeleteSegments(cmd) => (DELETE_SEGMENTS_CODE, cmd.encoded_size()),
+            EntryCommand::CreateConsumerGroup(cmd) => {
+                (CREATE_CONSUMER_GROUP_CODE, cmd.encoded_size())
+            }
+            EntryCommand::DeleteConsumerGroup(cmd) => {
+                (DELETE_CONSUMER_GROUP_CODE, cmd.encoded_size())
+            }
+            EntryCommand::CreateUser(cmd) => (CREATE_USER_CODE, cmd.encoded_size()),
+            EntryCommand::UpdateUser(cmd) => (UPDATE_USER_CODE, cmd.encoded_size()),
+            EntryCommand::DeleteUser(cmd) => (DELETE_USER_CODE, cmd.encoded_size()),
+            EntryCommand::ChangePassword(cmd) => (CHANGE_PASSWORD_CODE, cmd.encoded_size()),
+            EntryCommand::UpdatePermissions(cmd) => (UPDATE_PERMISSIONS_CODE, cmd.encoded_size()),
             EntryCommand::CreatePersonalAccessToken(cmd) => {
-                (CREATE_PERSONAL_ACCESS_TOKEN_CODE, cmd.to_bytes())
+                (CREATE_PERSONAL_ACCESS_TOKEN_CODE, cmd.encoded_size())
             }
             EntryCommand::DeletePersonalAccessToken(cmd) => {
-                (DELETE_PERSONAL_ACCESS_TOKEN_CODE, cmd.to_bytes())
+                (DELETE_PERSONAL_ACCESS_TOKEN_CODE, cmd.encoded_size())
             }
         };
-
-        let mut bytes = BytesMut::with_capacity(4 + 4 + command.len());
-        bytes.put_u32_le(code);
-        bytes.put_u32_le(command.len() as u32);
-        bytes.extend(command);
-        bytes.freeze()
+        buf.put_u32_le(code);
+        buf.put_u32_le(inner_size as u32);
+        match self {
+            EntryCommand::CreateStream(cmd) => cmd.encode(buf),
+            EntryCommand::UpdateStream(cmd) => cmd.encode(buf),
+            EntryCommand::DeleteStream(cmd) => cmd.encode(buf),
+            EntryCommand::PurgeStream(cmd) => cmd.encode(buf),
+            EntryCommand::CreateTopic(cmd) => cmd.encode(buf),
+            EntryCommand::UpdateTopic(cmd) => cmd.encode(buf),
+            EntryCommand::DeleteTopic(cmd) => cmd.encode(buf),
+            EntryCommand::PurgeTopic(cmd) => cmd.encode(buf),
+            EntryCommand::CreatePartitions(cmd) => cmd.encode(buf),
+            EntryCommand::DeletePartitions(cmd) => cmd.encode(buf),
+            EntryCommand::DeleteSegments(cmd) => cmd.encode(buf),
+            EntryCommand::CreateConsumerGroup(cmd) => cmd.encode(buf),
+            EntryCommand::DeleteConsumerGroup(cmd) => cmd.encode(buf),
+            EntryCommand::CreateUser(cmd) => cmd.encode(buf),
+            EntryCommand::UpdateUser(cmd) => cmd.encode(buf),
+            EntryCommand::DeleteUser(cmd) => cmd.encode(buf),
+            EntryCommand::ChangePassword(cmd) => cmd.encode(buf),
+            EntryCommand::UpdatePermissions(cmd) => cmd.encode(buf),
+            EntryCommand::CreatePersonalAccessToken(cmd) => cmd.encode(buf),
+            EntryCommand::DeletePersonalAccessToken(cmd) => cmd.encode(buf),
+        }
     }
+}
 
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        let code = bytes.slice(0..4).get_u32_le();
-        let length = bytes.slice(4..8).get_u32_le();
-        let payload = &bytes[8..8 + length as usize];
-        match code {
-            CREATE_STREAM_CODE => Ok(EntryCommand::CreateStream(
-                CreateStreamWithId::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            UPDATE_STREAM_CODE => Ok(EntryCommand::UpdateStream(
-                UpdateStreamRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_STREAM_CODE => Ok(EntryCommand::DeleteStream(
-                DeleteStreamRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            PURGE_STREAM_CODE => Ok(EntryCommand::PurgeStream(
-                PurgeStreamRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            CREATE_TOPIC_CODE => Ok(EntryCommand::CreateTopic(
-                CreateTopicWithId::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            UPDATE_TOPIC_CODE => Ok(EntryCommand::UpdateTopic(
-                UpdateTopicRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_TOPIC_CODE => Ok(EntryCommand::DeleteTopic(
-                DeleteTopicRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            PURGE_TOPIC_CODE => Ok(EntryCommand::PurgeTopic(
-                PurgeTopicRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            CREATE_PARTITIONS_CODE => Ok(EntryCommand::CreatePartitions(
-                CreatePartitionsRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_PARTITIONS_CODE => Ok(EntryCommand::DeletePartitions(
-                DeletePartitionsRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_SEGMENTS_CODE => Ok(EntryCommand::DeleteSegments(
-                DeleteSegmentsRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            CREATE_CONSUMER_GROUP_CODE => Ok(EntryCommand::CreateConsumerGroup(
-                CreateConsumerGroupWithId::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_CONSUMER_GROUP_CODE => Ok(EntryCommand::DeleteConsumerGroup(
-                DeleteConsumerGroupRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            CREATE_USER_CODE => Ok(EntryCommand::CreateUser(
-                CreateUserWithId::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            UPDATE_USER_CODE => Ok(EntryCommand::UpdateUser(
-                UpdateUserRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_USER_CODE => Ok(EntryCommand::DeleteUser(
-                DeleteUserRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            CHANGE_PASSWORD_CODE => Ok(EntryCommand::ChangePassword(
-                ChangePasswordRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            UPDATE_PERMISSIONS_CODE => Ok(EntryCommand::UpdatePermissions(
-                UpdatePermissionsRequest::decode_from(payload).map_err(wire_error_to_iggy)?,
-            )),
-            CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::CreatePersonalAccessToken(
-                CreatePersonalAccessTokenWithHash::decode_from(payload)
-                    .map_err(wire_error_to_iggy)?,
-            )),
-            DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::DeletePersonalAccessToken(
-                DeletePersonalAccessTokenRequest::decode_from(payload)
-                    .map_err(wire_error_to_iggy)?,
-            )),
-            _ => Err(IggyError::InvalidCommand),
+impl WireDecode for EntryCommand {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        if buf.len() < 8 {
+            return Err(WireError::UnexpectedEof {
+                offset: 0,
+                need: 8,
+                have: buf.len(),
+            });
         }
+        let code = u32::from_le_bytes(buf[0..4].try_into().unwrap());
+        let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize;
+        let payload = &buf[8..8 + length];
+        let consumed = 8 + length;
+        let cmd = match code {
+            CREATE_STREAM_CODE => {
+                EntryCommand::CreateStream(CreateStreamWithId::decode_from(payload)?)
+            }
+            UPDATE_STREAM_CODE => {
+                EntryCommand::UpdateStream(UpdateStreamRequest::decode_from(payload)?)
+            }
+            DELETE_STREAM_CODE => {
+                EntryCommand::DeleteStream(DeleteStreamRequest::decode_from(payload)?)
+            }
+            PURGE_STREAM_CODE => {
+                EntryCommand::PurgeStream(PurgeStreamRequest::decode_from(payload)?)
+            }
+            CREATE_TOPIC_CODE => {
+                EntryCommand::CreateTopic(CreateTopicWithId::decode_from(payload)?)
+            }
+            UPDATE_TOPIC_CODE => {
+                EntryCommand::UpdateTopic(UpdateTopicRequest::decode_from(payload)?)
+            }
+            DELETE_TOPIC_CODE => {
+                EntryCommand::DeleteTopic(DeleteTopicRequest::decode_from(payload)?)
+            }
+            PURGE_TOPIC_CODE => EntryCommand::PurgeTopic(PurgeTopicRequest::decode_from(payload)?),
+            CREATE_PARTITIONS_CODE => {
+                EntryCommand::CreatePartitions(CreatePartitionsRequest::decode_from(payload)?)
+            }
+            DELETE_PARTITIONS_CODE => {
+                EntryCommand::DeletePartitions(DeletePartitionsRequest::decode_from(payload)?)
+            }
+            DELETE_SEGMENTS_CODE => {
+                EntryCommand::DeleteSegments(DeleteSegmentsRequest::decode_from(payload)?)
+            }
+            CREATE_CONSUMER_GROUP_CODE => {
+                EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId::decode_from(payload)?)
+            }
+            DELETE_CONSUMER_GROUP_CODE => {
+                EntryCommand::DeleteConsumerGroup(DeleteConsumerGroupRequest::decode_from(payload)?)
+            }
+            CREATE_USER_CODE => EntryCommand::CreateUser(CreateUserWithId::decode_from(payload)?),
+            UPDATE_USER_CODE => EntryCommand::UpdateUser(UpdateUserRequest::decode_from(payload)?),
+            DELETE_USER_CODE => EntryCommand::DeleteUser(DeleteUserRequest::decode_from(payload)?),
+            CHANGE_PASSWORD_CODE => {
+                EntryCommand::ChangePassword(ChangePasswordRequest::decode_from(payload)?)
+            }
+            UPDATE_PERMISSIONS_CODE => {
+                EntryCommand::UpdatePermissions(UpdatePermissionsRequest::decode_from(payload)?)
+            }
+            CREATE_PERSONAL_ACCESS_TOKEN_CODE => EntryCommand::CreatePersonalAccessToken(
+                CreatePersonalAccessTokenWithHash::decode_from(payload)?,
+            ),
+            DELETE_PERSONAL_ACCESS_TOKEN_CODE => EntryCommand::DeletePersonalAccessToken(
+                DeletePersonalAccessTokenRequest::decode_from(payload)?,
+            ),
+            _ => return Err(WireError::UnknownCommand(code)),
+        };
+        Ok((cmd, consumed))
     }
 }
 
diff --git a/core/server/src/state/entry.rs b/core/server/src/state/entry.rs
index 6370bd2dc..e10091ce8 100644
--- a/core/server/src/state/entry.rs
+++ b/core/server/src/state/entry.rs
@@ -17,10 +17,11 @@
  */
 
 use crate::state::command::EntryCommand;
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_binary_protocol::{WireDecode, WireEncode};
 use iggy_common::IggyError;
 use iggy_common::IggyTimestamp;
-use iggy_common::{BytesSerializable, calculate_checksum};
+use iggy_common::calculate_checksum;
 use std::fmt::{Display, Formatter};
 
 /// State entry in the log
@@ -78,7 +79,10 @@ impl StateEntry {
     }
 
     pub fn command(&self) -> Result<EntryCommand, IggyError> {
-        EntryCommand::from_bytes(self.command.clone())
+        EntryCommand::decode_from(&self.command).map_err(|e| {
+            tracing::warn!("wire decode error during WAL replay: {e}");
+            IggyError::InvalidCommand
+        })
     }
 
     #[allow(clippy::too_many_arguments)]
@@ -126,52 +130,22 @@ impl Display for StateEntry {
     }
 }
 
-impl BytesSerializable for StateEntry {
-    fn to_bytes(&self) -> Bytes {
-        let mut bytes = BytesMut::with_capacity(
-            8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + 4 + self.context.len() + self.command.len(),
-        );
-        bytes.put_u64_le(self.index);
-        bytes.put_u64_le(self.term);
-        bytes.put_u32_le(self.leader_id);
-        bytes.put_u32_le(self.version);
-        bytes.put_u64_le(self.flags);
-        bytes.put_u64_le(self.timestamp.into());
-        bytes.put_u32_le(self.user_id);
-        bytes.put_u64_le(self.checksum);
-        bytes.put_u32_le(self.context.len() as u32);
-        bytes.put_slice(&self.context);
-        bytes.extend(&self.command);
-        bytes.freeze()
+impl WireEncode for StateEntry {
+    fn encoded_size(&self) -> usize {
+        8 + 8 + 4 + 4 + 8 + 8 + 4 + 8 + 4 + self.context.len() + self.command.len()
     }
 
-    fn from_bytes(bytes: Bytes) -> Result<Self, IggyError>
-    where
-        Self: Sized,
-    {
-        let index = bytes.slice(0..8).get_u64_le();
-        let term = bytes.slice(8..16).get_u64_le();
-        let leader_id = bytes.slice(16..20).get_u32_le();
-        let version = bytes.slice(20..24).get_u32_le();
-        let flags = bytes.slice(24..32).get_u64_le();
-        let timestamp = IggyTimestamp::from(bytes.slice(32..40).get_u64_le());
-        let user_id = bytes.slice(40..44).get_u32_le();
-        let checksum = bytes.slice(44..52).get_u64_le();
-        let context_length = bytes.slice(52..56).get_u32_le() as usize;
-        let context = bytes.slice(56..56 + context_length);
-        let command = bytes.slice(56 + context_length..);
-
-        Ok(StateEntry {
-            index,
-            term,
-            leader_id,
-            version,
-            flags,
-            timestamp,
-            user_id,
-            checksum,
-            context,
-            command,
-        })
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u64_le(self.index);
+        buf.put_u64_le(self.term);
+        buf.put_u32_le(self.leader_id);
+        buf.put_u32_le(self.version);
+        buf.put_u64_le(self.flags);
+        buf.put_u64_le(self.timestamp.into());
+        buf.put_u32_le(self.user_id);
+        buf.put_u64_le(self.checksum);
+        buf.put_u32_le(self.context.len() as u32);
+        buf.put_slice(&self.context);
+        buf.extend_from_slice(&self.command);
     }
 }
diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs
index 08f91baf8..652d9aedd 100644
--- a/core/server/src/state/file.rs
+++ b/core/server/src/state/file.rs
@@ -24,7 +24,7 @@ use crate::streaming::utils::file;
 use bytes::{Buf, BufMut, Bytes, BytesMut};
 use compio::io::AsyncReadExt;
 use err_trail::ErrContext;
-use iggy_common::BytesSerializable;
+use iggy_binary_protocol::{WireDecode, WireEncode};
 use iggy_common::EncryptorKind;
 use iggy_common::IggyByteSize;
 use iggy_common::IggyError;
@@ -261,9 +261,14 @@ impl FileState {
             entry_command.put_u32_le(command_length as u32);
             entry_command.extend(command_payload);
             let command = entry_command.freeze();
-            EntryCommand::from_bytes(command.clone()).error(|e: &IggyError| {
-                format!("{COMPONENT} (error: {e}) - failed to parse entry command from bytes")
-            })?;
+            EntryCommand::decode_from(&command)
+                .map_err(|e| {
+                    tracing::warn!("wire decode error during WAL replay: {e}");
+                    IggyError::InvalidCommand
+                })
+                .error(|e: &IggyError| {
+                    format!("{COMPONENT} (error: {e}) - failed to parse entry command from bytes")
+                })?;
             let calculated_checksum = StateEntry::calculate_checksum(
                 index, term, leader_id, version, flags, timestamp, user_id, &context, &command,
             );

Reply via email to