This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-7-http in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 5ee559225d7df35aba826f834bd9c61745f6c346 Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 27 11:02:38 2026 +0100 refactor(server): migrate WAL to WireEncode/WireDecode traits EntryCommand and StateEntry used BytesSerializable for WAL persistence despite the codebase already having WireEncode/ WireDecode in iggy_binary_protocol. This dual-trait situation meant WAL code couldn't benefit from WireEncode's zero-alloc encode() path. EntryCommand now implements both WireEncode and WireDecode. StateEntry implements WireEncode only - from_bytes had zero callers since load_entries() uses hand-rolled async cursor reads for incremental validation and encryption interleaving. --- core/integration/tests/state/file.rs | 2 +- core/server/src/state/command.rs | 255 ++++++++++++++++++++--------------- core/server/src/state/entry.rs | 70 +++------- core/server/src/state/file.rs | 13 +- 4 files changed, 179 insertions(+), 161 deletions(-) diff --git a/core/integration/tests/state/file.rs b/core/integration/tests/state/file.rs index 944f9a0e1..ca3c16235 100644 --- a/core/integration/tests/state/file.rs +++ b/core/integration/tests/state/file.rs @@ -18,7 +18,7 @@ use crate::state::StateSetup; use bytes::Bytes; -use iggy::prelude::BytesSerializable; +use iggy_binary_protocol::WireEncode; use iggy_binary_protocol::WireName; use iggy_binary_protocol::requests::{streams::CreateStreamRequest, users::CreateUserRequest}; use server::state::command::EntryCommand; diff --git a/core/server/src/state/command.rs b/core/server/src/state/command.rs index ccb19c05f..4d544e6e8 100644 --- a/core/server/src/state/command.rs +++ b/core/server/src/state/command.rs @@ -20,7 +20,7 @@ use crate::state::models::{ CreateConsumerGroupWithId, CreatePersonalAccessTokenWithHash, CreateStreamWithId, CreateTopicWithId, CreateUserWithId, }; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, BytesMut}; use iggy_binary_protocol::codes::{ CHANGE_PASSWORD_CODE, CREATE_CONSUMER_GROUP_CODE, CREATE_PARTITIONS_CODE, CREATE_PERSONAL_ACCESS_TOKEN_CODE, CREATE_STREAM_CODE, CREATE_TOPIC_CODE, CREATE_USER_CODE, @@ -40,9 +40,7 @@ use iggy_binary_protocol::requests::{ ChangePasswordRequest, DeleteUserRequest, UpdatePermissionsRequest, UpdateUserRequest, }, }; -use iggy_binary_protocol::{WireDecode, WireEncode}; -use iggy_common::BytesSerializable; -use iggy_common::IggyError; +use iggy_binary_protocol::{WireDecode, WireEncode, WireError}; use std::fmt::{Display, Formatter}; #[derive(Debug)] @@ -69,119 +67,160 @@ pub enum EntryCommand { DeletePersonalAccessToken(DeletePersonalAccessTokenRequest), } -fn wire_error_to_iggy(e: iggy_binary_protocol::WireError) -> IggyError { - tracing::warn!("wire decode error during WAL replay: {e}"); - IggyError::InvalidCommand -} +impl WireEncode for EntryCommand { + fn encoded_size(&self) -> usize { + let inner_size = match self { + EntryCommand::CreateStream(cmd) => cmd.encoded_size(), + EntryCommand::UpdateStream(cmd) => cmd.encoded_size(), + EntryCommand::DeleteStream(cmd) => cmd.encoded_size(), + EntryCommand::PurgeStream(cmd) => cmd.encoded_size(), + EntryCommand::CreateTopic(cmd) => cmd.encoded_size(), + EntryCommand::UpdateTopic(cmd) => cmd.encoded_size(), + EntryCommand::DeleteTopic(cmd) => cmd.encoded_size(), + EntryCommand::PurgeTopic(cmd) => cmd.encoded_size(), + EntryCommand::CreatePartitions(cmd) => cmd.encoded_size(), + EntryCommand::DeletePartitions(cmd) => cmd.encoded_size(), + EntryCommand::DeleteSegments(cmd) => cmd.encoded_size(), + EntryCommand::CreateConsumerGroup(cmd) => cmd.encoded_size(), + EntryCommand::DeleteConsumerGroup(cmd) => cmd.encoded_size(), + EntryCommand::CreateUser(cmd) => cmd.encoded_size(), + EntryCommand::UpdateUser(cmd) => cmd.encoded_size(), + EntryCommand::DeleteUser(cmd) => cmd.encoded_size(), + EntryCommand::ChangePassword(cmd) => cmd.encoded_size(), + EntryCommand::UpdatePermissions(cmd) => cmd.encoded_size(), + EntryCommand::CreatePersonalAccessToken(cmd) => cmd.encoded_size(), + EntryCommand::DeletePersonalAccessToken(cmd) => cmd.encoded_size(), + }; + 4 + 4 + inner_size + } -impl BytesSerializable for EntryCommand { - fn to_bytes(&self) -> Bytes { - let (code, command) = match self { - EntryCommand::CreateStream(cmd) => (CREATE_STREAM_CODE, cmd.to_bytes()), - EntryCommand::UpdateStream(cmd) => (UPDATE_STREAM_CODE, cmd.to_bytes()), - EntryCommand::DeleteStream(cmd) => (DELETE_STREAM_CODE, cmd.to_bytes()), - EntryCommand::PurgeStream(cmd) => (PURGE_STREAM_CODE, cmd.to_bytes()), - EntryCommand::CreateTopic(cmd) => (CREATE_TOPIC_CODE, cmd.to_bytes()), - EntryCommand::UpdateTopic(cmd) => (UPDATE_TOPIC_CODE, cmd.to_bytes()), - EntryCommand::DeleteTopic(cmd) => (DELETE_TOPIC_CODE, cmd.to_bytes()), - EntryCommand::PurgeTopic(cmd) => (PURGE_TOPIC_CODE, cmd.to_bytes()), - EntryCommand::CreatePartitions(cmd) => (CREATE_PARTITIONS_CODE, cmd.to_bytes()), - EntryCommand::DeletePartitions(cmd) => (DELETE_PARTITIONS_CODE, cmd.to_bytes()), - EntryCommand::DeleteSegments(cmd) => (DELETE_SEGMENTS_CODE, cmd.to_bytes()), - EntryCommand::CreateConsumerGroup(cmd) => (CREATE_CONSUMER_GROUP_CODE, cmd.to_bytes()), - EntryCommand::DeleteConsumerGroup(cmd) => (DELETE_CONSUMER_GROUP_CODE, cmd.to_bytes()), - EntryCommand::CreateUser(cmd) => (CREATE_USER_CODE, cmd.to_bytes()), - EntryCommand::UpdateUser(cmd) => (UPDATE_USER_CODE, cmd.to_bytes()), - EntryCommand::DeleteUser(cmd) => (DELETE_USER_CODE, cmd.to_bytes()), - EntryCommand::ChangePassword(cmd) => (CHANGE_PASSWORD_CODE, cmd.to_bytes()), - EntryCommand::UpdatePermissions(cmd) => (UPDATE_PERMISSIONS_CODE, cmd.to_bytes()), + fn encode(&self, buf: &mut BytesMut) { + let (code, inner_size) = match self { + EntryCommand::CreateStream(cmd) => (CREATE_STREAM_CODE, cmd.encoded_size()), + EntryCommand::UpdateStream(cmd) => (UPDATE_STREAM_CODE, cmd.encoded_size()), + EntryCommand::DeleteStream(cmd) => (DELETE_STREAM_CODE, cmd.encoded_size()), + EntryCommand::PurgeStream(cmd) => (PURGE_STREAM_CODE, cmd.encoded_size()), + EntryCommand::CreateTopic(cmd) => (CREATE_TOPIC_CODE, cmd.encoded_size()), + EntryCommand::UpdateTopic(cmd) => (UPDATE_TOPIC_CODE, cmd.encoded_size()), + EntryCommand::DeleteTopic(cmd) => (DELETE_TOPIC_CODE, cmd.encoded_size()), + EntryCommand::PurgeTopic(cmd) => (PURGE_TOPIC_CODE, cmd.encoded_size()), + EntryCommand::CreatePartitions(cmd) => (CREATE_PARTITIONS_CODE, cmd.encoded_size()), + EntryCommand::DeletePartitions(cmd) => (DELETE_PARTITIONS_CODE, cmd.encoded_size()), + EntryCommand::DeleteSegments(cmd) => (DELETE_SEGMENTS_CODE, cmd.encoded_size()), + EntryCommand::CreateConsumerGroup(cmd) => { + (CREATE_CONSUMER_GROUP_CODE, cmd.encoded_size()) + } + EntryCommand::DeleteConsumerGroup(cmd) => { + (DELETE_CONSUMER_GROUP_CODE, cmd.encoded_size()) + } + EntryCommand::CreateUser(cmd) => (CREATE_USER_CODE, cmd.encoded_size()), + EntryCommand::UpdateUser(cmd) => (UPDATE_USER_CODE, cmd.encoded_size()), + EntryCommand::DeleteUser(cmd) => (DELETE_USER_CODE, cmd.encoded_size()), + EntryCommand::ChangePassword(cmd) => (CHANGE_PASSWORD_CODE, cmd.encoded_size()), + EntryCommand::UpdatePermissions(cmd) => (UPDATE_PERMISSIONS_CODE, cmd.encoded_size()), EntryCommand::CreatePersonalAccessToken(cmd) => { - (CREATE_PERSONAL_ACCESS_TOKEN_CODE, cmd.to_bytes()) + (CREATE_PERSONAL_ACCESS_TOKEN_CODE, cmd.encoded_size()) } EntryCommand::DeletePersonalAccessToken(cmd) => { - (DELETE_PERSONAL_ACCESS_TOKEN_CODE, cmd.to_bytes()) + (DELETE_PERSONAL_ACCESS_TOKEN_CODE, cmd.encoded_size()) } }; - - let mut bytes = BytesMut::with_capacity(4 + 4 + command.len()); - bytes.put_u32_le(code); - bytes.put_u32_le(command.len() as u32); - bytes.extend(command); - bytes.freeze() + buf.put_u32_le(code); + buf.put_u32_le(inner_size as u32); + match self { + EntryCommand::CreateStream(cmd) => cmd.encode(buf), + EntryCommand::UpdateStream(cmd) => cmd.encode(buf), + EntryCommand::DeleteStream(cmd) => cmd.encode(buf), + EntryCommand::PurgeStream(cmd) => cmd.encode(buf), + EntryCommand::CreateTopic(cmd) => cmd.encode(buf), + EntryCommand::UpdateTopic(cmd) => cmd.encode(buf), + EntryCommand::DeleteTopic(cmd) => cmd.encode(buf), + EntryCommand::PurgeTopic(cmd) => cmd.encode(buf), + EntryCommand::CreatePartitions(cmd) => cmd.encode(buf), + EntryCommand::DeletePartitions(cmd) => cmd.encode(buf), + EntryCommand::DeleteSegments(cmd) => cmd.encode(buf), + EntryCommand::CreateConsumerGroup(cmd) => cmd.encode(buf), + EntryCommand::DeleteConsumerGroup(cmd) => cmd.encode(buf), + EntryCommand::CreateUser(cmd) => cmd.encode(buf), + EntryCommand::UpdateUser(cmd) => cmd.encode(buf), + EntryCommand::DeleteUser(cmd) => cmd.encode(buf), + EntryCommand::ChangePassword(cmd) => cmd.encode(buf), + EntryCommand::UpdatePermissions(cmd) => cmd.encode(buf), + EntryCommand::CreatePersonalAccessToken(cmd) => cmd.encode(buf), + EntryCommand::DeletePersonalAccessToken(cmd) => cmd.encode(buf), + } } +} - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - let code = bytes.slice(0..4).get_u32_le(); - let length = bytes.slice(4..8).get_u32_le(); - let payload = &bytes[8..8 + length as usize]; - match code { - CREATE_STREAM_CODE => Ok(EntryCommand::CreateStream( - CreateStreamWithId::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - UPDATE_STREAM_CODE => Ok(EntryCommand::UpdateStream( - UpdateStreamRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - DELETE_STREAM_CODE => Ok(EntryCommand::DeleteStream( - DeleteStreamRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - PURGE_STREAM_CODE => Ok(EntryCommand::PurgeStream( - PurgeStreamRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - CREATE_TOPIC_CODE => Ok(EntryCommand::CreateTopic( - CreateTopicWithId::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - UPDATE_TOPIC_CODE => Ok(EntryCommand::UpdateTopic( - UpdateTopicRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - DELETE_TOPIC_CODE => Ok(EntryCommand::DeleteTopic( - DeleteTopicRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - PURGE_TOPIC_CODE => Ok(EntryCommand::PurgeTopic( - PurgeTopicRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - CREATE_PARTITIONS_CODE => Ok(EntryCommand::CreatePartitions( - CreatePartitionsRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - DELETE_PARTITIONS_CODE => Ok(EntryCommand::DeletePartitions( - DeletePartitionsRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - DELETE_SEGMENTS_CODE => Ok(EntryCommand::DeleteSegments( - DeleteSegmentsRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - CREATE_CONSUMER_GROUP_CODE => Ok(EntryCommand::CreateConsumerGroup( - CreateConsumerGroupWithId::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - DELETE_CONSUMER_GROUP_CODE => Ok(EntryCommand::DeleteConsumerGroup( - DeleteConsumerGroupRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - CREATE_USER_CODE => Ok(EntryCommand::CreateUser( - CreateUserWithId::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - UPDATE_USER_CODE => Ok(EntryCommand::UpdateUser( - UpdateUserRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - DELETE_USER_CODE => Ok(EntryCommand::DeleteUser( - DeleteUserRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - CHANGE_PASSWORD_CODE => Ok(EntryCommand::ChangePassword( - ChangePasswordRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - UPDATE_PERMISSIONS_CODE => Ok(EntryCommand::UpdatePermissions( - UpdatePermissionsRequest::decode_from(payload).map_err(wire_error_to_iggy)?, - )), - CREATE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::CreatePersonalAccessToken( - CreatePersonalAccessTokenWithHash::decode_from(payload) - .map_err(wire_error_to_iggy)?, - )), - DELETE_PERSONAL_ACCESS_TOKEN_CODE => Ok(EntryCommand::DeletePersonalAccessToken( - DeletePersonalAccessTokenRequest::decode_from(payload) - .map_err(wire_error_to_iggy)?, - )), - _ => Err(IggyError::InvalidCommand), +impl WireDecode for EntryCommand { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + if buf.len() < 8 { + return Err(WireError::UnexpectedEof { + offset: 0, + need: 8, + have: buf.len(), + }); } + let code = u32::from_le_bytes(buf[0..4].try_into().unwrap()); + let length = u32::from_le_bytes(buf[4..8].try_into().unwrap()) as usize; + let payload = &buf[8..8 + length]; + let consumed = 8 + length; + let cmd = match code { + CREATE_STREAM_CODE => { + EntryCommand::CreateStream(CreateStreamWithId::decode_from(payload)?) + } + UPDATE_STREAM_CODE => { + EntryCommand::UpdateStream(UpdateStreamRequest::decode_from(payload)?) + } + DELETE_STREAM_CODE => { + EntryCommand::DeleteStream(DeleteStreamRequest::decode_from(payload)?) + } + PURGE_STREAM_CODE => { + EntryCommand::PurgeStream(PurgeStreamRequest::decode_from(payload)?) + } + CREATE_TOPIC_CODE => { + EntryCommand::CreateTopic(CreateTopicWithId::decode_from(payload)?) + } + UPDATE_TOPIC_CODE => { + EntryCommand::UpdateTopic(UpdateTopicRequest::decode_from(payload)?) + } + DELETE_TOPIC_CODE => { + EntryCommand::DeleteTopic(DeleteTopicRequest::decode_from(payload)?) + } + PURGE_TOPIC_CODE => EntryCommand::PurgeTopic(PurgeTopicRequest::decode_from(payload)?), + CREATE_PARTITIONS_CODE => { + EntryCommand::CreatePartitions(CreatePartitionsRequest::decode_from(payload)?) + } + DELETE_PARTITIONS_CODE => { + EntryCommand::DeletePartitions(DeletePartitionsRequest::decode_from(payload)?) + } + DELETE_SEGMENTS_CODE => { + EntryCommand::DeleteSegments(DeleteSegmentsRequest::decode_from(payload)?) + } + CREATE_CONSUMER_GROUP_CODE => { + EntryCommand::CreateConsumerGroup(CreateConsumerGroupWithId::decode_from(payload)?) + } + DELETE_CONSUMER_GROUP_CODE => { + EntryCommand::DeleteConsumerGroup(DeleteConsumerGroupRequest::decode_from(payload)?) + } + CREATE_USER_CODE => EntryCommand::CreateUser(CreateUserWithId::decode_from(payload)?), + UPDATE_USER_CODE => EntryCommand::UpdateUser(UpdateUserRequest::decode_from(payload)?), + DELETE_USER_CODE => EntryCommand::DeleteUser(DeleteUserRequest::decode_from(payload)?), + CHANGE_PASSWORD_CODE => { + EntryCommand::ChangePassword(ChangePasswordRequest::decode_from(payload)?) + } + UPDATE_PERMISSIONS_CODE => { + EntryCommand::UpdatePermissions(UpdatePermissionsRequest::decode_from(payload)?) + } + CREATE_PERSONAL_ACCESS_TOKEN_CODE => EntryCommand::CreatePersonalAccessToken( + CreatePersonalAccessTokenWithHash::decode_from(payload)?, + ), + DELETE_PERSONAL_ACCESS_TOKEN_CODE => EntryCommand::DeletePersonalAccessToken( + DeletePersonalAccessTokenRequest::decode_from(payload)?, + ), + _ => return Err(WireError::UnknownCommand(code)), + }; + Ok((cmd, consumed)) } } diff --git a/core/server/src/state/entry.rs b/core/server/src/state/entry.rs index 6370bd2dc..e10091ce8 100644 --- a/core/server/src/state/entry.rs +++ b/core/server/src/state/entry.rs @@ -17,10 +17,11 @@ */ use crate::state::command::EntryCommand; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{BufMut, Bytes, BytesMut}; +use iggy_binary_protocol::{WireDecode, WireEncode}; use iggy_common::IggyError; use iggy_common::IggyTimestamp; -use iggy_common::{BytesSerializable, calculate_checksum}; +use iggy_common::calculate_checksum; use std::fmt::{Display, Formatter}; /// State entry in the log @@ -78,7 +79,10 @@ impl StateEntry { } pub fn command(&self) -> Result<EntryCommand, IggyError> { - EntryCommand::from_bytes(self.command.clone()) + EntryCommand::decode_from(&self.command).map_err(|e| { + tracing::warn!("wire decode error during WAL replay: {e}"); + IggyError::InvalidCommand + }) } #[allow(clippy::too_many_arguments)] @@ -126,52 +130,22 @@ impl Display for StateEntry { } } -impl BytesSerializable for StateEntry { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity( - 8 + 8 + 4 + 4 + 8 + 8 + 4 + 4 + 4 + self.context.len() + self.command.len(), - ); - bytes.put_u64_le(self.index); - bytes.put_u64_le(self.term); - bytes.put_u32_le(self.leader_id); - bytes.put_u32_le(self.version); - bytes.put_u64_le(self.flags); - bytes.put_u64_le(self.timestamp.into()); - bytes.put_u32_le(self.user_id); - bytes.put_u64_le(self.checksum); - bytes.put_u32_le(self.context.len() as u32); - bytes.put_slice(&self.context); - bytes.extend(&self.command); - bytes.freeze() +impl WireEncode for StateEntry { + fn encoded_size(&self) -> usize { + 8 + 8 + 4 + 4 + 8 + 8 + 4 + 8 + 4 + self.context.len() + self.command.len() } - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - let index = bytes.slice(0..8).get_u64_le(); - let term = bytes.slice(8..16).get_u64_le(); - let leader_id = bytes.slice(16..20).get_u32_le(); - let version = bytes.slice(20..24).get_u32_le(); - let flags = bytes.slice(24..32).get_u64_le(); - let timestamp = IggyTimestamp::from(bytes.slice(32..40).get_u64_le()); - let user_id = bytes.slice(40..44).get_u32_le(); - let checksum = bytes.slice(44..52).get_u64_le(); - let context_length = bytes.slice(52..56).get_u32_le() as usize; - let context = bytes.slice(56..56 + context_length); - let command = bytes.slice(56 + context_length..); - - Ok(StateEntry { - index, - term, - leader_id, - version, - flags, - timestamp, - user_id, - checksum, - context, - command, - }) + fn encode(&self, buf: &mut BytesMut) { + buf.put_u64_le(self.index); + buf.put_u64_le(self.term); + buf.put_u32_le(self.leader_id); + buf.put_u32_le(self.version); + buf.put_u64_le(self.flags); + buf.put_u64_le(self.timestamp.into()); + buf.put_u32_le(self.user_id); + buf.put_u64_le(self.checksum); + buf.put_u32_le(self.context.len() as u32); + buf.put_slice(&self.context); + buf.extend_from_slice(&self.command); } } diff --git a/core/server/src/state/file.rs b/core/server/src/state/file.rs index 08f91baf8..652d9aedd 100644 --- a/core/server/src/state/file.rs +++ b/core/server/src/state/file.rs @@ -24,7 +24,7 @@ use crate::streaming::utils::file; use bytes::{Buf, BufMut, Bytes, BytesMut}; use compio::io::AsyncReadExt; use err_trail::ErrContext; -use iggy_common::BytesSerializable; +use iggy_binary_protocol::{WireDecode, WireEncode}; use iggy_common::EncryptorKind; use iggy_common::IggyByteSize; use iggy_common::IggyError; @@ -261,9 +261,14 @@ impl FileState { entry_command.put_u32_le(command_length as u32); entry_command.extend(command_payload); let command = entry_command.freeze(); - EntryCommand::from_bytes(command.clone()).error(|e: &IggyError| { - format!("{COMPONENT} (error: {e}) - failed to parse entry command from bytes") - })?; + EntryCommand::decode_from(&command) + .map_err(|e| { + tracing::warn!("wire decode error during WAL replay: {e}"); + IggyError::InvalidCommand + }) + .error(|e: &IggyError| { + format!("{COMPONENT} (error: {e}) - failed to parse entry command from bytes") + })?; let calculated_checksum = StateEntry::calculate_checksum( index, term, leader_id, version, flags, timestamp, user_id, &context, &command, );
