This is an automated email from the ASF dual-hosted git repository. hgruszecki pushed a commit to branch refactor-binary-7-sdk-and-http in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 7fbd367344093b75991daeb5c1ecb53d7d850d4d Author: Hubert Gruszecki <[email protected]> AuthorDate: Fri Mar 27 10:50:45 2026 +0100 refactor(server): remove dead BytesSerializable impls from 10 types BytesSerializable conflated wire protocol encoding with domain type serialization. These 10 impls had no external callers or only contained panicking stubs. Deleted entirely: ClusterMetadata, ClusterNode, ClusterNodeRole, ClusterNodeStatus, TransportEndpoints, TransportProtocol, IggyMessageView, IggyMessageHeaderView, IggyMessagesBatch. Converted PolledMessages::from_bytes to inherent method since it has an active caller in the SDK poll path. --- core/common/src/traits/binary_impls/messages.rs | 4 +- core/common/src/types/cluster/metadata.rs | 95 +-------------------- core/common/src/types/cluster/node.rs | 98 +--------------------- core/common/src/types/cluster/role.rs | 27 +----- core/common/src/types/cluster/status.rs | 27 +----- .../src/types/cluster/transport_endpoints.rs | 69 --------------- core/common/src/types/configuration/transport.rs | 23 +---- .../src/types/message/message_header_view.rs | 25 +----- core/common/src/types/message/message_view.rs | 20 +---- core/common/src/types/message/messages_batch.rs | 20 ----- core/common/src/types/message/polled_messages.rs | 8 +- 11 files changed, 13 insertions(+), 403 deletions(-) diff --git a/core/common/src/traits/binary_impls/messages.rs b/core/common/src/traits/binary_impls/messages.rs index 97c8edd88..623f0af90 100644 --- a/core/common/src/traits/binary_impls/messages.rs +++ b/core/common/src/traits/binary_impls/messages.rs @@ -19,8 +19,8 @@ use crate::BinaryClient; use crate::traits::binary_auth::fail_if_not_authenticated; use crate::wire_conversions::identifier_to_wire; use crate::{ - BytesSerializable, Consumer, Identifier, IggyError, IggyMessage, MessageClient, Partitioning, - PollMessages, PolledMessages, PollingStrategy, SendMessages, + Consumer, Identifier, IggyError, IggyMessage, MessageClient, Partitioning, PollMessages, + PolledMessages, PollingStrategy, SendMessages, }; use iggy_binary_protocol::codec::WireEncode; use iggy_binary_protocol::codes::{ diff --git a/core/common/src/types/cluster/metadata.rs b/core/common/src/types/cluster/metadata.rs index f00f6b7e5..18872aa34 100644 --- a/core/common/src/types/cluster/metadata.rs +++ b/core/common/src/types/cluster/metadata.rs @@ -16,8 +16,7 @@ * under the License. */ -use crate::{BytesSerializable, IggyError, types::cluster::node::ClusterNode}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::types::cluster::node::ClusterNode; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -30,98 +29,6 @@ pub struct ClusterMetadata { pub nodes: Vec<ClusterNode>, } -impl BytesSerializable for ClusterMetadata { - fn to_bytes(&self) -> Bytes { - let name_bytes = self.name.as_bytes(); - - // Calculate size for each node - let nodes_size: usize = self.nodes.iter().map(|node| node.get_buffer_size()).sum(); - let size = 4 + name_bytes.len() + 4 + nodes_size; // name_len + name + nodes_len + nodes - - let mut bytes = BytesMut::with_capacity(size); - - // Write name length and name - bytes.put_u32_le(name_bytes.len() as u32); - bytes.put_slice(name_bytes); - - // Write nodes count - bytes.put_u32_le(self.nodes.len() as u32); - - // Write each node - for node in &self.nodes { - node.write_to_buffer(&mut bytes); - } - - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<ClusterMetadata, IggyError> { - if bytes.len() < 8 { - return Err(IggyError::InvalidCommand); - } - - let mut position = 0; - - // Read name length - let name_len = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - position += 4; - - // Read name - if bytes.len() < position + name_len { - return Err(IggyError::InvalidCommand); - } - let name = String::from_utf8(bytes[position..position + name_len].to_vec()) - .map_err(|_| IggyError::InvalidCommand)?; - position += name_len; - - // Read nodes count - if bytes.len() < position + 4 { - return Err(IggyError::InvalidCommand); - } - let nodes_count = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - position += 4; - - // Read nodes - let mut nodes = Vec::with_capacity(nodes_count); - for _ in 0..nodes_count { - let node = ClusterNode::from_bytes(bytes.slice(position..))?; - position += node.get_buffer_size(); - nodes.push(node); - } - - Ok(ClusterMetadata { name, nodes }) - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - let name_bytes = self.name.as_bytes(); - - // Write name length and name - buf.put_u32_le(name_bytes.len() as u32); - buf.put_slice(name_bytes); - - // Write nodes count - buf.put_u32_le(self.nodes.len() as u32); - - // Write each node - for node in &self.nodes { - node.write_to_buffer(buf); - } - } - - fn get_buffer_size(&self) -> usize { - let nodes_size: usize = self.nodes.iter().map(|node| node.get_buffer_size()).sum(); - 4 + self.name.len() + 4 + nodes_size // name_len + name + nodes_len + nodes - } -} - impl Display for ClusterMetadata { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let nodes = self diff --git a/core/common/src/types/cluster/node.rs b/core/common/src/types/cluster/node.rs index 63c2dad1d..f8517df81 100644 --- a/core/common/src/types/cluster/node.rs +++ b/core/common/src/types/cluster/node.rs @@ -16,13 +16,9 @@ * under the License. */ -use crate::{ - BytesSerializable, IggyError, - types::cluster::{ - role::ClusterNodeRole, status::ClusterNodeStatus, transport_endpoints::TransportEndpoints, - }, +use crate::types::cluster::{ + role::ClusterNodeRole, status::ClusterNodeStatus, transport_endpoints::TransportEndpoints, }; -use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -35,96 +31,6 @@ pub struct ClusterNode { pub status: ClusterNodeStatus, } -impl BytesSerializable for ClusterNode { - fn to_bytes(&self) -> Bytes { - let size = self.get_buffer_size(); - let mut bytes = BytesMut::with_capacity(size); - self.write_to_buffer(&mut bytes); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { - if bytes.len() < 10 { - // Minimum: 4 (name_len) + 4 (ip_len) + 1 (role) + 1 (status) - return Err(IggyError::InvalidCommand); - } - - let mut position = 0; - - // Read name length - let name_len = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - position += 4; - - // Read name - if bytes.len() < position + name_len { - return Err(IggyError::InvalidCommand); - } - let name = String::from_utf8(bytes[position..position + name_len].to_vec()) - .map_err(|_| IggyError::InvalidCommand)?; - position += name_len; - - // Read IP length - let ip_len = u32::from_le_bytes( - bytes[position..position + 4] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ) as usize; - position += 4; - - // Read IP - if bytes.len() < position + ip_len { - return Err(IggyError::InvalidCommand); - } - let ip = String::from_utf8(bytes[position..position + ip_len].to_vec()) - .map_err(|_| IggyError::InvalidCommand)?; - position += ip_len; - - // Read transport endpoints - let endpoints_bytes = bytes.slice(position..); - let endpoints = TransportEndpoints::from_bytes(endpoints_bytes)?; - position += endpoints.get_buffer_size(); - - // Read role - if bytes.len() < position + 1 { - return Err(IggyError::InvalidCommand); - } - let role = ClusterNodeRole::try_from(bytes[position])?; - position += 1; - - // Read status - if bytes.len() < position + 1 { - return Err(IggyError::InvalidCommand); - } - let status = ClusterNodeStatus::try_from(bytes[position])?; - - Ok(ClusterNode { - name, - ip, - endpoints, - role, - status, - }) - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.put_u32_le(self.name.len() as u32); - buf.put_slice(self.name.as_bytes()); - buf.put_u32_le(self.ip.len() as u32); - buf.put_slice(self.ip.as_bytes()); - self.endpoints.write_to_buffer(buf); - self.role.write_to_buffer(buf); - self.status.write_to_buffer(buf); - } - - fn get_buffer_size(&self) -> usize { - 4 + self.name.len() + 4 + self.ip.len() + self.endpoints.get_buffer_size() + 1 + 1 // name_len + name + ip_len + ip + endpoints + role + status - } -} - impl Display for ClusterNode { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/core/common/src/types/cluster/role.rs b/core/common/src/types/cluster/role.rs index 944c48cd5..cd15a177f 100644 --- a/core/common/src/types/cluster/role.rs +++ b/core/common/src/types/cluster/role.rs @@ -16,8 +16,7 @@ * under the License. */ -use crate::{BytesSerializable, IggyError}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::IggyError; use serde::{Deserialize, Serialize}; use strum::{AsRefStr, Display as StrumDisplay, EnumString}; @@ -35,30 +34,6 @@ pub enum ClusterNodeRole { Follower, } -impl BytesSerializable for ClusterNodeRole { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(1); - self.write_to_buffer(&mut bytes); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<ClusterNodeRole, IggyError> { - if bytes.is_empty() { - return Err(IggyError::InvalidCommand); - } - - ClusterNodeRole::try_from(bytes[0]) - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.put_u8(*self as u8); - } - - fn get_buffer_size(&self) -> usize { - 1 - } -} - impl TryFrom<u8> for ClusterNodeRole { type Error = IggyError; diff --git a/core/common/src/types/cluster/status.rs b/core/common/src/types/cluster/status.rs index ddb8c07d3..2331342e2 100644 --- a/core/common/src/types/cluster/status.rs +++ b/core/common/src/types/cluster/status.rs @@ -15,8 +15,7 @@ // specific language governing permissions and limitations // under the License. -use crate::{BytesSerializable, IggyError}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::IggyError; use serde::{Deserialize, Serialize}; use strum::{AsRefStr, Display as StrumDisplay, EnumString}; @@ -56,27 +55,3 @@ impl TryFrom<u8> for ClusterNodeStatus { } } } - -impl BytesSerializable for ClusterNodeStatus { - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(1); - self.write_to_buffer(&mut bytes); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<ClusterNodeStatus, IggyError> { - if bytes.is_empty() { - return Err(IggyError::InvalidCommand); - } - - ClusterNodeStatus::try_from(bytes[0]) - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.put_u8(*self as u8); - } - - fn get_buffer_size(&self) -> usize { - 1 - } -} diff --git a/core/common/src/types/cluster/transport_endpoints.rs b/core/common/src/types/cluster/transport_endpoints.rs index e398f2b04..ac79bfd21 100644 --- a/core/common/src/types/cluster/transport_endpoints.rs +++ b/core/common/src/types/cluster/transport_endpoints.rs @@ -16,8 +16,6 @@ * under the License. */ -use crate::{BytesSerializable, IggyError}; -use bytes::{BufMut, Bytes, BytesMut}; use serde::{Deserialize, Serialize}; use std::fmt::Display; @@ -40,73 +38,6 @@ impl TransportEndpoints { } } -impl BytesSerializable for TransportEndpoints { - fn to_bytes(&self) -> Bytes { - let size = self.get_buffer_size(); - let mut bytes = BytesMut::with_capacity(size); - self.write_to_buffer(&mut bytes); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { - if bytes.len() < 8 { - // Minimum: 4 ports * 2 bytes each - return Err(IggyError::InvalidCommand); - } - - let mut position = 0; - - // Read TCP port - let tcp = u16::from_le_bytes( - bytes[position..position + 2] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - position += 2; - - // Read QUIC port - let quic = u16::from_le_bytes( - bytes[position..position + 2] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - position += 2; - - // Read HTTP port - let http = u16::from_le_bytes( - bytes[position..position + 2] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - position += 2; - - // Read WebSocket port - let websocket = u16::from_le_bytes( - bytes[position..position + 2] - .try_into() - .map_err(|_| IggyError::InvalidNumberEncoding)?, - ); - - Ok(TransportEndpoints { - tcp, - quic, - http, - websocket, - }) - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.put_u16_le(self.tcp); - buf.put_u16_le(self.quic); - buf.put_u16_le(self.http); - buf.put_u16_le(self.websocket); - } - - fn get_buffer_size(&self) -> usize { - 8 // 4 ports * 2 bytes each - } -} - impl Display for TransportEndpoints { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/core/common/src/types/configuration/transport.rs b/core/common/src/types/configuration/transport.rs index 1b6dca023..95e5c19a4 100644 --- a/core/common/src/types/configuration/transport.rs +++ b/core/common/src/types/configuration/transport.rs @@ -16,8 +16,7 @@ * under the License. */ -use crate::{BytesSerializable, IggyError}; -use bytes::{BufMut, Bytes, BytesMut}; +use crate::IggyError; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::{fmt, str::FromStr}; use strum::{Display, EnumString, IntoStaticStr}; @@ -143,23 +142,3 @@ impl<'de> Deserialize<'de> for TransportProtocol { deserializer.deserialize_any(TransportProtocolVisitor) } } - -impl BytesSerializable for TransportProtocol { - fn get_buffer_size(&self) -> usize { - 1 - } - - fn to_bytes(&self) -> Bytes { - let mut bytes = BytesMut::with_capacity(1); - bytes.put_u8(*self as u8); - bytes.freeze() - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { - if bytes.is_empty() { - return Err(IggyError::InvalidCommand); - } - - TransportProtocol::try_from(bytes[0]) - } -} diff --git a/core/common/src/types/message/message_header_view.rs b/core/common/src/types/message/message_header_view.rs index 2d8b45c34..925cda841 100644 --- a/core/common/src/types/message/message_header_view.rs +++ b/core/common/src/types/message/message_header_view.rs @@ -16,15 +16,12 @@ * under the License. */ -use bytes::{Bytes, BytesMut}; - use crate::{ - BytesSerializable, IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE, + IGGY_MESSAGE_CHECKSUM_OFFSET_RANGE, IGGY_MESSAGE_HEADER_SIZE, IGGY_MESSAGE_HEADERS_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_ID_OFFSET_RANGE, IGGY_MESSAGE_OFFSET_OFFSET_RANGE, IGGY_MESSAGE_ORIGIN_TIMESTAMP_OFFSET_RANGE, IGGY_MESSAGE_PAYLOAD_LENGTH_OFFSET_RANGE, IGGY_MESSAGE_TIMESTAMP_OFFSET_RANGE, - IggyMessageHeader, error::IggyError, - types::message::message_header::IGGY_MESSAGE_RESERVED_OFFSET_RANGE, + IggyMessageHeader, types::message::message_header::IGGY_MESSAGE_RESERVED_OFFSET_RANGE, }; /// A read-only, typed view into a message header in a raw buffer. @@ -108,21 +105,3 @@ impl<'a> IggyMessageHeaderView<'a> { } } } - -impl BytesSerializable for IggyMessageHeaderView<'_> { - fn to_bytes(&self) -> Bytes { - panic!("should not be used") - } - - fn from_bytes(_bytes: Bytes) -> Result<Self, IggyError> { - panic!("should not be used") - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.extend_from_slice(self.data); - } - - fn get_buffer_size(&self) -> usize { - IGGY_MESSAGE_HEADER_SIZE - } -} diff --git a/core/common/src/types/message/message_view.rs b/core/common/src/types/message/message_view.rs index d6d42e0b4..23a63e711 100644 --- a/core/common/src/types/message/message_view.rs +++ b/core/common/src/types/message/message_view.rs @@ -25,7 +25,7 @@ use crate::Sizeable; use crate::error::IggyError; use crate::utils::checksum; use crate::{HeaderKey, IggyMessageHeaderView}; -use bytes::{Bytes, BytesMut}; +use bytes::Bytes; use std::collections::HashMap; use std::num::NonZeroUsize; @@ -134,24 +134,6 @@ impl<'a> IggyMessageView<'a> { } } -impl BytesSerializable for IggyMessageView<'_> { - fn to_bytes(&self) -> Bytes { - panic!("should not be used") - } - - fn from_bytes(_bytes: Bytes) -> Result<Self, IggyError> { - panic!("should not be used") - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.extend_from_slice(self.buffer); - } - - fn get_buffer_size(&self) -> usize { - self.buffer.len() - } -} - impl Sizeable for IggyMessageView<'_> { fn get_size_bytes(&self) -> IggyByteSize { IggyByteSize::from(self.buffer.len() as u64) diff --git a/core/common/src/types/message/messages_batch.rs b/core/common/src/types/message/messages_batch.rs index d9b7b171f..6346af08c 100644 --- a/core/common/src/types/message/messages_batch.rs +++ b/core/common/src/types/message/messages_batch.rs @@ -167,26 +167,6 @@ impl Index<usize> for IggyMessagesBatch { } } -impl BytesSerializable for IggyMessagesBatch { - fn to_bytes(&self) -> Bytes { - panic!("should not be used"); - } - - fn from_bytes(_bytes: Bytes) -> Result<Self, IggyError> { - panic!("don't use"); - } - - fn write_to_buffer(&self, buf: &mut BytesMut) { - buf.put_u32_le(self.count); - buf.put_slice(&self.indexes); - buf.put_slice(&self.messages); - } - - fn get_buffer_size(&self) -> usize { - 4 + self.indexes.len() + self.messages.len() - } -} - impl Validatable<IggyError> for IggyMessagesBatch { fn validate(&self) -> Result<(), IggyError> { if self.is_empty() { diff --git a/core/common/src/types/message/polled_messages.rs b/core/common/src/types/message/polled_messages.rs index 8204ffde2..208de1f0a 100644 --- a/core/common/src/types/message/polled_messages.rs +++ b/core/common/src/types/message/polled_messages.rs @@ -52,12 +52,8 @@ impl PolledMessages { } } -impl BytesSerializable for PolledMessages { - fn to_bytes(&self) -> Bytes { - panic!("should not be used") - } - - fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { +impl PolledMessages { + pub fn from_bytes(bytes: Bytes) -> Result<Self, IggyError> { let partition_id = u32::from_le_bytes( bytes[0..4] .try_into()
