This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new a81bcc601 feat(server): aligned buffer memory pool (#2921)
a81bcc601 is described below
commit a81bcc601c4f133ad8ed3015257485e14834ad3f
Author: tungtose <[email protected]>
AuthorDate: Tue Mar 24 16:53:22 2026 +0700
feat(server): aligned buffer memory pool (#2921)
---
Cargo.lock | 1 +
Cargo.toml | 1 +
core/common/Cargo.toml | 1 +
core/common/src/alloc/buffer.rs | 145 ++++++++++++++-------
core/common/src/alloc/memory_pool.rs | 62 ++++++---
.../common/src/types/message/messages_batch_mut.rs | 23 ++--
.../src/types/segment_storage/index_reader.rs | 31 ++---
.../src/types/segment_storage/messages_reader.rs | 32 ++---
core/integration/tests/server/message_cleanup.rs | 2 +-
.../server/scenarios/message_cleanup_scenario.rs | 29 +++--
core/partitions/src/iggy_partition.rs | 2 +-
core/partitions/src/lib.rs | 10 +-
.../handlers/messages/send_messages_handler.rs | 1 +
core/server/src/http/messages.rs | 20 +--
14 files changed, 221 insertions(+), 139 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 63420b7b5..cdf2f962b 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -5409,6 +5409,7 @@ version = "0.9.3-edge.1"
dependencies = [
"aes-gcm",
"ahash 0.8.12",
+ "aligned-vec",
"async-broadcast",
"async-trait",
"base64 0.22.1",
diff --git a/Cargo.toml b/Cargo.toml
index 3843f5649..bdd7d84a1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -66,6 +66,7 @@ actix-files = "0.6.10"
actix-web = "4.13.0"
aes-gcm = "0.10.3"
ahash = { version = "0.8.12", features = ["serde"] }
+aligned-vec = "0.6.4"
anyhow = "1.0.102"
argon2 = "0.5.3"
arrow = "57.3.0"
diff --git a/core/common/Cargo.toml b/core/common/Cargo.toml
index d3d3794ba..b7e610181 100644
--- a/core/common/Cargo.toml
+++ b/core/common/Cargo.toml
@@ -30,6 +30,7 @@ readme = "../../README.md"
[dependencies]
aes-gcm = { workspace = true }
ahash = { workspace = true }
+aligned-vec = { workspace = true }
async-broadcast = { workspace = true }
async-trait = { workspace = true }
base64 = { workspace = true }
diff --git a/core/common/src/alloc/buffer.rs b/core/common/src/alloc/buffer.rs
index 3c059cba3..1291febb4 100644
--- a/core/common/src/alloc/buffer.rs
+++ b/core/common/src/alloc/buffer.rs
@@ -16,11 +16,15 @@
* under the License.
*/
-use super::memory_pool::{BytesMutExt, memory_pool};
-use bytes::{Buf, BufMut, Bytes, BytesMut};
+use crate::alloc::memory_pool::{ALIGNMENT, AlignedBuffer};
+
+use super::memory_pool::{AlignedBufferExt, memory_pool};
+use bytes::Bytes;
use compio::buf::{IoBuf, IoBufMut, SetLen};
-use std::mem::MaybeUninit;
-use std::ops::{Deref, DerefMut};
+use std::{
+ mem::MaybeUninit,
+ ops::{Deref, DerefMut},
+};
/// A buffer wrapper that participates in memory pooling.
///
@@ -32,7 +36,7 @@ pub struct PooledBuffer {
from_pool: bool,
original_capacity: usize,
original_bucket_idx: Option<usize>,
- inner: BytesMut,
+ inner: AlignedBuffer,
}
impl Default for PooledBuffer {
@@ -48,13 +52,21 @@ impl PooledBuffer {
///
/// * `capacity` - The capacity of the buffer
pub fn with_capacity(capacity: usize) -> Self {
- let (buffer, was_pool_allocated) =
memory_pool().acquire_buffer(capacity);
+ let (buffer, was_pool_allocated) =
memory_pool().acquire_buffer(capacity.max(ALIGNMENT));
let original_capacity = buffer.capacity();
let original_bucket_idx = if was_pool_allocated {
memory_pool().best_fit(original_capacity)
} else {
None
};
+
+ debug_assert_eq!(
+ buffer.as_ptr() as usize % ALIGNMENT,
+ 0,
+ "PooledBuffer not aligned to {} bytes",
+ ALIGNMENT
+ );
+
Self {
from_pool: was_pool_allocated,
original_capacity,
@@ -63,12 +75,12 @@ impl PooledBuffer {
}
}
- /// Creates a new pooled buffer from an existing `BytesMut`.
+ /// Creates a new pooled buffer from an existing `AlignedBuffer`.
///
/// # Arguments
///
- /// * `existing` - The existing `BytesMut` buffer
- pub fn from_existing(existing: BytesMut) -> Self {
+ /// * `existing` - The existing `AlignedBuffer` buffer
+ pub fn from_existing(existing: AlignedBuffer) -> Self {
Self {
from_pool: false,
original_capacity: existing.capacity(),
@@ -83,7 +95,7 @@ impl PooledBuffer {
from_pool: false,
original_capacity: 0,
original_bucket_idx: None,
- inner: BytesMut::new(),
+ inner: AlignedBuffer::new(ALIGNMENT),
}
}
@@ -127,6 +139,52 @@ impl PooledBuffer {
}
}
+ /// Splits the buffer at the given position, returning a new PooledBuffer
+ /// containing bytes [0, at) and leaving bytes [at, len) in `self`
+ ///
+ /// # Panics
+ /// Panics if at > len
+ pub fn split_to(&mut self, at: usize) -> PooledBuffer {
+ assert!(
+ at <= self.len(),
+ "split_to out of bounds: at={}, len={}",
+ at,
+ self.len()
+ );
+
+ let mut new_buff = PooledBuffer::with_capacity(at);
+ new_buff.inner.extend_from_slice(&self.inner[..at]);
+
+ // SAFETY:
+ // - `self.inner.as_ptr().add(at)` is valid for `new_len` reads because
+ // `at + new_len == old_len <= cap`; likewise `self.inner.as_mut_ptr()`
+ // is valid for `new_len` writes.
+ //
+ // - the source range `[at, at + new_len)` and the destination range
+ // `[0, new_len)` may overlap; `std::ptr::copy` has memmove semantics
+ // and handles overlapping ranges correctly.
+ // - when `at == 0`, the operation is a no-op
+ let new_len = self.len() - at;
+ if new_len > 0 {
+ unsafe {
+ // move [at..] to [0..]
+ std::ptr::copy(
+ self.inner.as_ptr().add(at),
+ self.inner.as_mut_ptr(),
+ new_len,
+ );
+
+ self.inner.set_len(new_len);
+ }
+ } else {
+ self.inner.clear();
+ }
+
+ new_buff
+ }
+
+ pub fn put<T: AsRef<[u8]>>(&mut self, src: T) {
+ self.extend_from_slice(src.as_ref());
+ }
+
/// Wrapper for extend_from_slice which might cause resize
pub fn extend_from_slice(&mut self, extend_from: &[u8]) {
let before_cap = self.inner.capacity();
@@ -140,7 +198,9 @@ impl PooledBuffer {
/// Wrapper for put_bytes which might cause resize
pub fn put_bytes(&mut self, byte: u8, len: usize) {
let before_cap = self.inner.capacity();
- self.inner.put_bytes(byte, len);
+
+ let start = self.inner.len();
+ self.inner.resize(start + len, byte);
if self.inner.capacity() != before_cap {
self.check_for_resize();
@@ -149,18 +209,18 @@ impl PooledBuffer {
/// Wrapper for put_slice which might cause resize
pub fn put_slice(&mut self, src: &[u8]) {
- let before_cap = self.inner.capacity();
- self.inner.put_slice(src);
-
- if self.inner.capacity() != before_cap {
- self.check_for_resize();
- }
+ self.extend_from_slice(src);
+ // let before_cap = self.inner.capacity();
+ //
+ // if self.inner.capacity() != before_cap {
+ // self.check_for_resize();
+ // }
}
/// Wrapper for put_u32_le which might cause resize
pub fn put_u32_le(&mut self, value: u32) {
let before_cap = self.inner.capacity();
- self.inner.put_u32_le(value);
+ self.inner.extend_from_slice(&value.to_le_bytes());
if self.inner.capacity() != before_cap {
self.check_for_resize();
@@ -170,7 +230,7 @@ impl PooledBuffer {
/// Wrapper for put_u64_le which might cause resize
pub fn put_u64_le(&mut self, value: u64) {
let before_cap = self.inner.capacity();
- self.inner.put_u64_le(value);
+ self.inner.extend_from_slice(&value.to_le_bytes());
if self.inner.capacity() != before_cap {
self.check_for_resize();
@@ -192,11 +252,12 @@ impl PooledBuffer {
self.inner.is_empty()
}
- /// Consumes the PooledBuffer and returns the inner BytesMut.
+ /// Consumes the PooledBuffer and returns the inner AlignedBuffer.
/// Note: This bypasses pool return logic, use with caution.
- pub fn into_inner(self) -> BytesMut {
+ pub fn into_inner(self) -> AlignedBuffer {
let mut this = std::mem::ManuallyDrop::new(self);
- std::mem::take(&mut this.inner)
+
+ std::mem::replace(&mut this.inner, AlignedBuffer::new(ALIGNMENT))
}
/// Freezes the buffer, converting it to an immutable `Bytes`.
@@ -205,24 +266,26 @@ impl PooledBuffer {
/// return memory to the pool on drop (the frozen Bytes owns the
allocation).
/// The returned `Bytes` is Arc-backed, allowing cheap clones.
pub fn freeze(&mut self) -> Bytes {
- // Decrement pool counter since memory is transferred to Bytes
- // and won't be returned to the pool.
+ let buf = std::mem::replace(&mut self.inner,
AlignedBuffer::new(ALIGNMENT));
+
+ // Update pool accounting
if self.from_pool
&& let Some(bucket_idx) = self.original_bucket_idx
{
memory_pool().dec_bucket_in_use(bucket_idx);
}
-
- let inner = std::mem::take(&mut self.inner);
self.from_pool = false;
self.original_capacity = 0;
self.original_bucket_idx = None;
- inner.freeze()
+
+ // Zero copy: Bytes takes ownership of the AlignedBuffer
+ // and will drop it when refcount reaches zero
+ Bytes::from_owner(buf)
}
}
impl Deref for PooledBuffer {
- type Target = BytesMut;
+ type Target = AlignedBuffer;
fn deref(&self) -> &Self::Target {
&self.inner
@@ -238,7 +301,7 @@ impl DerefMut for PooledBuffer {
impl Drop for PooledBuffer {
fn drop(&mut self) {
if self.from_pool {
- let buf = std::mem::take(&mut self.inner);
+ let buf = std::mem::replace(&mut self.inner,
AlignedBuffer::new(ALIGNMENT));
buf.return_to_pool(self.original_capacity, true);
}
}
@@ -252,27 +315,15 @@ impl From<&[u8]> for PooledBuffer {
}
}
-impl From<BytesMut> for PooledBuffer {
- fn from(bytes: BytesMut) -> Self {
- Self::from_existing(bytes)
+impl AsRef<[u8]> for PooledBuffer {
+ fn as_ref(&self) -> &[u8] {
+ &self.inner
}
}
-impl Buf for PooledBuffer {
- fn remaining(&self) -> usize {
- self.inner.remaining()
- }
-
- fn chunk(&self) -> &[u8] {
- self.inner.chunk()
- }
-
- fn advance(&mut self, cnt: usize) {
- Buf::advance(&mut self.inner, cnt)
- }
-
- fn chunks_vectored<'t>(&'t self, dst: &mut [std::io::IoSlice<'t>]) ->
usize {
- self.inner.chunks_vectored(dst)
+impl From<AlignedBuffer> for PooledBuffer {
+ fn from(buffer: AlignedBuffer) -> Self {
+ Self::from_existing(buffer)
}
}
diff --git a/core/common/src/alloc/memory_pool.rs
b/core/common/src/alloc/memory_pool.rs
index 9216ef57f..0dd2782ec 100644
--- a/core/common/src/alloc/memory_pool.rs
+++ b/core/common/src/alloc/memory_pool.rs
@@ -16,7 +16,7 @@
* under the License.
*/
-use bytes::BytesMut;
+use aligned_vec::{AVec, ConstAlign};
use crossbeam::queue::ArrayQueue;
use human_repr::HumanCount;
use once_cell::sync::OnceCell;
@@ -24,18 +24,17 @@ use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tracing::{info, trace, warn};
+pub const ALIGNMENT: usize = 4096;
+pub type AlignedBuffer = AVec<u8, ConstAlign<4096>>;
+
/// Global memory pool instance. Use `memory_pool()` to access it.
pub static MEMORY_POOL: OnceCell<MemoryPool> = OnceCell::new();
/// Total number of distinct bucket sizes.
-const NUM_BUCKETS: usize = 32;
+const NUM_BUCKETS: usize = 28;
/// Array of bucket sizes in ascending order. Each entry is a distinct buffer
size (in bytes).
const BUCKET_SIZES: [usize; NUM_BUCKETS] = [
- 256,
- 512,
- 1024,
- 2 * 1024,
4 * 1024,
8 * 1024,
16 * 1024,
@@ -87,7 +86,7 @@ pub struct MemoryPoolConfigOther {
pub bucket_capacity: u32,
}
-/// A memory pool that maintains fixed-size buckets for reusing `BytesMut`
buffers.
+/// A memory pool that maintains fixed-size buckets for reusing
`AlignedBuffer` buffers.
///
/// Each bucket corresponds to a particular size in `BUCKET_SIZES`. The pool
tracks:
/// - Buffers currently in use (`in_use`)
@@ -109,7 +108,7 @@ pub struct MemoryPool {
/// Array of queues for reusable buffers. Each queue can store up to
`bucket_capacity` buffers.
/// The length of each queue (`buckets[i].len()`) is how many **free**
buffers are currently available.
/// Free doesn't mean the buffer is allocated, it just means it's not in
use.
- buckets: [Arc<ArrayQueue<BytesMut>>; NUM_BUCKETS],
+ buckets: [Arc<ArrayQueue<AlignedBuffer>>; NUM_BUCKETS],
/// Number of buffers **in use** for each bucket size (grow/shrink as they
are acquired/released).
in_use: [Arc<AtomicUsize>; NUM_BUCKETS],
@@ -175,29 +174,47 @@ impl MemoryPool {
MEMORY_POOL.get_or_init(|| MemoryPool::new(is_enabled,
memory_limit, bucket_capacity));
}
- /// Acquire a `BytesMut` buffer with at least `capacity` bytes.
+ /// Acquire an `AlignedBuffer` buffer with at least `capacity` bytes.
///
/// - If a bucket can fit `capacity`, try to pop from its free buffer
queue; otherwise create a new buffer.
/// - If `memory_limit` would be exceeded, allocate outside the pool.
///
/// Returns a tuple of (buffer, was_pool_allocated) where
was_pool_allocated indicates if the buffer
/// was allocated from the pool (true) or externally (false).
- pub fn acquire_buffer(&self, capacity: usize) -> (BytesMut, bool) {
+ pub fn acquire_buffer(&self, capacity: usize) -> (AlignedBuffer, bool) {
if !self.is_enabled {
- return (BytesMut::with_capacity(capacity), false);
+ return (allocate_aligned_buffer(capacity), false);
}
let current = self.pool_current_size();
match self.best_fit(capacity) {
Some(idx) => {
+ let new_size = BUCKET_SIZES[idx];
+
if let Some(mut buf) = self.buckets[idx].pop() {
buf.clear();
+
+ if buf.capacity() < capacity {
+ warn!(
+ "Pooled buffer too small! bucket[{}]={},
buf.capacity()={}, requested={}. Dropping and allocating new.",
+ idx,
+ new_size,
+ buf.capacity(),
+ capacity
+ );
+ drop(buf);
+
+ let new_buf = allocate_aligned_buffer(new_size);
+ self.inc_bucket_alloc(idx);
+ self.inc_bucket_in_use(idx);
+ return (new_buf, true);
+ }
+
self.inc_bucket_in_use(idx);
return (buf, true);
}
- let new_size = BUCKET_SIZES[idx];
if current + new_size > self.memory_limit {
self.set_capacity_warning(true);
trace!(
@@ -205,12 +222,12 @@ impl MemoryPool {
new_size, current, self.memory_limit
);
self.inc_external_allocations();
- return (BytesMut::with_capacity(new_size), false);
+ return (allocate_aligned_buffer(new_size), false);
}
self.inc_bucket_alloc(idx);
self.inc_bucket_in_use(idx);
- (BytesMut::with_capacity(new_size), true)
+ (allocate_aligned_buffer(new_size), true)
}
None => {
if current + capacity > self.memory_limit {
@@ -219,16 +236,16 @@ impl MemoryPool {
capacity, current, self.memory_limit
);
self.inc_external_allocations();
- return (BytesMut::with_capacity(capacity), false);
+ return (allocate_aligned_buffer(capacity), false);
}
self.inc_external_allocations();
- (BytesMut::with_capacity(capacity), false)
+ (allocate_aligned_buffer(capacity), false)
}
}
}
- /// Return a `BytesMut` buffer previously acquired from the pool.
+ /// Return an `AlignedBuffer` buffer previously acquired from the pool.
///
/// - If `current_capacity` differs from `original_capacity`, increments
`resize_events`.
/// - If a matching bucket exists, place it back in that bucket's queue
(if space is available).
@@ -236,7 +253,7 @@ impl MemoryPool {
/// - The `was_pool_allocated` flag indicates if this buffer was
originally allocated from the pool.
pub fn release_buffer(
&self,
- buffer: BytesMut,
+ buffer: AlignedBuffer,
original_capacity: usize,
was_pool_allocated: bool,
) {
@@ -448,16 +465,21 @@ impl MemoryPool {
/// Return a buffer to the pool by calling `release_buffer` with the original
capacity.
/// This extension trait makes it easy to do
`some_bytes.return_to_pool(orig_cap, was_pool_allocated)`.
-pub trait BytesMutExt {
+pub trait AlignedBufferExt {
fn return_to_pool(self, original_capacity: usize, was_pool_allocated:
bool);
}
-impl BytesMutExt for BytesMut {
+impl AlignedBufferExt for AlignedBuffer {
fn return_to_pool(self, original_capacity: usize, was_pool_allocated:
bool) {
memory_pool().release_buffer(self, original_capacity,
was_pool_allocated);
}
}
+fn allocate_aligned_buffer(capacity: usize) -> AlignedBuffer {
+ let aligned_capacity = capacity.next_multiple_of(ALIGNMENT).max(ALIGNMENT);
+ AlignedBuffer::with_capacity(ALIGNMENT, aligned_capacity)
+}
+
/// Convert a size in bytes to a string like "8KiB" or "2MiB".
fn size_str(size: usize) -> String {
if size >= 1024 * 1024 {
diff --git a/core/common/src/types/message/messages_batch_mut.rs
b/core/common/src/types/message/messages_batch_mut.rs
index 320f2f1ea..78ba9b3c0 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -25,9 +25,8 @@ use crate::{
IggyTimestamp, MAX_PAYLOAD_SIZE, MAX_USER_HEADERS_SIZE, Sizeable,
Validatable,
};
use crate::{MessageDeduplicator, PooledBuffer, random_id};
-use bytes::{BufMut, BytesMut};
use lending_iterator::prelude::*;
-use std::ops::{Deref, Index};
+use std::ops::Index;
use std::sync::Arc;
use tracing::{error, warn};
@@ -119,6 +118,11 @@ impl IggyMessagesBatchMut {
self.messages.len() as u32
}
+ /// Returns the raw message bytes as a slice.
+ pub fn as_bytes(&self) -> &[u8] {
+ &self.messages
+ }
+
/// Prepares all messages in the batch for persistence by setting their
offsets,
/// timestamps, and other necessary fields.
///
@@ -257,6 +261,11 @@ impl IggyMessagesBatchMut {
self.count() == 0
}
+ /// Returns the total size of all messages in bytes.
+ pub fn len(&self) -> usize {
+ self.messages.len()
+ }
+
/// Decomposes the batch into its constituent parts.
pub fn decompose(mut self) -> (IggyIndexesMut, PooledBuffer) {
let indexes = std::mem::replace(&mut self.indexes,
IggyIndexesMut::empty());
@@ -489,7 +498,7 @@ impl IggyMessagesBatchMut {
/// subsequent messages in the new buffer.
#[allow(clippy::too_many_arguments)]
fn rebuild_indexes_for_chunk(
- new_buffer: &BytesMut,
+ new_buffer: &PooledBuffer,
new_indexes: &mut IggyIndexesMut,
offset_in_new_buffer: &mut u32,
chunk_start: usize,
@@ -827,11 +836,3 @@ impl Index<usize> for IggyMessagesBatchMut {
&self.messages[start..end]
}
}
-
-impl Deref for IggyMessagesBatchMut {
- type Target = BytesMut;
-
- fn deref(&self) -> &Self::Target {
- &self.messages
- }
-}
diff --git a/core/common/src/types/segment_storage/index_reader.rs
b/core/common/src/types/segment_storage/index_reader.rs
index 419b6db4b..e62233ca6 100644
--- a/core/common/src/types/segment_storage/index_reader.rs
+++ b/core/common/src/types/segment_storage/index_reader.rs
@@ -16,7 +16,6 @@
// under the License.
use crate::{INDEX_SIZE, IggyError, IggyIndex, IggyIndexView, IggyIndexesMut,
PooledBuffer};
-use bytes::BytesMut;
use compio::{
buf::{IntoInner, IoBuf},
fs::{File, OpenOptions},
@@ -336,26 +335,18 @@ impl IndexReader {
&self,
offset: u32,
len: u32,
- use_pool: bool,
+ _use_pool: bool,
) -> Result<PooledBuffer, std::io::Error> {
- if use_pool {
- let len = len as usize;
- let buf = PooledBuffer::with_capacity(len);
- let (result, buf) = self
- .file
- .read_exact_at(buf.slice(..len), offset as u64)
- .await
- .into();
- let buf = buf.into_inner();
- result?;
- Ok(buf)
- } else {
- let mut buf = BytesMut::with_capacity(len as usize);
- unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await.into();
- result?;
- Ok(PooledBuffer::from_existing(buf))
- }
+ let buf = PooledBuffer::with_capacity(len as usize);
+
+ let (result, buf) = self
+ .file
+ .read_exact_at(buf.slice(..len as usize), offset as u64)
+ .await
+ .into();
+ let buf = buf.into_inner();
+ result?;
+ Ok(buf)
}
/// Gets the nth index from the index file.
diff --git a/core/common/src/types/segment_storage/messages_reader.rs
b/core/common/src/types/segment_storage/messages_reader.rs
index 7cd4c11b2..824261b0a 100644
--- a/core/common/src/types/segment_storage/messages_reader.rs
+++ b/core/common/src/types/segment_storage/messages_reader.rs
@@ -16,7 +16,6 @@
// under the License.
use crate::{IggyError, IggyIndexesMut, IggyMessagesBatchMut, PooledBuffer};
-use bytes::BytesMut;
use compio::buf::{IntoInner, IoBuf};
use compio::fs::{File, OpenOptions};
use compio::io::AsyncReadAtExt;
@@ -133,25 +132,18 @@ impl MessagesReader {
&self,
offset: u32,
len: u32,
- use_pool: bool,
+ _use_pool: bool,
) -> Result<PooledBuffer, std::io::Error> {
- if use_pool {
- let buf = PooledBuffer::with_capacity(len as usize);
- let len = len as usize;
- let (result, buf) = self
- .file
- .read_exact_at(buf.slice(..len), offset as u64)
- .await
- .into();
- let buf = buf.into_inner();
- result?;
- Ok(buf)
- } else {
- let mut buf = BytesMut::with_capacity(len as usize);
- unsafe { buf.set_len(len as usize) };
- let (result, buf) = self.file.read_exact_at(buf, offset as
u64).await.into();
- result?;
- Ok(PooledBuffer::from_existing(buf))
- }
+ let buf = PooledBuffer::with_capacity(len as usize);
+
+ let (result, buf) = self
+ .file
+ .read_exact_at(buf.slice(..len as usize), offset as u64)
+ .await
+ .into();
+
+ let buf = buf.into_inner();
+ result?;
+ Ok(buf)
}
}
diff --git a/core/integration/tests/server/message_cleanup.rs
b/core/integration/tests/server/message_cleanup.rs
index 3af365fd7..c3489208b 100644
--- a/core/integration/tests/server/message_cleanup.rs
+++ b/core/integration/tests/server/message_cleanup.rs
@@ -84,7 +84,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
.server(
TestServerConfig::builder()
.extra_envs(HashMap::from([
- ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"100KiB".to_string()),
+ ("IGGY_SYSTEM_SEGMENT_SIZE".to_string(),
"10KiB".to_string()),
(
"IGGY_DATA_MAINTENANCE_MESSAGES_CLEANER_ENABLED".to_string(),
"true".to_string(),
diff --git
a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
index 2292f353a..05355c3bc 100644
--- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
@@ -396,7 +396,7 @@ pub async fn run_expiry_with_multiple_partitions(client:
&IggyClient, data_path:
let stream = client.create_stream(STREAM_NAME).await.unwrap();
let stream_id = stream.id;
- let expiry = Duration::from_secs(3);
+ let expiry = Duration::from_secs(5);
let topic = client
.create_topic(
&Identifier::named(STREAM_NAME).unwrap(),
@@ -439,6 +439,8 @@ pub async fn run_expiry_with_multiple_partitions(client:
&IggyClient, data_path:
// Collect initial segment counts
let mut initial_counts: Vec<usize> = Vec::new();
+
+ // Wait until all partitions have >= 2 segments (up to 5s)
for partition_id in 0..PARTITIONS_COUNT {
let partition_path = data_path
.join(format!(
@@ -446,14 +448,23 @@ pub async fn run_expiry_with_multiple_partitions(client:
&IggyClient, data_path:
))
.display()
.to_string();
- let segments = get_segment_paths_for_partition(&partition_path);
- initial_counts.push(segments.len());
- assert!(
- segments.len() >= 2,
- "Partition {} should have at least 2 segments, got {}",
- partition_id,
- segments.len()
- );
+
+ let deadline = tokio::time::Instant::now() + Duration::from_secs(5);
+ let count = loop {
+ let segments = get_segment_paths_for_partition(&partition_path);
+ if segments.len() >= 2 {
+ break segments.len();
+ }
+ if tokio::time::Instant::now() >= deadline {
+ panic!(
+ "Partition {} should have at least 2 segments after 5s,
got {}",
+ partition_id,
+ segments.len()
+ );
+ }
+ tokio::time::sleep(Duration::from_millis(100)).await;
+ };
+ initial_counts.push(count);
}
// Wait for expiry + cleaner
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 5addf1516..0413cb472 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -79,7 +79,7 @@ impl IggyPartition {
let mut position = header_size + 4;
bytes[position..position + indexes.len()].copy_from_slice(indexes);
position += indexes.len();
- bytes[position..position + batch.len()].copy_from_slice(batch);
+ bytes[position..position +
batch.len()].copy_from_slice(batch.as_bytes());
Message::<PrepareHeader>::from_bytes(bytes.freeze())
.expect("prepare_message_from_batch: invalid prepared message
bytes")
diff --git a/core/partitions/src/lib.rs b/core/partitions/src/lib.rs
index 1f18e32ed..833c712bd 100644
--- a/core/partitions/src/lib.rs
+++ b/core/partitions/src/lib.rs
@@ -54,8 +54,14 @@ pub(crate) fn decode_send_messages_batch(body: Bytes) ->
Option<IggyMessagesBatc
}
let indexes_bytes = body.split_to(indexes_len);
- let indexes =
IggyIndexesMut::from_bytes(PooledBuffer::from(indexes_bytes), 0);
- let messages = PooledBuffer::from(body);
+
+ let mut indexes_buf = PooledBuffer::with_capacity(indexes_len);
+ indexes_buf.put_slice(indexes_bytes.as_ref());
+ let indexes = IggyIndexesMut::from_bytes(indexes_buf, 0);
+
+ let messages_len = body.len();
+ let mut messages = PooledBuffer::with_capacity(messages_len);
+ messages.put_slice(body.as_ref());
Some(IggyMessagesBatchMut::from_indexes_and_messages(
indexes, messages,
diff --git a/core/server/src/binary/handlers/messages/send_messages_handler.rs
b/core/server/src/binary/handlers/messages/send_messages_handler.rs
index 652b99a7c..91a5ea1e7 100644
--- a/core/server/src/binary/handlers/messages/send_messages_handler.rs
+++ b/core/server/src/binary/handlers/messages/send_messages_handler.rs
@@ -123,6 +123,7 @@ impl ServerCommandHandler for SendMessages {
.and_then(|s| s.checked_sub(metadata_len_field_size))
.ok_or(IggyError::InvalidCommand)?;
let messages_buffer = PooledBuffer::with_capacity(messages_size);
+
let (result, messages_buffer) =
sender.read(messages_buffer.slice(0..messages_size)).await;
result?;
let messages_buffer = messages_buffer.into_inner();
diff --git a/core/server/src/http/messages.rs b/core/server/src/http/messages.rs
index 125c0ad25..f12e36869 100644
--- a/core/server/src/http/messages.rs
+++ b/core/server/src/http/messages.rs
@@ -22,18 +22,17 @@ use crate::http::jwt::json_web_token::Identity;
use crate::http::shared::AppState;
use crate::shard::system::messages::PollingArgs;
use crate::shard::transmission::message::ResolvedPartition;
-use crate::streaming::segments::{IggyIndexesMut, IggyMessagesBatchMut};
+use crate::streaming::segments::IggyMessagesBatchMut;
use crate::streaming::session::Session;
use axum::extract::{Path, Query, State};
use axum::http::StatusCode;
use axum::routing::get;
use axum::{Extension, Json, Router, debug_handler};
use err_trail::ErrContext;
-use iggy_common::Identifier;
-use iggy_common::PooledBuffer;
-use iggy_common::Validatable;
use iggy_common::{Consumer, PollMessages, SendMessages};
+use iggy_common::{Identifier, PooledBuffer};
use iggy_common::{IggyError, IggyMessagesBatch, PolledMessages};
+use iggy_common::{IggyIndexesMut, Validatable};
use send_wrapper::SendWrapper;
use std::sync::Arc;
use tracing::instrument;
@@ -152,9 +151,14 @@ async fn flush_unsaved_buffer(
fn make_mutable(batch: IggyMessagesBatch) -> IggyMessagesBatchMut {
let (_, indexes, messages) = batch.decompose();
- let (_, indexes_buffer) = indexes.decompose();
- let indexes_buffer_mut =
PooledBuffer::from_existing(indexes_buffer.into());
- let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut, 0);
- let messages_buffer_mut = PooledBuffer::from_existing(messages.into());
+ let (base_position, indexes_buffer) = indexes.decompose();
+
+ let mut indexes_buffer_mut =
PooledBuffer::with_capacity(indexes_buffer.len());
+ indexes_buffer_mut.extend_from_slice(&indexes_buffer);
+ let indexes_mut = IggyIndexesMut::from_bytes(indexes_buffer_mut,
base_position);
+
+ let mut messages_buffer_mut = PooledBuffer::with_capacity(messages.len());
+ messages_buffer_mut.extend_from_slice(&messages);
+
IggyMessagesBatchMut::from_indexes_and_messages(indexes_mut,
messages_buffer_mut)
}