This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-7-http in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 056cff621975181226c91804d2999eb5837e73ba Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 27 11:34:03 2026 +0100 refactor(sdk): delete BytesSerializable trait entirely BytesSerializable conflated wire protocol encoding with domain type serialization, duplicating WireEncode/WireDecode which already existed in iggy_binary_protocol. SDK poll/send paths built request bytes through 4-5 intermediate allocations per BytesSerializable::to_bytes() call. SDK poll_messages now constructs PollMessagesRequest directly from wire conversion functions. SDK send_messages uses the zero-copy SendMessagesEncoder with borrowed RawMessage refs. IggyMessage/IggyMessageHeader retain to_bytes/from_bytes as inherent methods for CLI and encryption callers. HashMap header serialization becomes standalone functions since inherent methods cannot be added to foreign types. --- .../src/commands/binary_message/poll_messages.rs | 8 +- .../src/commands/binary_message/send_messages.rs | 4 +- core/common/src/http/messages/poll_messages.rs | 44 +--- core/common/src/http/messages/send_messages.rs | 78 +------ core/common/src/lib.rs | 1 - core/common/src/traits/binary_impls/messages.rs | 68 ++++-- core/common/src/traits/bytes_serializable.rs | 41 ---- core/common/src/traits/mod.rs | 1 - core/common/src/types/consumer/consumer_kind.rs | 27 --- core/common/src/types/identifier/mod.rs | 76 +----- core/common/src/types/message/iggy_message.rs | 137 ++++++----- core/common/src/types/message/message_header.rs | 8 +- core/common/src/types/message/message_view.rs | 4 +- core/common/src/types/message/messages_batch.rs | 4 +- .../common/src/types/message/messages_batch_mut.rs | 6 +- core/common/src/types/message/mod.rs | 3 +- core/common/src/types/message/partitioning.rs | 71 +----- core/common/src/types/message/polled_messages.rs | 4 +- core/common/src/types/message/polling_strategy.rs | 27 --- core/common/src/types/message/user_headers.rs | 178 +++++++------- .../src/types/permissions/permissions_global.rs | 258 --------------------- core/common/src/wire_conversions.rs | 27 +++ .../tests/server/scenarios/offset_scenario.rs | 4 +- .../tests/server/scenarios/timestamp_scenario.rs | 4 +- core/sdk/src/prelude.rs | 9 +- core/server/src/shard/system/messages.rs | 3 +- core/server/src/state/models.rs | 2 +- .../message-compression/consumer/main.rs | 2 +- 28 files changed, 266 insertions(+), 833 deletions(-) diff --git a/core/cli/src/commands/binary_message/poll_messages.rs b/core/cli/src/commands/binary_message/poll_messages.rs index 3cb4c4abe..1b0a18422 100644 --- a/core/cli/src/commands/binary_message/poll_messages.rs +++ b/core/cli/src/commands/binary_message/poll_messages.rs @@ -22,10 +22,10 @@ use async_trait::async_trait; use comfy_table::{Cell, CellAlignment, Row, Table}; use iggy_common::Client; use iggy_common::{ - BytesSerializable, Consumer, HeaderKey, HeaderKind, HeaderValue, Identifier, IggyByteSize, - IggyDuration, IggyMessage, IggyTimestamp, PollMessages, PollingStrategy, Sizeable, + Consumer, HeaderKey, HeaderKind, Identifier, IggyByteSize, IggyDuration, IggyMessage, + IggyTimestamp, PollMessages, PollingStrategy, Sizeable, user_headers_from_bytes, }; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use tokio::io::AsyncWriteExt; use tracing::{Level, event}; @@ -85,7 +85,7 @@ impl PollMessagesCmd { .iter() .flat_map(|m| { if let Some(user_headers) = &m.user_headers { - match HashMap::<HeaderKey, HeaderValue>::from_bytes(user_headers.clone()) { + match user_headers_from_bytes(user_headers.clone()) { Ok(headers) => headers .iter() .map(|(k, v)| (k.clone(), v.kind())) diff --git a/core/cli/src/commands/binary_message/send_messages.rs b/core/cli/src/commands/binary_message/send_messages.rs index 9dbdcbe22..ebbd13c5d 100644 --- a/core/cli/src/commands/binary_message/send_messages.rs +++ b/core/cli/src/commands/binary_message/send_messages.rs @@ -21,9 +21,7 @@ use anyhow::Context; use async_trait::async_trait; use bytes::Bytes; use iggy_common::Client; -use iggy_common::{ - BytesSerializable, HeaderKey, HeaderValue, Identifier, IggyMessage, Partitioning, Sizeable, -}; +use iggy_common::{HeaderKey, HeaderValue, Identifier, IggyMessage, Partitioning, Sizeable}; use std::collections::HashMap; use std::io::{self, Read}; use tokio::io::AsyncReadExt; diff --git a/core/common/src/http/messages/poll_messages.rs b/core/common/src/http/messages/poll_messages.rs index e8ca86539..d0d2e7c0a 100644 --- a/core/common/src/http/messages/poll_messages.rs +++ b/core/common/src/http/messages/poll_messages.rs @@ -18,8 +18,7 @@ use crate::Consumer; use crate::error::IggyError; -use crate::{BytesSerializable, Identifier, PollingStrategy, Validatable}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::{Identifier, PollingStrategy, Validatable}; use serde::{Deserialize, Serialize}; pub const DEFAULT_PARTITION_ID: u32 = 0; @@ -60,47 +59,6 @@ pub struct PollMessages { } impl PollMessages { - pub fn bytes( - stream_id: &Identifier, - topic_id: &Identifier, - partition_id: Option<u32>, - consumer: &Consumer, - strategy: &PollingStrategy, - count: u32, - auto_commit: bool, - ) -> Bytes { - let consumer_bytes = consumer.to_bytes(); - let stream_id_bytes = stream_id.to_bytes(); - let topic_id_bytes = topic_id.to_bytes(); - let strategy_bytes = strategy.to_bytes(); - let mut bytes = BytesMut::with_capacity( - 10 + consumer_bytes.len() - + stream_id_bytes.len() - + topic_id_bytes.len() - + strategy_bytes.len(), - ); - bytes.put_slice(&consumer_bytes); - bytes.put_slice(&stream_id_bytes); - bytes.put_slice(&topic_id_bytes); - // Encode partition_id with a flag byte: 1 = Some, 0 = None - if let Some(partition_id) = partition_id { - bytes.put_u8(1); - bytes.put_u32_le(partition_id); - } else { - bytes.put_u8(0); - bytes.put_u32_le(0); // Padding to keep structure consistent - } - bytes.put_slice(&strategy_bytes); - bytes.put_u32_le(count); - if auto_commit { - bytes.put_u8(1); - } else { - bytes.put_u8(0); - } - - bytes.freeze() - } - pub fn default_number_of_messages_to_poll() -> u32 { DEFAULT_NUMBER_OF_MESSAGES_TO_POLL } diff --git a/core/common/src/http/messages/send_messages.rs b/core/common/src/http/messages/send_messages.rs index 0168caa2d..0f6b137c6 100644 --- a/core/common/src/http/messages/send_messages.rs +++ b/core/common/src/http/messages/send_messages.rs @@ -16,18 +16,15 @@ * under the License. */ -use crate::BytesSerializable; use crate::Identifier; use crate::IggyMessageView; use crate::PartitioningKind; -use crate::Sizeable; use crate::Validatable; use crate::error::IggyError; use crate::types::message::HeaderEntry; use crate::types::message::partitioning::Partitioning; -use crate::{INDEX_SIZE, IggyMessage, IggyMessagesBatch}; +use crate::{IggyMessage, IggyMessagesBatch}; use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; -use bytes::{BufMut, Bytes, BytesMut}; use serde::de::{self, MapAccess, Visitor}; use serde::ser::SerializeStruct; use serde::{Deserialize, Deserializer, Serialize, Serializer}; @@ -54,79 +51,6 @@ pub struct SendMessages { pub batch: IggyMessagesBatch, } -impl SendMessages { - pub fn bytes( - stream_id: &Identifier, - topic_id: &Identifier, - partitioning: &Partitioning, - messages: &[IggyMessage], - ) -> Bytes { - let stream_id_field_size = stream_id.get_buffer_size(); - let topic_id_field_size = topic_id.get_buffer_size(); - let partitioning_field_size = partitioning.get_buffer_size(); - let metadata_length_field_size = size_of::<u32>(); - let messages_count = messages.len(); - let messages_count_field_size = size_of::<u32>(); - let metadata_length = stream_id_field_size - + topic_id_field_size - + partitioning_field_size - + messages_count_field_size; - let indexes_size = messages_count * INDEX_SIZE; - let messages_size = messages - .iter() - .map(|m| m.get_size_bytes().as_bytes_usize()) - .sum::<usize>(); - - let total_size = metadata_length_field_size - + stream_id_field_size - + topic_id_field_size - + partitioning_field_size - + messages_count_field_size - + indexes_size - + messages_size; - - let mut bytes = BytesMut::with_capacity(total_size); - - bytes.put_u32_le(metadata_length as u32); - stream_id.write_to_buffer(&mut bytes); - topic_id.write_to_buffer(&mut bytes); - partitioning.write_to_buffer(&mut bytes); - bytes.put_u32_le(messages_count as u32); - - let mut current_position = bytes.len(); - - bytes.put_bytes(0, indexes_size); - - let mut msg_size: u32 = 0; - for message in messages.iter() { - message.write_to_buffer(&mut bytes); - msg_size += message.get_size_bytes().as_bytes_u64() as u32; - write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position); - write_value_at(&mut bytes, msg_size.to_le_bytes(), current_position + 4); - write_value_at(&mut bytes, 0u64.to_le_bytes(), current_position + 8); - current_position += INDEX_SIZE; - } - - let out = bytes.freeze(); - - debug_assert_eq!( - total_size, - out.len(), - "Calculated SendMessages command byte size doesn't match actual command size", - ); - - out - } -} - -fn write_value_at<const N: usize>(slice: &mut [u8], value: [u8; N], position: usize) { - let slice = &mut slice[position..position + N]; - let ptr = slice.as_mut_ptr(); - unsafe { - std::ptr::copy_nonoverlapping(value.as_ptr(), ptr, N); - } -} - impl Default for SendMessages { fn default() -> Self { SendMessages { diff --git a/core/common/src/lib.rs b/core/common/src/lib.rs index f094461f3..dca080e70 100644 --- a/core/common/src/lib.rs +++ b/core/common/src/lib.rs @@ -54,7 +54,6 @@ pub use sender::{ }; pub use traits::binary_client::BinaryClient; pub use traits::binary_transport::BinaryTransport; -pub use traits::bytes_serializable::BytesSerializable; pub use traits::client::Client; pub use traits::cluster_client::ClusterClient; pub use traits::consumer_group_client::ConsumerGroupClient; diff --git a/core/common/src/traits/binary_impls/messages.rs b/core/common/src/traits/binary_impls/messages.rs index 623f0af90..d77d07b61 100644 --- a/core/common/src/traits/binary_impls/messages.rs +++ b/core/common/src/traits/binary_impls/messages.rs @@ -17,16 +17,21 @@ */ use crate::BinaryClient; use crate::traits::binary_auth::fail_if_not_authenticated; -use crate::wire_conversions::identifier_to_wire; +use crate::wire_conversions::{ + consumer_to_wire, identifier_to_wire, partitioning_to_wire, polling_strategy_to_wire, +}; use crate::{ - Consumer, Identifier, IggyError, IggyMessage, MessageClient, Partitioning, PollMessages, - PolledMessages, PollingStrategy, SendMessages, + Consumer, Identifier, IggyError, IggyMessage, MessageClient, Partitioning, PolledMessages, + PollingStrategy, }; +use bytes::BytesMut; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ FLUSH_UNSAVED_BUFFER_CODE, POLL_MESSAGES_CODE, SEND_MESSAGES_CODE, }; -use iggy_binary_protocol::requests::messages::FlushUnsavedBufferRequest; +use iggy_binary_protocol::requests::messages::{ + FlushUnsavedBufferRequest, PollMessagesRequest, RawMessage, SendMessagesEncoder, +}; #[async_trait::async_trait] impl<B: BinaryClient> MessageClient for B { @@ -41,19 +46,17 @@ impl<B: BinaryClient> MessageClient for B { auto_commit: bool, ) -> Result<PolledMessages, IggyError> { fail_if_not_authenticated(self).await?; + let req = PollMessagesRequest { + consumer: consumer_to_wire(consumer)?, + stream_id: identifier_to_wire(stream_id)?, + topic_id: identifier_to_wire(topic_id)?, + partition_id, + strategy: polling_strategy_to_wire(strategy), + count, + auto_commit, + }; let response = self - .send_raw_with_response( - POLL_MESSAGES_CODE, - PollMessages::bytes( - stream_id, - topic_id, - partition_id, - consumer, - strategy, - count, - auto_commit, - ), - ) + .send_raw_with_response(POLL_MESSAGES_CODE, req.to_bytes()) .await?; PolledMessages::from_bytes(response) } @@ -66,11 +69,34 @@ impl<B: BinaryClient> MessageClient for B { messages: &mut [IggyMessage], ) -> Result<(), IggyError> { fail_if_not_authenticated(self).await?; - self.send_raw_with_response( - SEND_MESSAGES_CODE, - SendMessages::bytes(stream_id, topic_id, partitioning, messages), - ) - .await?; + let wire_stream_id = identifier_to_wire(stream_id)?; + let wire_topic_id = identifier_to_wire(topic_id)?; + let wire_partitioning = partitioning_to_wire(partitioning); + let raw_messages: Vec<RawMessage<'_>> = messages + .iter() + .map(|m| RawMessage { + id: m.header.id, + origin_timestamp: m.header.origin_timestamp, + headers: m.user_headers.as_deref(), + payload: &m.payload, + }) + .collect(); + let size = SendMessagesEncoder::encoded_size( + &wire_stream_id, + &wire_topic_id, + &wire_partitioning, + &raw_messages, + ); + let mut buf = BytesMut::with_capacity(size); + SendMessagesEncoder::encode( + &mut buf, + &wire_stream_id, + &wire_topic_id, + &wire_partitioning, + &raw_messages, + ); + self.send_raw_with_response(SEND_MESSAGES_CODE, buf.freeze()) + .await?; Ok(()) } diff --git a/core/common/src/traits/bytes_serializable.rs b/core/common/src/traits/bytes_serializable.rs deleted file mode 100644 index 06bbcaba2..000000000 --- a/core/common/src/traits/bytes_serializable.rs +++ /dev/null @@ -1,41 +0,0 @@ -/* Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -use crate::IggyError; -use bytes::{Bytes, BytesMut}; - -/// The trait represents the logic responsible for serializing and deserializing the struct to and from bytes. -pub trait BytesSerializable { - /// Serializes the struct to bytes. - fn to_bytes(&self) -> Bytes; - - /// Deserializes the struct from bytes. - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized; - - /// Write the struct to a buffer. - fn write_to_buffer(&self, _buf: &mut BytesMut) { - unimplemented!(); - } - - /// Get the byte-size of the struct. - fn get_buffer_size(&self) -> usize { - unimplemented!(); - } -} diff --git a/core/common/src/traits/mod.rs b/core/common/src/traits/mod.rs index 7e30f99f1..1621fdb8a 100644 --- a/core/common/src/traits/mod.rs +++ b/core/common/src/traits/mod.rs @@ -19,7 +19,6 @@ pub(crate) mod binary_auth; pub(crate) mod binary_client; mod binary_impls; pub(crate) mod binary_transport; -pub(crate) mod bytes_serializable; pub(crate) mod client; pub(crate) mod cluster_client; pub(crate) mod consumer_group_client; diff --git a/core/common/src/types/consumer/consumer_kind.rs b/core/common/src/types/consumer/consumer_kind.rs index 65a4ffd3e..96b3fbb7b 100644 --- a/core/common/src/types/consumer/consumer_kind.rs +++ b/core/common/src/types/consumer/consumer_kind.rs @@ -16,11 +16,9 @@ * under the License. */ -use crate::BytesSerializable; use crate::Identifier; use crate::Validatable; use crate::error::IggyError; -use bytes::{BufMut, Bytes, BytesMut}; use clap::ValueEnum; use serde::{Deserialize, Deserializer, Serialize}; use std::fmt::Display; @@ -93,31 +91,6 @@ impl Consumer { } } -impl BytesSerializable for Consumer { - fn to_bytes(&self) -> Bytes { - let id_bytes = self.id.to_bytes(); - let mut bytes = BytesMut::with_capacity(1 + id_bytes.len()); - bytes.put_u8(self.kind.as_code()); - bytes.put_slice(&id_bytes); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - if bytes.len() < 4 { - return Err(IggyError::InvalidCommand); - } - - let kind = ConsumerKind::from_code(bytes[0])?; - let id = Identifier::from_bytes(bytes.slice(1..))?; - let consumer = Consumer { kind, id }; - consumer.validate()?; - Ok(consumer) - } -} - /// `ConsumerKind` is an enum that represents the type of consumer. impl ConsumerKind { /// Returns the code of the `ConsumerKind`. diff --git a/core/common/src/types/identifier/mod.rs b/core/common/src/types/identifier/mod.rs index 82c41eb8a..e80a39215 100644 --- a/core/common/src/types/identifier/mod.rs +++ b/core/common/src/types/identifier/mod.rs @@ -16,12 +16,10 @@ * under the License. */ -use crate::BytesSerializable; use crate::Sizeable; use crate::Validatable; use crate::error::IggyError; use crate::utils::byte_size::IggyByteSize; -use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use serde_with::base64::Base64; use serde_with::serde_as; @@ -215,46 +213,6 @@ impl Sizeable for Identifier { } } -impl BytesSerializable for Identifier { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(2 + self.length as usize); - bytes.put_u8(self.kind.as_code()); - bytes.put_u8(self.length); - bytes.put_slice(&self.value); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - let kind = IdKind::from_code(*bytes.first().ok_or(IggyError::InvalidIdentifier)?)?; - let length = *bytes.get(1).ok_or(IggyError::InvalidIdentifier)?; - let value = bytes - .get(2..2 + length as usize) - .ok_or(IggyError::InvalidIdentifier)? - .to_vec(); - - let identifier = Identifier { - kind, - length, - value, - }; - identifier.validate()?; - Ok(identifier) - } - - fn write_to_buffer(&self, bytes: &mut BytesMut) { - bytes.put_u8(self.kind.as_code()); - bytes.put_u8(self.length); - bytes.put_slice(&self.value); - } - - fn get_buffer_size(&self) -> usize { - 2 + self.length as usize - } -} - impl IdKind { /// Returns the code of the identifier kind. pub fn as_code(&self) -> u8 { @@ -378,33 +336,6 @@ mod tests { assert!(Identifier::named(&"a".repeat(256)).is_err()); } - #[test] - fn from_bytes_should_fail_on_empty_input() { - assert!(Identifier::from_bytes(Bytes::new()).is_err()); - } - - #[test] - fn from_bytes_should_fail_on_truncated_input() { - let id = Identifier::numeric(42).unwrap(); - let bytes = id.to_bytes(); - for i in 0..bytes.len() - 1 { - let truncated = bytes.slice(..i); - assert!( - Identifier::from_bytes(truncated).is_err(), - "expected error for truncation at byte {i}" - ); - } - } - - #[test] - fn from_bytes_should_fail_on_corrupted_length() { - let mut buf = BytesMut::new(); - buf.put_u8(1); // Numeric kind - buf.put_u8(255); // length = 255 but only 2 bytes of value follow - buf.put_u16_le(0); - assert!(Identifier::from_bytes(buf.freeze()).is_err()); - } - #[test] fn from_raw_bytes_should_fail_on_empty_input() { assert!(Identifier::from_raw_bytes(&[]).is_err()); @@ -413,10 +344,11 @@ mod tests { #[test] fn from_raw_bytes_should_fail_on_truncated_input() { let id = Identifier::numeric(42).unwrap(); - let bytes = id.to_bytes(); - for i in 0..bytes.len() - 1 { + let mut full = vec![id.kind.as_code(), id.length]; + full.extend_from_slice(&id.value); + for i in 0..full.len() - 1 { assert!( - Identifier::from_raw_bytes(&bytes[..i]).is_err(), + Identifier::from_raw_bytes(&full[..i]).is_err(), "expected error for truncation at byte {i}" ); } diff --git a/core/common/src/types/message/iggy_message.rs b/core/common/src/types/message/iggy_message.rs index 4fc214d28..914fe27dd 100644 --- a/core/common/src/types/message/iggy_message.rs +++ b/core/common/src/types/message/iggy_message.rs @@ -17,8 +17,7 @@ */ use super::message_header::{IGGY_MESSAGE_HEADER_SIZE, IggyMessageHeader}; -use super::user_headers::get_user_headers_size; -use crate::BytesSerializable; +use super::user_headers::{get_user_headers_size, user_headers_from_bytes, user_headers_to_bytes}; use crate::Sizeable; use crate::error::IggyError; use crate::utils::byte_size::IggyByteSize; @@ -193,7 +192,7 @@ impl IggyMessage { reserved: 0, }; - let user_headers = user_headers.map(|h| h.to_bytes()); + let user_headers = user_headers.map(|h| user_headers_to_bytes(&h)); Ok(Self { header, @@ -236,7 +235,7 @@ impl IggyMessage { /// ``` pub fn user_headers_map(&self) -> Result<Option<HashMap<HeaderKey, HeaderValue>>, IggyError> { if let Some(user_headers) = &self.user_headers { - match HashMap::<HeaderKey, HeaderValue>::from_bytes(user_headers.clone()) { + match user_headers_from_bytes(user_headers.clone()) { Ok(h) => Ok(Some(h)), Err(e) => { warn!( @@ -354,70 +353,8 @@ impl IggyMessage { pub fn payload_as_string(&self) -> Result<String, IggyError> { String::from_utf8(self.payload.to_vec()).map_err(|_| IggyError::InvalidUtf8) } -} - -impl FromStr for IggyMessage { - type Err = IggyError; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - Self::builder().payload(Bytes::from(s.to_owned())).build() - } -} - -impl std::fmt::Display for IggyMessage { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match String::from_utf8(self.payload.to_vec()) { - Ok(payload) => { - write!( - f, - "[{offset}] ID:{id} '{preview}'", - offset = self.header.offset, - id = self.header.id, - preview = if payload.len() > 50 { - format!("{}... ({}B)", &payload[..47], self.payload.len()) - } else { - payload - } - ) - } - Err(_) => { - write!( - f, - "[{offset}] ID:{id} <binary {payload_len}B>", - offset = self.header.offset, - id = self.header.id, - payload_len = self.payload.len() - ) - } - } - } -} - -impl Sizeable for IggyMessage { - fn get_size_bytes(&self) -> IggyByteSize { - let message_header_len = IGGY_MESSAGE_HEADER_SIZE; - let payload_len = self.payload.len(); - let user_headers_len = self.user_headers.as_ref().map(|h| h.len()).unwrap_or(0); - - #[cfg(debug_assertions)] - { - assert_eq!( - user_headers_len, self.header.user_headers_length as usize, - "user_headers.len() != header.user_headers_length" - ); - - assert_eq!( - payload_len, self.header.payload_length as usize, - "payload.len() != header.payload_length" - ) - } - - IggyByteSize::from((message_header_len + payload_len + user_headers_len) as u64) - } -} -impl BytesSerializable for IggyMessage { - fn to_bytes(&self) -> Bytes { + pub fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(self.get_size_bytes().as_bytes_usize()); let message_header = self.header.to_bytes(); bytes.put_slice(&message_header); @@ -428,7 +365,7 @@ impl BytesSerializable for IggyMessage { bytes.freeze() } - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { + pub fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { if bytes.len() < IGGY_MESSAGE_HEADER_SIZE { return Err(IggyError::InvalidCommand); } @@ -478,7 +415,7 @@ impl BytesSerializable for IggyMessage { }) } - fn write_to_buffer(&self, buf: &mut BytesMut) { + pub fn write_to_buffer(&self, buf: &mut BytesMut) { buf.put_slice(&self.header.to_bytes()); buf.put_slice(&self.payload); if let Some(user_headers) = &self.user_headers { @@ -487,6 +424,66 @@ impl BytesSerializable for IggyMessage { } } +impl FromStr for IggyMessage { + type Err = IggyError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + Self::builder().payload(Bytes::from(s.to_owned())).build() + } +} + +impl std::fmt::Display for IggyMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match String::from_utf8(self.payload.to_vec()) { + Ok(payload) => { + write!( + f, + "[{offset}] ID:{id} '{preview}'", + offset = self.header.offset, + id = self.header.id, + preview = if payload.len() > 50 { + format!("{}... ({}B)", &payload[..47], self.payload.len()) + } else { + payload + } + ) + } + Err(_) => { + write!( + f, + "[{offset}] ID:{id} <binary {payload_len}B>", + offset = self.header.offset, + id = self.header.id, + payload_len = self.payload.len() + ) + } + } + } +} + +impl Sizeable for IggyMessage { + fn get_size_bytes(&self) -> IggyByteSize { + let message_header_len = IGGY_MESSAGE_HEADER_SIZE; + let payload_len = self.payload.len(); + let user_headers_len = self.user_headers.as_ref().map(|h| h.len()).unwrap_or(0); + + #[cfg(debug_assertions)] + { + assert_eq!( + user_headers_len, self.header.user_headers_length as usize, + "user_headers.len() != header.user_headers_length" + ); + + assert_eq!( + payload_len, self.header.payload_length as usize, + "payload.len() != header.payload_length" + ) + } + + IggyByteSize::from((message_header_len + payload_len + user_headers_len) as u64) + } +} + impl From<IggyMessage> for Bytes { fn from(message: IggyMessage) -> Self { message.to_bytes() @@ -652,7 +649,7 @@ impl<'de> Deserialize<'de> for IggyMessage { let user_headers_bytes = if let Some(raw) = raw_user_headers { Some(raw) } else { - user_headers.map(|headers| headers.to_bytes()) + user_headers.map(|headers| user_headers_to_bytes(&headers)) }; let user_headers_length = user_headers_bytes diff --git a/core/common/src/types/message/message_header.rs b/core/common/src/types/message/message_header.rs index 0fc1121c3..8af4dbe51 100644 --- a/core/common/src/types/message/message_header.rs +++ b/core/common/src/types/message/message_header.rs @@ -16,7 +16,7 @@ * under the License. */ -use crate::{BytesSerializable, Sizeable, error::IggyError, utils::byte_size::IggyByteSize}; +use crate::{Sizeable, error::IggyError, utils::byte_size::IggyByteSize}; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::ops::Range; @@ -107,10 +107,8 @@ impl IggyMessageHeader { }, }) } -} -impl BytesSerializable for IggyMessageHeader { - fn to_bytes(&self) -> Bytes { + pub fn to_bytes(&self) -> Bytes { let mut bytes = BytesMut::with_capacity(self.get_size_bytes().as_bytes_usize()); bytes.put_u64_le(self.checksum); bytes.put_u128_le(self.id); @@ -123,7 +121,7 @@ impl BytesSerializable for IggyMessageHeader { bytes.freeze() } - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { + pub fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { if bytes.len() != IGGY_MESSAGE_HEADER_SIZE { return Err(IggyError::InvalidCommand); } diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index 23a63e711..c3db53154 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -19,7 +19,7 @@ use super::HeaderValue; use super::message_boundaries::IggyMessageBoundaries; use super::message_header::*; -use crate::BytesSerializable; +use super::user_headers::user_headers_from_bytes; use crate::IggyByteSize; use crate::Sizeable; use crate::error::IggyError; @@ -90,7 +90,7 @@ impl<'a> IggyMessageView<'a> { if let Some(headers) = self.user_headers() { let headers_bytes = Bytes::copy_from_slice(headers); - match HashMap::<HeaderKey, HeaderValue>::from_bytes(headers_bytes) { + match user_headers_from_bytes(headers_bytes) { Ok(h) => Ok(Some(h)), Err(e) => { tracing::error!( diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index 6346af08c..176376d4e 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -18,8 +18,8 @@ use super::message_boundaries::IggyMessageBoundaries; use crate::{ - BytesSerializable, INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, IggyMessageView, - IggyMessageViewIterator, MAX_PAYLOAD_SIZE, Sizeable, Validatable, error::IggyError, + INDEX_SIZE, IggyByteSize, IggyIndexes, IggyMessage, IggyMessageView, IggyMessageViewIterator, + MAX_PAYLOAD_SIZE, Sizeable, Validatable, error::IggyError, }; use bytes::{BufMut, Bytes, BytesMut}; use std::ops::{Deref, Index}; diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs index 78ba9b3c0..ee0333bc4 100644 --- a/core/common/src/types/message/messages_batch_mut.rs +++ b/core/common/src/types/message/messages_batch_mut.rs @@ -20,9 +20,9 @@ use super::indexes_mut::IggyIndexesMut; use super::message_boundaries::IggyMessageBoundaries; use super::message_view_mut::IggyMessageViewMutIterator; use crate::{ - BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, - IggyIndexView, IggyMessage, IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch, - IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, Validatable, + IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, IggyByteSize, IggyError, IggyIndexView, IggyMessage, + IggyMessageView, IggyMessageViewIterator, IggyMessagesBatch, IggyTimestamp, MAX_PAYLOAD_SIZE, + MAX_USER_HEADERS_SIZE, Sizeable, Validatable, }; use crate::{MessageDeduplicator, PooledBuffer, random_id}; use lending_iterator::prelude::*; diff --git a/core/common/src/types/message/mod.rs b/core/common/src/types/message/mod.rs index 281cc3273..bd5688926 100644 --- a/core/common/src/types/message/mod.rs +++ b/core/common/src/types/message/mod.rs @@ -71,5 +71,6 @@ pub use polling_kind::PollingKind; pub use polling_strategy::PollingStrategy; pub use user_headers::{ HeaderEntry, HeaderField, HeaderKey, HeaderKind, HeaderValue, KeyMarker, UserHeaders, - ValueMarker, deserialize_headers, serialize_headers, + ValueMarker, deserialize_headers, serialize_headers, user_headers_from_bytes, + user_headers_to_bytes, }; diff --git a/core/common/src/types/message/partitioning.rs b/core/common/src/types/message/partitioning.rs index f0b2af3b0..5d80273d5 100644 --- a/core/common/src/types/message/partitioning.rs +++ b/core/common/src/types/message/partitioning.rs @@ -17,8 +17,7 @@ */ use super::PartitioningKind; -use crate::{BytesSerializable, IggyByteSize, Sizeable, error::IggyError}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::{IggyByteSize, Sizeable, error::IggyError}; use serde::{Deserialize, Serialize}; use serde_with::base64::Base64; use serde_with::serde_as; @@ -187,33 +186,6 @@ impl Sizeable for Partitioning { } } -impl BytesSerializable for Partitioning { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(2 + self.length as usize); - bytes.put_u8(self.kind.as_code()); - bytes.put_u8(self.length); - bytes.put_slice(&self.value); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - Self::from_raw_bytes(&bytes) - } - - fn write_to_buffer(&self, bytes: &mut BytesMut) { - bytes.put_u8(self.kind.as_code()); - bytes.put_u8(self.length); - bytes.put_slice(&self.value); - } - - fn get_buffer_size(&self) -> usize { - 2 + self.length as usize - } -} - #[cfg(test)] mod tests { use super::*; @@ -240,24 +212,11 @@ mod tests { assert_eq!(p.value, 42u32.to_le_bytes()); } - #[test] - fn from_bytes_should_reject_partition_id_with_wrong_length() { - for bad_len in [0u8, 1, 2, 3, 5] { - let mut buf = BytesMut::new(); - buf.put_u8(PartitioningKind::PartitionId.as_code()); - buf.put_u8(bad_len); - buf.extend(vec![0u8; bad_len as usize]); - assert!( - Partitioning::from_bytes(buf.freeze()).is_err(), - "expected error for PartitionId with length={bad_len}" - ); - } - } - #[test] fn from_raw_bytes_should_fail_on_truncated_input() { let p = Partitioning::partition_id(99); - let full = p.to_bytes(); + let mut full = vec![p.kind.as_code(), p.length]; + full.extend_from_slice(&p.value); for i in 0..full.len() { assert!( Partitioning::from_raw_bytes(&full[..i]).is_err(), @@ -277,14 +236,6 @@ mod tests { assert!(s.contains("<invalid>")); } - #[test] - fn round_trip_partition_id() { - let original = Partitioning::partition_id(12345); - let bytes = original.to_bytes(); - let restored = Partitioning::from_bytes(bytes).unwrap(); - assert_eq!(original, restored); - } - #[test] fn from_raw_bytes_should_reject_balanced_with_nonzero_length() { for bad_len in [1u8, 2, 4, 255] { @@ -296,20 +247,4 @@ mod tests { ); } } - - #[test] - fn round_trip_balanced() { - let original = Partitioning::balanced(); - let bytes = original.to_bytes(); - let restored = Partitioning::from_bytes(bytes).unwrap(); - assert_eq!(original, restored); - } - - #[test] - fn round_trip_messages_key() { - let original = Partitioning::messages_key(b"my-key").unwrap(); - let bytes = original.to_bytes(); - let restored = Partitioning::from_bytes(bytes).unwrap(); - assert_eq!(original, restored); - } } diff --git a/core/common/src/types/message/polled_messages.rs b/core/common/src/types/message/polled_messages.rs index 208de1f0a..198f78cd7 100644 --- a/core/common/src/types/message/polled_messages.rs +++ b/core/common/src/types/message/polled_messages.rs @@ -16,9 +16,7 @@ * under the License. */ -use crate::{ - BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, IggyMessage, IggyMessageHeader, error::IggyError, -}; +use crate::{IGGY_MESSAGE_HEADER_SIZE, IggyMessage, IggyMessageHeader, error::IggyError}; use bytes::Bytes; use serde::{Deserialize, Serialize}; use tracing::error; diff --git a/core/common/src/types/message/polling_strategy.rs b/core/common/src/types/message/polling_strategy.rs index 35052a188..5035087a9 100644 --- a/core/common/src/types/message/polling_strategy.rs +++ b/core/common/src/types/message/polling_strategy.rs @@ -16,11 +16,8 @@ * under the License. */ -use crate::BytesSerializable; -use crate::error::IggyError; use crate::types::message::polling_kind::PollingKind; use crate::utils::timestamp::IggyTimestamp; -use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use serde_with::{DisplayFromStr, serde_as}; use std::fmt::Display; @@ -121,27 +118,3 @@ impl PollingStrategy { PollingKind::default() } } - -impl BytesSerializable for PollingStrategy { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(9); - bytes.put_u8(self.kind.as_code()); - bytes.put_u64_le(self.value); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { - if bytes.len() != 9 { - return Err(IggyError::InvalidCommand); - } - - let kind = PollingKind::from_code(bytes[0])?; - let value = u64::from_le_bytes( - bytes[1..9] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - let strategy = PollingStrategy { kind, value }; - Ok(strategy) - } -} diff --git a/core/common/src/types/message/user_headers.rs b/core/common/src/types/message/user_headers.rs index aaec4ffe0..396608199 100644 --- a/core/common/src/types/message/user_headers.rs +++ b/core/common/src/types/message/user_headers.rs @@ -16,7 +16,6 @@ * under the License. */ -use crate::BytesSerializable; use crate::error::IggyError; use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; @@ -930,103 +929,98 @@ impl<T> TryFrom<&HeaderField<T>> for Vec<u8> { } } -impl BytesSerializable for HashMap<HeaderKey, HeaderValue> { - fn to_bytes(&self) -> Bytes { - if self.is_empty() { - return Bytes::new(); - } - - let mut bytes = BytesMut::new(); - for (key, value) in self { - bytes.put_u8(key.kind().as_code()); - #[allow(clippy::cast_possible_truncation)] - bytes.put_u32_le(key.as_bytes().len() as u32); - bytes.put_slice(key.as_bytes()); - bytes.put_u8(value.kind().as_code()); - #[allow(clippy::cast_possible_truncation)] - bytes.put_u32_le(value.as_bytes().len() as u32); - bytes.put_slice(value.as_bytes()); - } +pub fn user_headers_to_bytes(headers: &HashMap<HeaderKey, HeaderValue>) -> Bytes { + if headers.is_empty() { + return Bytes::new(); + } - bytes.freeze() + let mut bytes = BytesMut::new(); + for (key, value) in headers { + bytes.put_u8(key.kind().as_code()); + #[allow(clippy::cast_possible_truncation)] + bytes.put_u32_le(key.as_bytes().len() as u32); + bytes.put_slice(key.as_bytes()); + bytes.put_u8(value.kind().as_code()); + #[allow(clippy::cast_possible_truncation)] + bytes.put_u32_le(value.as_bytes().len() as u32); + bytes.put_slice(value.as_bytes()); } - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - if bytes.is_empty() { - return Ok(Self::new()); - } + bytes.freeze() +} - let mut headers = Self::new(); - let mut position = 0; - while position < bytes.len() { - let key_kind = HeaderKind::from_code(bytes[position])?; - position += 1; +pub fn user_headers_from_bytes(bytes: Bytes) -> Result<HashMap<HeaderKey, HeaderValue>, IggyError> { + if bytes.is_empty() { + return Ok(HashMap::new()); + } - if position + 4 > bytes.len() { - return Err(IggyError::InvalidHeaderKey); - } - let key_length = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - if key_length == 0 || key_length > 255 { - return Err(IggyError::InvalidHeaderKey); - } - position += 4; + let mut headers = HashMap::new(); + let mut position = 0; + while position < bytes.len() { + let key_kind = HeaderKind::from_code(bytes[position])?; + position += 1; - if position + key_length > bytes.len() { - return Err(IggyError::InvalidHeaderKey); - } - if let Some(expected) = key_kind.expected_size() - && key_length != expected - { - return Err(IggyError::InvalidHeaderKey); - } - let key_value = bytes[position..position + key_length].to_vec(); - position += key_length; + if position + 4 > bytes.len() { + return Err(IggyError::InvalidHeaderKey); + } + let key_length = u32::from_le_bytes( + bytes[position..position + 4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ) as usize; + if key_length == 0 || key_length > 255 { + return Err(IggyError::InvalidHeaderKey); + } + position += 4; - if position >= bytes.len() { - return Err(IggyError::InvalidHeaderValue); - } - let value_kind = HeaderKind::from_code(bytes[position])?; - position += 1; + if position + key_length > bytes.len() { + return Err(IggyError::InvalidHeaderKey); + } + if let Some(expected) = key_kind.expected_size() + && key_length != expected + { + return Err(IggyError::InvalidHeaderKey); + } + let key_value = bytes[position..position + key_length].to_vec(); + position += key_length; - if position + 4 > bytes.len() { - return Err(IggyError::InvalidHeaderValue); - } - let value_length = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - if value_length == 0 || value_length > 255 { - return Err(IggyError::InvalidHeaderValue); - } - position += 4; + if position >= bytes.len() { + return Err(IggyError::InvalidHeaderValue); + } + let value_kind = HeaderKind::from_code(bytes[position])?; + position += 1; - if position + value_length > bytes.len() { - return Err(IggyError::InvalidHeaderValue); - } - if let Some(expected) = value_kind.expected_size() - && value_length != expected - { - return Err(IggyError::InvalidHeaderValue); - } - let value_value = bytes[position..position + value_length].to_vec(); - position += value_length; + if position + 4 > bytes.len() { + return Err(IggyError::InvalidHeaderValue); + } + let value_length = u32::from_le_bytes( + bytes[position..position + 4] + .try_into() + .map_err(|_| IggyError::InvalidNumberEncoding)?, + ) as usize; + if value_length == 0 || value_length > 255 { + return Err(IggyError::InvalidHeaderValue); + } + position += 4; - headers.insert( - HeaderKey::new_unchecked(key_kind, &key_value), - HeaderValue::new_unchecked(value_kind, &value_value), - ); + if position + value_length > bytes.len() { + return Err(IggyError::InvalidHeaderValue); } + if let Some(expected) = value_kind.expected_size() + && value_length != expected + { + return Err(IggyError::InvalidHeaderValue); + } + let value_value = bytes[position..position + value_length].to_vec(); + position += value_length; - Ok(headers) + headers.insert( + HeaderKey::new_unchecked(key_kind, &key_value), + HeaderValue::new_unchecked(value_kind, &value_value), + ); } + + Ok(headers) } pub fn get_user_headers_size(headers: &Option<HashMap<HeaderKey, HeaderValue>>) -> Option<u32> { @@ -1370,7 +1364,7 @@ mod tests { headers.insert(HeaderKey::try_from("key 1").unwrap(), 12345u64.into()); headers.insert(HeaderKey::try_from("key_3").unwrap(), true.into()); - let bytes = headers.to_bytes(); + let bytes = user_headers_to_bytes(&headers); let mut position = 0; let mut headers_count = 0; @@ -1427,7 +1421,7 @@ mod tests { bytes.put_slice(&value.value); } - let deserialized_headers = HashMap::<HeaderKey, HeaderValue>::from_bytes(bytes.freeze()); + let deserialized_headers = user_headers_from_bytes(bytes.freeze()); assert!(deserialized_headers.is_ok()); let deserialized_headers = deserialized_headers.unwrap(); @@ -1451,8 +1445,8 @@ mod tests { ); headers.insert(999u64.into(), true.into()); - let bytes = headers.to_bytes(); - let deserialized = HashMap::<HeaderKey, HeaderValue>::from_bytes(bytes).unwrap(); + let bytes = user_headers_to_bytes(&headers); + let deserialized = user_headers_from_bytes(bytes).unwrap(); assert_eq!(deserialized.len(), headers.len()); for (key, value) in &headers { @@ -1661,7 +1655,7 @@ mod tests { bytes.put_u32_le(100); // key_len = 100 (lie!) bytes.put_slice(b"abc"); // only 3 bytes of key data - let result = HashMap::<HeaderKey, HeaderValue>::from_bytes(bytes.freeze()); + let result = user_headers_from_bytes(bytes.freeze()); assert!(result.is_err()); } @@ -1688,7 +1682,7 @@ mod tests { bytes.put_u32_le(4); // value_len = 4 (wrong, should be 2) bytes.put_slice(&[1, 2, 3, 4]); - let result = HashMap::<HeaderKey, HeaderValue>::from_bytes(bytes.freeze()); + let result = user_headers_from_bytes(bytes.freeze()); assert!(result.is_err()); } @@ -1703,7 +1697,7 @@ mod tests { bytes.put_u8(6); // value_kind = Int32 // Missing value length bytes - let result = HashMap::<HeaderKey, HeaderValue>::from_bytes(bytes.freeze()); + let result = user_headers_from_bytes(bytes.freeze()); assert!(result.is_err()); } diff --git a/core/common/src/types/permissions/permissions_global.rs b/core/common/src/types/permissions/permissions_global.rs index b0024e523..42f670715 100644 --- a/core/common/src/types/permissions/permissions_global.rs +++ b/core/common/src/types/permissions/permissions_global.rs @@ -16,9 +16,6 @@ * under the License. */ -use crate::BytesSerializable; -use crate::error::IggyError; -use bytes::{Buf, BufMut, Bytes, BytesMut}; use comfy_table::Table; use comfy_table::presets::ASCII_NO_BORDERS; use serde::{Deserialize, Serialize}; @@ -219,150 +216,6 @@ impl Display for Permissions { } } -impl BytesSerializable for Permissions { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::new(); - bytes.put_u8(if self.global.manage_servers { 1 } else { 0 }); - bytes.put_u8(if self.global.read_servers { 1 } else { 0 }); - bytes.put_u8(if self.global.manage_users { 1 } else { 0 }); - bytes.put_u8(if self.global.read_users { 1 } else { 0 }); - bytes.put_u8(if self.global.manage_streams { 1 } else { 0 }); - bytes.put_u8(if self.global.read_streams { 1 } else { 0 }); - bytes.put_u8(if self.global.manage_topics { 1 } else { 0 }); - bytes.put_u8(if self.global.read_topics { 1 } else { 0 }); - bytes.put_u8(if self.global.poll_messages { 1 } else { 0 }); - bytes.put_u8(if self.global.send_messages { 1 } else { 0 }); - if let Some(streams) = self.streams.as_ref().filter(|s| !s.is_empty()) { - bytes.put_u8(1); - let streams_count = streams.len(); - let mut current_stream = 1; - for (stream_id, stream) in streams { - bytes.put_u32_le(*stream_id as u32); - bytes.put_u8(if stream.manage_stream { 1 } else { 0 }); - bytes.put_u8(if stream.read_stream { 1 } else { 0 }); - bytes.put_u8(if stream.manage_topics { 1 } else { 0 }); - bytes.put_u8(if stream.read_topics { 1 } else { 0 }); - bytes.put_u8(if stream.poll_messages { 1 } else { 0 }); - bytes.put_u8(if stream.send_messages { 1 } else { 0 }); - if let Some(topics) = stream.topics.as_ref().filter(|t| !t.is_empty()) { - bytes.put_u8(1); - let topics_count = topics.len(); - let mut current_topic = 1; - for (topic_id, topic) in topics { - bytes.put_u32_le(*topic_id as u32); - bytes.put_u8(if topic.manage_topic { 1 } else { 0 }); - bytes.put_u8(if topic.read_topic { 1 } else { 0 }); - bytes.put_u8(if topic.poll_messages { 1 } else { 0 }); - bytes.put_u8(if topic.send_messages { 1 } else { 0 }); - if current_topic < topics_count { - current_topic += 1; - bytes.put_u8(1); - } else { - bytes.put_u8(0); - } - } - } else { - bytes.put_u8(0); - } - if current_stream < streams_count { - current_stream += 1; - bytes.put_u8(1); - } else { - bytes.put_u8(0); - } - } - } else { - bytes.put_u8(0); - } - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> - where - Self: Sized, - { - let mut bytes = bytes; - let manage_servers = bytes.get_u8() == 1; - let read_servers = bytes.get_u8() == 1; - let manage_users = bytes.get_u8() == 1; - let read_users = bytes.get_u8() == 1; - let manage_streams = bytes.get_u8() == 1; - let read_streams = bytes.get_u8() == 1; - let manage_topics = bytes.get_u8() == 1; - let read_topics = bytes.get_u8() == 1; - let poll_messages = bytes.get_u8() == 1; - let send_messages = bytes.get_u8() == 1; - let mut streams = None; - if bytes.get_u8() == 1 { - let mut streams_map = BTreeMap::new(); - loop { - let stream_id = bytes.get_u32_le() as usize; - let manage_stream = bytes.get_u8() == 1; - let read_stream = bytes.get_u8() == 1; - let manage_topics = bytes.get_u8() == 1; - let read_topics = bytes.get_u8() == 1; - let poll_messages = bytes.get_u8() == 1; - let send_messages = bytes.get_u8() == 1; - let mut topics = None; - if bytes.get_u8() == 1 { - let mut topics_map = BTreeMap::new(); - loop { - let topic_id = bytes.get_u32_le() as usize; - let manage_topic = bytes.get_u8() == 1; - let read_topic = bytes.get_u8() == 1; - let poll_messages = bytes.get_u8() == 1; - let send_messages = bytes.get_u8() == 1; - topics_map.insert( - topic_id, - TopicPermissions { - manage_topic, - read_topic, - poll_messages, - send_messages, - }, - ); - if bytes.get_u8() == 0 { - break; - } - } - topics = Some(topics_map); - } - streams_map.insert( - stream_id, - StreamPermissions { - manage_stream, - read_stream, - manage_topics, - read_topics, - poll_messages, - send_messages, - topics, - }, - ); - if bytes.get_u8() == 0 { - break; - } - } - streams = Some(streams_map); - } - Ok(Self { - global: GlobalPermissions { - manage_servers, - read_servers, - manage_users, - read_users, - manage_streams, - read_streams, - manage_topics, - read_topics, - poll_messages, - send_messages, - }, - streams, - }) - } -} - impl From<GlobalPermissions> for Table { fn from(value: GlobalPermissions) -> Self { let mut table = Self::new(); @@ -470,114 +323,3 @@ impl From<&StreamPermissions> for Table { table } } - -#[cfg(test)] -mod tests { - use super::*; - - #[test] - fn should_be_serialized_and_deserialized_from_bytes() { - let permissions = Permissions { - global: GlobalPermissions { - manage_servers: true, - read_servers: true, - manage_users: true, - read_users: true, - manage_streams: false, - read_streams: true, - manage_topics: false, - read_topics: true, - poll_messages: true, - send_messages: true, - }, - streams: Some(BTreeMap::from([ - ( - 1, - StreamPermissions { - manage_stream: true, - read_stream: true, - manage_topics: true, - read_topics: true, - poll_messages: true, - send_messages: true, - topics: Some(BTreeMap::from([ - ( - 1, - TopicPermissions { - manage_topic: true, - read_topic: true, - poll_messages: true, - send_messages: true, - }, - ), - ( - 2, - TopicPermissions { - manage_topic: true, - read_topic: false, - poll_messages: true, - send_messages: false, - }, - ), - ])), - }, - ), - ( - 2, - StreamPermissions { - manage_stream: false, - read_stream: true, - manage_topics: false, - read_topics: true, - poll_messages: true, - send_messages: true, - topics: None, - }, - ), - ])), - }; - - let bytes = permissions.to_bytes(); - let deserialized_permissions = Permissions::from_bytes(bytes).unwrap(); - - assert_eq!(permissions, deserialized_permissions); - } - - #[test] - fn should_handle_empty_streams_map() { - let permissions = Permissions { - global: GlobalPermissions::default(), - streams: Some(BTreeMap::new()), - }; - - let bytes = permissions.to_bytes(); - let deserialized = Permissions::from_bytes(bytes).unwrap(); - - assert!(deserialized.streams.is_none()); - } - - #[test] - fn should_handle_empty_topics_map() { - let permissions = Permissions { - global: GlobalPermissions::default(), - streams: Some(BTreeMap::from([( - 1, - StreamPermissions { - manage_stream: true, - read_stream: true, - manage_topics: false, - read_topics: false, - poll_messages: false, - send_messages: false, - topics: Some(BTreeMap::new()), - }, - )])), - }; - - let bytes = permissions.to_bytes(); - let deserialized = Permissions::from_bytes(bytes).unwrap(); - - let stream = deserialized.streams.as_ref().unwrap().get(&1).unwrap(); - assert!(stream.topics.is_none()); - } -} diff --git a/core/common/src/wire_conversions.rs b/core/common/src/wire_conversions.rs index 343172b61..c5a48916c 100644 --- a/core/common/src/wire_conversions.rs +++ b/core/common/src/wire_conversions.rs @@ -522,6 +522,33 @@ pub fn consumer_to_wire(consumer: &Consumer) -> Result<WireConsumer, IggyError> }) } +/// Convert a domain `PollingStrategy` to `WirePollingStrategy`. +pub fn polling_strategy_to_wire( + strategy: &crate::PollingStrategy, +) -> iggy_binary_protocol::primitives::polling_strategy::WirePollingStrategy { + iggy_binary_protocol::primitives::polling_strategy::WirePollingStrategy { + kind: strategy.kind.as_code(), + value: strategy.value, + } +} + +/// Convert a domain `Partitioning` to `WirePartitioning`. +pub fn partitioning_to_wire( + partitioning: &crate::Partitioning, +) -> iggy_binary_protocol::primitives::partitioning::WirePartitioning { + use iggy_binary_protocol::primitives::partitioning::WirePartitioning; + match partitioning.kind { + crate::PartitioningKind::Balanced => WirePartitioning::Balanced, + crate::PartitioningKind::PartitionId => { + let id = u32::from_le_bytes(partitioning.value[..4].try_into().unwrap()); + WirePartitioning::PartitionId(id) + } + crate::PartitioningKind::MessagesKey => { + WirePartitioning::MessagesKey(partitioning.value.clone()) + } + } +} + // --------------------------------------------------------------------------- // Permissions (wire -> domain) // --------------------------------------------------------------------------- diff --git a/core/integration/tests/server/scenarios/offset_scenario.rs b/core/integration/tests/server/scenarios/offset_scenario.rs index c1544137b..282e355fa 100644 --- a/core/integration/tests/server/scenarios/offset_scenario.rs +++ b/core/integration/tests/server/scenarios/offset_scenario.rs @@ -19,6 +19,7 @@ use super::PARTITION_ID; use bytes::BytesMut; use iggy::prelude::*; +use iggy_common::user_headers_from_bytes; use integration::harness::TestHarness; use std::collections::HashMap; @@ -403,8 +404,7 @@ async fn verify_message_content( ); if let Some(headers) = &msg.user_headers { - let headers_map = - HashMap::<HeaderKey, HeaderValue>::from_bytes(headers.clone()).unwrap(); + let headers_map = user_headers_from_bytes(headers.clone()).unwrap(); assert_eq!(headers_map.len(), 3, "Expected 3 headers"); } } diff --git a/core/integration/tests/server/scenarios/timestamp_scenario.rs b/core/integration/tests/server/scenarios/timestamp_scenario.rs index af6fb7796..661970dd6 100644 --- a/core/integration/tests/server/scenarios/timestamp_scenario.rs +++ b/core/integration/tests/server/scenarios/timestamp_scenario.rs @@ -19,6 +19,7 @@ use super::PARTITION_ID; use bytes::BytesMut; use iggy::prelude::*; +use iggy_common::user_headers_from_bytes; use integration::harness::TestHarness; use std::collections::HashMap; use tokio::time::{Duration, sleep}; @@ -427,8 +428,7 @@ async fn verify_message_content_by_timestamp( ); if let Some(headers) = &msg.user_headers { - let headers_map = - HashMap::<HeaderKey, HeaderValue>::from_bytes(headers.clone()).unwrap(); + let headers_map = user_headers_from_bytes(headers.clone()).unwrap(); assert_eq!(headers_map.len(), 3, "Expected 3 headers"); } } diff --git a/core/sdk/src/prelude.rs b/core/sdk/src/prelude.rs index e7824a06e..72e426ce5 100644 --- a/core/sdk/src/prelude.rs +++ b/core/sdk/src/prelude.rs @@ -49,10 +49,10 @@ pub use crate::stream_builder::{IggyStream, IggyStreamConfig}; pub use crate::tcp::tcp_client::TcpClient; pub use crate::websocket::websocket_client::WebSocketClient; pub use iggy_common::{ - Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, BytesSerializable, CacheMetrics, - CacheMetricsKey, ClientError, ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, - ClusterNodeStatus, CompressionAlgorithm, Consumer, ConsumerGroupDetails, ConsumerKind, - EncryptorKind, GlobalPermissions, HeaderKey, HeaderKind, HeaderValue, HttpClientConfig, + Aes256GcmEncryptor, Args, ArgsOptional, AutoLogin, CacheMetrics, CacheMetricsKey, ClientError, + ClientInfoDetails, ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, + CompressionAlgorithm, Consumer, ConsumerGroupDetails, ConsumerKind, EncryptorKind, + GlobalPermissions, HeaderKey, HeaderKind, HeaderValue, HttpClientConfig, HttpClientConfigBuilder, IdKind, Identifier, IdentityInfo, IggyByteSize, IggyDuration, IggyError, IggyExpiry, IggyIndexView, IggyMessage, IggyMessageHeader, IggyMessageHeaderView, IggyMessageView, IggyMessageViewIterator, IggyTimestamp, MaxTopicSize, Partition, Partitioner, @@ -63,6 +63,7 @@ pub use iggy_common::{ TcpClientReconnectionConfig, Topic, TopicDetails, TopicPermissions, TransportEndpoints, TransportProtocol, UserId, UserStatus, Validatable, WebSocketClientConfig, WebSocketClientConfigBuilder, WebSocketClientReconnectionConfig, defaults, locking, + user_headers_from_bytes, user_headers_to_bytes, }; pub use iggy_common::{ Client, ClusterClient, ConsumerGroupClient, ConsumerOffsetClient, MessageClient, diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs index 21e342003..82a5d25b0 100644 --- a/core/server/src/shard/system/messages.rs +++ b/core/server/src/shard/system/messages.rs @@ -30,8 +30,7 @@ use iggy_common::IggyPollMetadata; use iggy_common::PooledBuffer; use iggy_common::sharding::IggyNamespace; use iggy_common::{ - BytesSerializable, Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, - PollingStrategy, + Consumer, EncryptorKind, IGGY_MESSAGE_HEADER_SIZE, Identifier, IggyError, PollingStrategy, }; use std::sync::atomic::Ordering; use tracing::error; diff --git a/core/server/src/state/models.rs b/core/server/src/state/models.rs index 01a9309df..7d885b29d 100644 --- a/core/server/src/state/models.rs +++ b/core/server/src/state/models.rs @@ -107,7 +107,7 @@ impl Display for CreatePersonalAccessTokenWithHash { } // Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | inner_bytes -// This is identical to the previous BytesSerializable format. +// Wire format for WithId wrappers: id:u32_le | inner_length:u32_le | inner_bytes impl WireEncode for CreateStreamWithId { fn encoded_size(&self) -> usize { diff --git a/examples/rust/src/message-headers/message-compression/consumer/main.rs b/examples/rust/src/message-headers/message-compression/consumer/main.rs index 32104d754..3dc14d308 100644 --- a/examples/rust/src/message-headers/message-compression/consumer/main.rs +++ b/examples/rust/src/message-headers/message-compression/consumer/main.rs @@ -76,7 +76,7 @@ fn handle_payload_compression(msg: &mut ReceivedMessage) -> Result<(), IggyError // remove the compression header since payload is now decompressed if let Ok(Some(mut headers_map)) = msg.message.user_headers_map() { headers_map.remove(&Codec::header_key()); - let headers_bytes = headers_map.to_bytes(); + let headers_bytes = user_headers_to_bytes(&headers_map); msg.message.header.user_headers_length = headers_bytes.len() as u32; msg.message.user_headers = if headers_map.is_empty() { None
