This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new a81bcc601 feat(server): aligned buffer memory pool (#2921)
a81bcc601 is described below

commit a81bcc601c4f133ad8ed3015257485e14834ad3f
Author: tungtose <[email protected]>
AuthorDate: Tue Mar 24 16:53:22 2026 +0700

    feat(server): aligned buffer memory pool (#2921)
---
 Cargo.lock                                         |   1 +
 Cargo.toml                                         |   1 +
 core/common/Cargo.toml                             |   1 +
 core/common/src/alloc/buffer.rs                    | 145 ++++++++++++++-------
 core/common/src/alloc/memory_pool.rs               |  62 ++++++---
 .../common/src/types/message/messages_batch_mut.rs |  23 ++--
 .../src/types/segment_storage/index_reader.rs      |  31 ++---
 .../src/types/segment_storage/messages_reader.rs   |  32 ++---
 core/integration/tests/server/message_cleanup.rs   |   2 +-
 .../server/scenarios/message_cleanup_scenario.rs   |  29 +++--
 core/partitions/src/iggy_partition.rs              |   2 +-
 core/partitions/src/lib.rs                         |  10 +-
 .../handlers/messages/send_messages_handler.rs     |   1 +
 core/server/src/http/messages.rs                   |  20 +--
 14 files changed, 221 insertions(+), 139 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 63420b7b5..cdf2f962b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5409,6 +5409,7 @@ version = "0.9.3-edge.1"
 dependencies = [
  "aes-gcm",
  "ahash 0.8.12",
+ "aligned-vec",
  "async-broadcast",
  "async-trait",
  "base64 0.22.1",
diff --git a/Cargo.toml b/Cargo.toml
index 3843f5649..bdd7d84a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ actix-files = "0.6.10"
 actix-web = "4.13.0"
 aes-gcm = "0.10.3"
 ahash = { version = "0.8.12", features = ["serde"] }
+aligned-vec = "0.6.4"
 anyhow = "1.0.102"
 argon2 = "0.5.3"
 arrow = "57.3.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index d3d3794ba..b7e610181 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -30,6 +30,7 @@ readme = "../../README.md"
 [dependencies]
 aes-gcm = { workspace = true }
 ahash = { workspace = true }
+aligned-vec = { workspace = true }
 async-broadcast = { workspace = true }
 async-trait = { workspace = true }
 base64 = { workspace = true }
diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
index 3c059cba3..1291febb4 100644
--- a/core/common/src/alloc/buffer.rs
+++ b/core/common/src/alloc/buffer.rs
@@ -16,11 +16,15 @@
  * under the License.
  */
 
-use super::memory_pool::{BytesMutExt, memory_pool};
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use crate::alloc::memory_pool::{ALIGNMENT, AlignedBuffer};
+
+use super::memory_pool::{AlignedBufferExt, memory_pool};
+use bytes::Bytes;
 use compio::buf::{IoBuf, IoBufMut, SetLen};
-use std::mem::MaybeUninit;
-use std::ops::{Deref, DerefMut};
+use std::{
+    mem::MaybeUninit,
+    ops::{Deref, DerefMut},
+};
 
 /// A buffer wrapper that participates in memory pooling.
 ///
@@ -32,7 +36,7 @@ pub struct PooledBuffer {
     from_pool: bool,
     original_capacity: usize,
     original_bucket_idx: Option<usize>,
-    inner: BytesMut,
+    inner: AlignedBuffer,
 }
 
 impl Default for PooledBuffer {
@@ -48,13 +52,21 @@ impl PooledBuffer {
     ///
     /// * `capacity` - The capacity of the buffer
     pub fn with_capacity(capacity: usize) -> Self {
-        let (buffer, was_pool_allocated) = 
memory_pool().acquire_buffer(capacity);
+        let (buffer, was_pool_allocated) = 
memory_pool().acquire_buffer(capacity.max(ALIGNMENT));
         let original_capacity = buffer.capacity();
         let original_bucket_idx = if was_pool_allocated {
             memory_pool().best_fit(original_capacity)
         } else {
             None
         };
+
+        debug_assert_eq!(
+            buffer.as_ptr() as usize % ALIGNMENT,
+            0,
+            "PooledBuffer not aligned to {} bytes",
+            ALIGNMENT
+        );
+
         Self {
             from_pool: was_pool_allocated,
             original_capacity,
@@ -63,12 +75,12 @@ impl PooledBuffer {
         }
     }
 
-    /// Creates a new pooled buffer from an existing `BytesMut`.
+    /// Creates a new pooled buffer from an existing `AlignedBuffer`.
     ///
     /// # Arguments
     ///
-    /// * `existing` - The existing `BytesMut` buffer
-    pub fn from_existing(existing: BytesMut) -> Self {
+    /// * `existing` - The existing `AlignedBuffer` buffer
+    pub fn from_existing(existing: AlignedBuffer) -> Self {
         Self {
             from_pool: false,
             original_capacity: existing.capacity(),
@@ -83,7 +95,7 @@ impl PooledBuffer {
             from_pool: false,
             original_capacity: 0,
             original_bucket_idx: None,
-            inner: BytesMut::new(),
+            inner: AlignedBuffer::new(ALIGNMENT),
         }
     }
 
@@ -127,6 +139,52 @@ impl PooledBuffer {
         }
     }
 
+    /// Splits the buffer at the given position, returning a new PooledBuffer
+    /// containing bytes [0, at) and leaving bytes [at, len) in self.
+    ///
+    /// # Panics
+    /// Panics if `at > len`.
+    pub fn split_to(&mut self, at: usize) -> PooledBuffer {
+        assert!(
+            at <= self.len(),
+            "split_to out of bounds: at={}, len={}",
+            at,
+            self.len()
+        );
+
+        let mut new_buff = PooledBuffer::with_capacity(at);
+        new_buff.inner.extend_from_slice(&self.inner[..at]);
+
+    // SAFETY:
+    // - `self.inner.as_ptr().add(at)` is valid for `new_len` bytes because
+    //   `at + new_len == old_len <= cap`. The same holds for
+    //   `self.inner.as_mut_ptr()`.
+    // - The source range `[at, at + new_len)` and the destination range
+    //   `[0, new_len)` may overlap (whenever `at < new_len`), which is why
+    //   `ptr::copy` (memmove semantics) is used instead of
+    //   `copy_nonoverlapping`.
+    // - When `at == 0`, the copy is a no-op.
+        let new_len = self.len() - at;
+        if new_len > 0 {
+            unsafe {
+                // move [at..] to [0..]
+                std::ptr::copy(
+                    self.inner.as_ptr().add(at),
+                    self.inner.as_mut_ptr(),
+                    new_len,
+                );
+
+                self.inner.set_len(new_len);
+            }
+        } else {
+            self.inner.clear();
+        }
+
+        new_buff
+    }
+
+    pub fn put<T: AsRef<[u8]>>(&mut self, src: T) {
+        self.extend_from_slice(src.as_ref());
+    }
+
     /// Wrapper for extend_from_slice which might cause resize
     pub fn extend_from_slice(&mut self, extend_from: &[u8]) {
         let before_cap = self.inner.capacity();
@@ -140,7 +198,9 @@ impl PooledBuffer {
     /// Wrapper for put_bytes which might cause resize
     pub fn put_bytes(&mut self, byte: u8, len: usize) {
         let before_cap = self.inner.capacity();
-        self.inner.put_bytes(byte, len);
+
+        let start = self.inner.len();
+        self.inner.resize(start + len, byte);
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
@@ -149,18 +209,18 @@ impl PooledBuffer {
 
     /// Wrapper for put_slice which might cause resize
     pub fn put_slice(&mut self, src: &[u8]) {
-        let before_cap = self.inner.capacity();
-        self.inner.put_slice(src);
-
-        if self.inner.capacity() != before_cap {
-            self.check_for_resize();
-        }
+        self.extend_from_slice(src);
+        // let before_cap = self.inner.capacity();
+        //
+        // if self.inner.capacity() != before_cap {
+        //     self.check_for_resize();
+        // }
     }
 
     /// Wrapper for put_u32_le which might cause resize
     pub fn put_u32_le(&mut self, value: u32) {
         let before_cap = self.inner.capacity();
-        self.inner.put_u32_le(value);
+        self.inner.extend_from_slice(&value.to_le_bytes());
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
@@ -170,7 +230,7 @@ impl PooledBuffer {
     /// Wrapper for put_u64_le which might cause resize
     pub fn put_u64_le(&mut self, value: u64) {
         let before_cap = self.inner.capacity();
-        self.inner.put_u64_le(value);
+        self.inner.extend_from_slice(&value.to_le_bytes());
 
         if self.inner.capacity() != before_cap {
             self.check_for_resize();
@@ -192,11 +252,12 @@ impl PooledBuffer {
         self.inner.is_empty()
     }
 
-    /// Consumes the PooledBuffer and returns the inner BytesMut.
+    /// Consumes the PooledBuffer and returns the inner AlignedBuffer.
     /// Note: This bypasses pool return logic, use with caution.
-    pub fn into_inner(self) -> BytesMut {
+    pub fn into_inner(self) -> AlignedBuffer {
         let mut this = std::mem::ManuallyDrop::new(self);
-        std::mem::take(&mut this.inner)
+
+        std::mem::replace(&mut this.inner, AlignedBuffer::new(ALIGNMENT))
     }
 
     /// Freezes the buffer, converting it to an immutable `Bytes`.
@@ -205,24 +266,26 @@ impl PooledBuffer {
     /// return memory to the pool on drop (the frozen Bytes owns the 
allocation).
     /// The returned `Bytes` is Arc-backed, allowing cheap clones.
     pub fn freeze(&mut self) -> Bytes {
-        // Decrement pool counter since memory is transferred to Bytes
-        // and won't be returned to the pool.
+        let buf = std::mem::replace(&mut self.inner, 
AlignedBuffer::new(ALIGNMENT));
+
+        // Update pool accounting
         if self.from_pool
             && let Some(bucket_idx) = self.original_bucket_idx
         {
             memory_pool().dec_bucket_in_use(bucket_idx);
         }
-
-        let inner = std::mem::take(&mut self.inner);
         self.from_pool = false;
         self.original_capacity = 0;
         self.original_bucket_idx = None;
-        inner.freeze()
+
+        // Zero copy: Bytes takes ownership of the AlignedBuffer
+        // and will drop it when refcount reaches zero
+        Bytes::from_owner(buf)
     }
 }
 
 impl Deref for PooledBuffer {
-    type Target = BytesMut;
+    type Target = AlignedBuffer;
 
     fn deref(&self) -> &Self::Target {
         &self.inner
@@ -238,7 +301,7 @@ impl DerefMut for PooledBuffer {
 impl Drop for PooledBuffer {
     fn drop(&mut self) {
         if self.from_pool {
-            let buf = std::mem::take(&mut self.inner);
+            let buf = std::mem::replace(&mut self.inner, 
AlignedBuffer::new(ALIGNMENT));
             buf.return_to_pool(self.original_capacity, true);
         }
     }
@@ -252,27 +315,15 @@ impl From<&[u8]> for PooledBuffer {
     }
 }
 
-impl From<BytesMut> for PooledBuffer {
-    fn from(bytes: BytesMut) -> Self {
-        Self::from_existing(bytes)
+impl AsRef<[u8]> for PooledBuffer {
+    fn as_ref(&self) -> &[u8] {
+        &self.inner
     }
 }
 
-impl Buf for PooledBuffer {
-    fn remaining(&self) -> usize {
-        self.inner.remaining()
-    }
-
-    fn chunk(&self) -> &[u8] {
-        self.inner.chunk()
-    }
-
-    fn advance(&mut self, cnt: usize) {
-        Buf::advance(&mut self.inner, cnt)
-    }
-
-    fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) -> 
usize {
-        self.inner.chunks_vectored(dst)
+impl From<AlignedBuffer> for PooledBuffer {
+    fn from(buffer: AlignedBuffer) -> Self {
+        Self::from_existing(buffer)
     }
 }
 
diff --git a/core/common/src/alloc/memory_pool.rs 
b/core/common/src/alloc/memory_pool.rs
index 9216ef57f..0dd2782ec 100644
--- a/core/common/src/alloc/memory_pool.rs
+++ b/core/common/src/alloc/memory_pool.rs
@@ -16,7 +16,7 @@
  * under the License.
  */
 
-use bytes::BytesMut;
+use aligned_vec::{AVec, ConstAlign};
 use crossbeam::queue::ArrayQueue;
 use human_repr::HumanCount;
 use once_cell::sync::OnceCell;
@@ -24,18 +24,17 @@ use std::sync::Arc;
 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
 use tracing::{info, trace, warn};
 
+pub const ALIGNMENT: usize = 4096;
+pub type AlignedBuffer = AVec<u8, ConstAlign<4096>>;
+
 /// Global memory pool instance. Use `memory_pool()` to access it.
 pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new();
 
 /// Total number of distinct bucket sizes.
-const NUM_BUCKETS: usize = 32;
+const NUM_BUCKETS: usize = 28;
 
 /// Array of bucket sizes in ascending order. Each entry is a distinct buffer 
size (in bytes).
 const BUCKET_SIZES: [usize; NUM_BUCKETS] = [
-    256,
-    512,
-    1024,
-    2 * 1024,
     4 * 1024,
     8 * 1024,
     16 * 1024,
@@ -87,7 +86,7 @@ pub struct MemoryPoolConfigOther {
     pub bucket_capacity: u32,
 }
 
-/// A memory pool that maintains fixed-size buckets for reusing `BytesMut` 
buffers.
+/// A memory pool that maintains fixed-size buckets for reusing 
`AlignedBuffer` buffers.
 ///
 /// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool 
tracks:
 /// - Buffers currently in use (`in_use`)
@@ -109,7 +108,7 @@ pub struct MemoryPool {
     /// Array of queues for reusable buffers. Each queue can store up to 
`bucket_capacity` buffers.
     /// The length of each queue (`buckets[i].len()`) is how many **free** 
buffers are currently available.
     /// Free doesn't mean the buffer is allocated, it just means it's not in 
use.
-    buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS],
+    buckets: [Arc<ArrayQueue<AlignedBuffer>>; NUM_BUCKETS],
 
     /// Number of buffers **in use** for each bucket size (grow/shrink as they 
are acquired/released).
     in_use: [Arc<AtomicUsize>; NUM_BUCKETS],
@@ -175,29 +174,47 @@ impl MemoryPool {
             MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled, 
memory_limit, bucket_capacity));
     }
 
-    /// Acquire a `BytesMut` buffer with at least `capacity` bytes.
+    /// Acquire an `AlignedBuffer` with at least `capacity` bytes.
     ///
     /// - If a bucket can fit `capacity`, try to pop from its free buffer 
queue; otherwise create a new buffer.
     /// - If `memory_limit` would be exceeded, allocate outside the pool.
     ///
     /// Returns a tuple of (buffer, was_pool_allocated) where 
was_pool_allocated indicates if the buffer
     /// was allocated from the pool (true) or externally (false).
-    pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) {
+    pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) {
         if !self.is_enabled {
-            return (BytesMut::with_capacity(capacity), false);
+            return (allocate_aligned_buffer(capacity), false);
         }
 
         let current = self.pool_current_size();
 
         match self.best_fit(capacity) {
             Some(idx) => {
+                let new_size = BUCKET_SIZES[idx];
+
                 if let Some(mut buf) = self.buckets[idx].pop() {
                     buf.clear();
+
+                    if buf.capacity() < capacity {
+                        warn!(
+                            "Pooled buffer too small! bucket[{}]={}, 
buf.capacity()={}, requested={}. Dropping and allocating new.",
+                            idx,
+                            new_size,
+                            buf.capacity(),
+                            capacity
+                        );
+                        drop(buf);
+
+                        let new_buf = allocate_aligned_buffer(new_size);
+                        self.inc_bucket_alloc(idx);
+                        self.inc_bucket_in_use(idx);
+                        return (new_buf, true);
+                    }
+
                     self.inc_bucket_in_use(idx);
                     return (buf, true);
                 }
 
-                let new_size = BUCKET_SIZES[idx];
                 if current + new_size > self.memory_limit {
                     self.set_capacity_warning(true);
                     trace!(
@@ -205,12 +222,12 @@ impl MemoryPool {
                         new_size, current, self.memory_limit
                     );
                     self.inc_external_allocations();
-                    return (BytesMut::with_capacity(new_size), false);
+                    return (allocate_aligned_buffer(new_size), false);
                 }
 
                 self.inc_bucket_alloc(idx);
                 self.inc_bucket_in_use(idx);
-                (BytesMut::with_capacity(new_size), true)
+                (allocate_aligned_buffer(new_size), true)
             }
             None => {
                 if current + capacity > self.memory_limit {
@@ -219,16 +236,16 @@ impl MemoryPool {
                         capacity, current, self.memory_limit
                     );
                     self.inc_external_allocations();
-                    return (BytesMut::with_capacity(capacity), false);
+                    return (allocate_aligned_buffer(capacity), false);
                 }
 
                 self.inc_external_allocations();
-                (BytesMut::with_capacity(capacity), false)
+                (allocate_aligned_buffer(capacity), false)
             }
         }
     }
 
-    /// Return a `BytesMut` buffer previously acquired from the pool.
+    /// Return an `AlignedBuffer` previously acquired from the pool.
     ///
     /// - If `current_capacity` differs from `original_capacity`, increments 
`resize_events`.
     /// - If a matching bucket exists, place it back in that bucket's queue 
(if space is available).
@@ -236,7 +253,7 @@ impl MemoryPool {
     /// - The `was_pool_allocated` flag indicates if this buffer was 
originally allocated from the pool.
     pub fn release_buffer(
         &self,
-        buffer: BytesMut,
+        buffer: AlignedBuffer,
         original_capacity: usize,
         was_pool_allocated: bool,
     ) {
@@ -448,16 +465,21 @@ impl MemoryPool {
 
 /// Return a buffer to the pool by calling `release_buffer` with the original 
capacity.
 /// This extension trait makes it easy to do 
`some_bytes.return_to_pool(orig_cap, was_pool_allocated)`.
-pub trait BytesMutExt {
+pub trait AlignedBufferExt {
     fn return_to_pool(self, original_capacity: usize, was_pool_allocated: 
bool);
 }
 
-impl BytesMutExt for BytesMut {
+impl AlignedBufferExt for AlignedBuffer {
     fn return_to_pool(self, original_capacity: usize, was_pool_allocated: 
bool) {
         memory_pool().release_buffer(self, original_capacity, 
was_pool_allocated);
     }
 }
 
+fn allocate_aligned_buffer(capacity: usize) -> AlignedBuffer {
+    let aligned_capacity = capacity.next_multiple_of(ALIGNMENT).max(ALIGNMENT);
+    AlignedBuffer::with_capacity(ALIGNMENT, aligned_capacity)
+}
+
 /// Convert a size in bytes to a string like "8KiB" or "2MiB".
 fn size_str(size: usize) -> String {
     if size >= 1024 * 1024 {
diff --git a/core/common/src/types/message/messages_batch_mut.rs 
b/core/common/src/types/message/messages_batch_mut.rs
index 320f2f1ea..78ba9b3c0 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -25,9 +25,8 @@ use crate::{
     IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable, 
Validatable,
 };
 use crate::{MessageDeduplicator, PooledBuffer, random_id};
-use bytes::{BufMut, BytesMut};
 use lending_iterator::prelude::*;
-use std::ops::{Deref, Index};
+use std::ops::Index;
 use std::sync::Arc;
 use tracing::{error, warn};
 
@@ -119,6 +118,11 @@ impl IggyMessagesBatchMut {
         self.messages.len() as u32
     }
 
+    /// Returns the raw message bytes as a slice.
+    pub fn as_bytes(&self) -> &[u8] {
+        &self.messages
+    }
+
     /// Prepares all messages in the batch for persistence by setting their 
offsets,
     /// timestamps, and other necessary fields.
     ///
@@ -257,6 +261,11 @@ impl IggyMessagesBatchMut {
         self.count() == 0
     }
 
+    /// Returns the total size of all messages in bytes.
+    pub fn len(&self) -> usize {
+        self.messages.len()
+    }
+
     /// Decomposes the batch into its constituent parts.
     pub fn decompose(mut self) -> (IggyIndexesMut, PooledBuffer) {
         let indexes = std::mem::replace(&mut self.indexes, 
IggyIndexesMut::empty());
@@ -489,7 +498,7 @@ impl IggyMessagesBatchMut {
     /// subsequent messages in the new buffer.
     #[allow(clippy::too_many_arguments)]
     fn rebuild_indexes_for_chunk(
-        new_buffer: &BytesMut,
+        new_buffer: &PooledBuffer,
         new_indexes: &mut IggyIndexesMut,
         offset_in_new_buffer: &mut u32,
         chunk_start: usize,
@@ -827,11 +836,3 @@ impl Index<usize> for IggyMessagesBatchMut {
         &self.messages[start..end]
     }
 }
-
-impl Deref for IggyMessagesBatchMut {
-    type Target = BytesMut;
-
-    fn deref(&self) -> &Self::Target {
-        &self.messages
-    }
-}
diff --git a/core/common/src/types/segment_storage/index_reader.rs 
b/core/common/src/types/segment_storage/index_reader.rs
index 419b6db4b..e62233ca6 100644
--- a/core/common/src/types/segment_storage/index_reader.rs
+++ b/core/common/src/types/segment_storage/index_reader.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView, IggyIndexesMut, 
PooledBuffer};
-use bytes::BytesMut;
 use compio::{
     buf::{IntoInner, IoBuf},
     fs::{File, OpenOptions},
@@ -336,26 +335,18 @@ impl IndexReader {
         &self,
         offset: u32,
         len: u32,
-        use_pool: bool,
+        _use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-        if use_pool {
-            let len = len as usize;
-            let buf = PooledBuffer::with_capacity(len);
-            let (result, buf) = self
-                .file
-                .read_exact_at(buf.slice(..len), offset as u64)
-                .await
-                .into();
-            let buf = buf.into_inner();
-            result?;
-            Ok(buf)
-        } else {
-            let mut buf = BytesMut::with_capacity(len as usize);
-            unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
-            result?;
-            Ok(PooledBuffer::from_existing(buf))
-        }
+        let buf = PooledBuffer::with_capacity(len as usize);
+
+        let (result, buf) = self
+            .file
+            .read_exact_at(buf.slice(..len as usize), offset as u64)
+            .await
+            .into();
+        let buf = buf.into_inner();
+        result?;
+        Ok(buf)
     }
 
     /// Gets the nth index from the index file.
diff --git a/core/common/src/types/segment_storage/messages_reader.rs 
b/core/common/src/types/segment_storage/messages_reader.rs
index 7cd4c11b2..824261b0a 100644
--- a/core/common/src/types/segment_storage/messages_reader.rs
+++ b/core/common/src/types/segment_storage/messages_reader.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use crate::{IggyError, IggyIndexesMut, IggyMessagesBatchMut, PooledBuffer};
-use bytes::BytesMut;
 use compio::buf::{IntoInner, IoBuf};
 use compio::fs::{File, OpenOptions};
 use compio::io::AsyncReadAtExt;
@@ -133,25 +132,18 @@ impl MessagesReader {
         &self,
         offset: u32,
         len: u32,
-        use_pool: bool,
+        _use_pool: bool,
     ) -> Result<PooledBuffer, std::io::Error> {
-        if use_pool {
-            let buf = PooledBuffer::with_capacity(len as usize);
-            let len = len as usize;
-            let (result, buf) = self
-                .file
-                .read_exact_at(buf.slice(..len), offset as u64)
-                .await
-                .into();
-            let buf = buf.into_inner();
-            result?;
-            Ok(buf)
-        } else {
-            let mut buf = BytesMut::with_capacity(len as usize);
-            unsafe { buf.set_len(len as usize) };
-            let (result, buf) = self.file.read_exact_at(buf, offset as 
u64).await.into();
-            result?;
-            Ok(PooledBuffer::from_existing(buf))
-        }
+        let buf = PooledBuffer::with_capacity(len as usize);
+
+        let (result, buf) = self
+            .file
+            .read_exact_at(buf.slice(..len as usize), offset as u64)
+            .await
+            .into();
+
+        let buf = buf.into_inner();
+        result?;
+        Ok(buf)
     }
 }
diff --git a/core/integration/tests/server/message_cleanup.rs 
b/core/integration/tests/server/message_cleanup.rs
index 3af365fd7..c3489208b 100644
--- a/core/integration/tests/server/message_cleanup.rs
+++ b/core/integration/tests/server/message_cleanup.rs
@@ -84,7 +84,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
         .server(
             TestServerConfig::builder()
                 .extra_envs(HashMap::from([
-                    ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), 
"100KiB".to_string()),
+                    ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(), 
"10KiB".to_string()),
                     (
                         
"IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED".to_string(),
                         "true".to_string(),
diff --git 
a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs 
b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
index 2292f353a..05355c3bc 100644
--- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
@@ -396,7 +396,7 @@ pub async fn run_expiry_with_multiple_partitions(client: 
&IggyClient, data_path:
     let stream = client.create_stream(STREAM_NAME).await.unwrap();
     let stream_id = stream.id;
 
-    let expiry = Duration::from_secs(3);
+    let expiry = Duration::from_secs(5);
     let topic = client
         .create_topic(
             &Identifier::named(STREAM_NAME).unwrap(),
@@ -439,6 +439,8 @@ pub async fn run_expiry_with_multiple_partitions(client: 
&IggyClient, data_path:
 
     // Collect initial segment counts
     let mut initial_counts: Vec<usize> = Vec::new();
+
+    // Wait until all partitions have >= 2 segments (up to 5s)
     for partition_id in 0..PARTITIONS_COUNT {
         let partition_path = data_path
             .join(format!(
@@ -446,14 +448,23 @@ pub async fn run_expiry_with_multiple_partitions(client: 
&IggyClient, data_path:
             ))
             .display()
             .to_string();
-        let segments = get_segment_paths_for_partition(&partition_path);
-        initial_counts.push(segments.len());
-        assert!(
-            segments.len() >= 2,
-            "Partition {} should have at least 2 segments, got {}",
-            partition_id,
-            segments.len()
-        );
+
+        let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+        let count = loop {
+            let segments = get_segment_paths_for_partition(&partition_path);
+            if segments.len() >= 2 {
+                break segments.len();
+            }
+            if tokio::time::Instant::now() >= deadline {
+                panic!(
+                    "Partition {} should have at least 2 segments after 5s, 
got {}",
+                    partition_id,
+                    segments.len()
+                );
+            }
+            tokio::time::sleep(Duration::from_millis(100)).await;
+        };
+        initial_counts.push(count);
     }
 
     // Wait for expiry + cleaner
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 5addf1516..0413cb472 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -79,7 +79,7 @@ impl IggyPartition {
         let mut position = header_size + 4;
         bytes[position..position + indexes.len()].copy_from_slice(indexes);
         position += indexes.len();
-        bytes[position..position + batch.len()].copy_from_slice(batch);
+        bytes[position..position + 
batch.len()].copy_from_slice(batch.as_bytes());
 
         Message::<PrepareHeader>::from_bytes(bytes.freeze())
             .expect("prepare_message_from_batch: invalid prepared message 
bytes")
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 1f18e32ed..833c712bd 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -54,8 +54,14 @@ pub(crate) fn decode_send_messages_batch(body: Bytes) -> 
Option<IggyMessagesBatc
     }
 
     let indexes_bytes = body.split_to(indexes_len);
-    let indexes = 
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
-    let messages = PooledBuffer::from(body);
+
+    let mut indexes_buf = PooledBuffer::with_capacity(indexes_len);
+    indexes_buf.put_slice(indexes_bytes.as_ref());
+    let indexes = IggyIndexesMut::from_bytes(indexes_buf, 0);
+
+    let messages_len = body.len();
+    let mut messages = PooledBuffer::with_capacity(messages_len);
+    messages.put_slice(body.as_ref());
 
     Some(IggyMessagesBatchMut::from_indexes_and_messages(
         indexes, messages,
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs 
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 652b99a7c..91a5ea1e7 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -123,6 +123,7 @@ impl ServerCommandHandler for SendMessages {
             .and_then(|s| s.checked_sub(metadata_len_field_size))
             .ok_or(IggyError::InvalidCommand)?;
         let messages_buffer = PooledBuffer::with_capacity(messages_size);
+
         let (result, messages_buffer) = 
sender.read(messages_buffer.slice(0..messages_size)).await;
         result?;
         let messages_buffer = messages_buffer.into_inner();
diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs
index 125c0ad25..f12e36869 100644
--- a/core/server/src/http/messages.rs
+++ b/core/server/src/http/messages.rs
@@ -22,18 +22,17 @@ use crate::http::jwt::json_web_token::Identity;
 use crate::http::shared::AppState;
 use crate::shard::system::messages::PollingArgs;
 use crate::shard::transmission::message::ResolvedPartition;
-use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
+use crate::streaming::segments::IggyMessagesBatchMut;
 use crate::streaming::session::Session;
 use axum::extract::{Path, Query, State};
 use axum::http::StatusCode;
 use axum::routing::get;
 use axum::{Extension, Json, Router, debug_handler};
 use err_trail::ErrContext;
-use iggy_common::Identifier;
-use iggy_common::PooledBuffer;
-use iggy_common::Validatable;
 use iggy_common::{Consumer, PollMessages, SendMessages};
+use iggy_common::{Identifier, PooledBuffer};
 use iggy_common::{IggyError, IggyMessagesBatch, PolledMessages};
+use iggy_common::{IggyIndexesMut, Validatable};
 use send_wrapper::SendWrapper;
 use std::sync::Arc;
 use tracing::instrument;
@@ -152,9 +151,14 @@ async fn flush_unsaved_buffer(
 
 fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut {
     let (_, indexes, messages) = batch.decompose();
-    let (_, indexes_buffer) = indexes.decompose();
-    let indexes_buffer_mut = 
PooledBuffer::from_existing(indexes_buffer.into());
-    let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0);
-    let messages_buffer_mut = PooledBuffer::from_existing(messages.into());
+    let (base_position, indexes_buffer) = indexes.decompose();
+
+    let mut indexes_buffer_mut = 
PooledBuffer::with_capacity(indexes_buffer.len());
+    indexes_buffer_mut.extend_from_slice(&indexes_buffer);
+    let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 
base_position);
+
+    let mut messages_buffer_mut = PooledBuffer::with_capacity(messages.len());
+    messages_buffer_mut.extend_from_slice(&messages);
+
     IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut, 
messages_buffer_mut)
 }

Reply via email to