This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch partition_remaster
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 021cec27d0eb64c918a774bf5ce8794c57257b72
Author: numinex <[email protected]>
AuthorDate: Mon Mar 30 15:13:21 2026 +0200

    fix simulator
---
 Cargo.lock                              |   1 +
 core/common/src/types/send_messages2.rs |  93 ++++++++++++++++++++++++++++
 core/consensus/src/impls.rs             |   2 +-
 core/partitions/src/iggy_partitions.rs  |  61 ++++++++++---------
 core/partitions/src/journal.rs          |  11 ++++
 core/simulator/Cargo.toml               |   1 +
 core/simulator/src/client.rs            | 103 ++++++++++++++++----------------
 7 files changed, 188 insertions(+), 84 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 1276be916..0b570d0b6 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -9952,6 +9952,7 @@ name = "simulator"
 version = "0.1.0"
 dependencies = [
  "bytemuck",
+ "bytes",
  "consensus",
  "enumset",
  "futures",
diff --git a/core/common/src/types/send_messages2.rs 
b/core/common/src/types/send_messages2.rs
index 1b23adb52..e714936da 100644
--- a/core/common/src/types/send_messages2.rs
+++ b/core/common/src/types/send_messages2.rs
@@ -127,6 +127,72 @@ pub struct SendMessages2Owned {
 }
 
 impl SendMessages2Owned {
+    pub fn from_messages(
+        namespace: IggyNamespace,
+        messages: &IggyMessages2,
+    ) -> Result<Self, IggyError> {
+        let message_count = messages.count();
+        let mut origin_timestamp = u64::MAX;
+        for message in messages {
+            origin_timestamp = 
origin_timestamp.min(message.header.origin_timestamp);
+        }
+
+        if origin_timestamp == u64::MAX {
+            origin_timestamp = 0;
+        }
+
+        let mut blob = BytesMut::new();
+        for (index, message) in messages.iter().enumerate() {
+            let id = if message.header.id == 0 {
+                random_id::get_uuid()
+            } else {
+                message.header.id
+            };
+            let offset_delta = u32::try_from(index).map_err(|_| 
IggyError::InvalidCommand)?;
+            let timestamp_delta = message
+                .header
+                .origin_timestamp
+                .checked_sub(origin_timestamp)
+                .ok_or(IggyError::InvalidCommand)?;
+            if timestamp_delta > MAX_TIMESTAMP_DELTA_MICROS {
+                return 
Err(IggyError::InvalidMessageTimestampDelta(timestamp_delta));
+            }
+            let timestamp_delta =
+                u32::try_from(timestamp_delta).map_err(|_| 
IggyError::InvalidCommand)?;
+            let user_headers = 
message.user_headers.as_deref().unwrap_or_default();
+            let user_headers_length =
+                u32::try_from(user_headers.len()).map_err(|_| 
IggyError::InvalidCommand)?;
+            let payload_length =
+                u32::try_from(message.payload.len()).map_err(|_| 
IggyError::InvalidCommand)?;
+
+            let mut header = [0u8; MESSAGE_HEADER_SIZE];
+            header[8..24].copy_from_slice(&id.to_le_bytes());
+            header[24..28].copy_from_slice(&offset_delta.to_le_bytes());
+            header[28..32].copy_from_slice(&timestamp_delta.to_le_bytes());
+            header[32..36].copy_from_slice(&user_headers_length.to_le_bytes());
+            header[36..40].copy_from_slice(&payload_length.to_le_bytes());
+
+            let checksum = calculate_checksum_parts(&header[8..], 
user_headers, &message.payload);
+            header[0..8].copy_from_slice(&checksum.to_le_bytes());
+
+            blob.extend_from_slice(&header);
+            blob.extend_from_slice(user_headers);
+            blob.extend_from_slice(&message.payload);
+        }
+
+        let blob = blob.freeze();
+        let mut header = SendMessages2Header::new(
+            namespace.partition_id() as u64,
+            origin_timestamp,
+            u64::try_from(COMMAND_HEADER_SIZE + blob.len())
+                .map_err(|_| IggyError::InvalidCommand)?,
+            message_count,
+        );
+        header.batch_checksum = calculate_batch_checksum(&header, &blob);
+
+        Ok(Self { header, blob })
+    }
+
     pub fn from_legacy_request(namespace: IggyNamespace, body: &[u8]) -> 
Result<Self, IggyError> {
         let (message_count, messages) = legacy_messages_slice(body)?;
         let mut parsed = Vec::with_capacity(message_count as usize);
@@ -487,9 +553,36 @@ pub fn convert_request_message(
     let request_header = *message.header();
     let total_size = request_header.size as usize;
     let body = 
&message.as_slice()[std::mem::size_of::<RequestHeader>()..total_size];
+    if decode_request_slice(body).is_ok() {
+        return Ok(message);
+    }
     SendMessages2Owned::from_legacy_request(namespace, 
body)?.encode_request(request_header)
 }
 
+fn decode_request_slice(body: &[u8]) -> Result<SendMessages2Ref<'_>, 
IggyError> {
+    if body.len() < COMMAND_HEADER_SIZE {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    let header = SendMessages2Header::decode(&body[..COMMAND_HEADER_SIZE])?;
+    let blob_len = header.blob_len()?;
+    if body.len() < header.total_size() {
+        return Err(IggyError::InvalidCommand);
+    }
+
+    let blob = &body[COMMAND_HEADER_SIZE..COMMAND_HEADER_SIZE + blob_len];
+    let expected_checksum = calculate_batch_checksum(&header, blob);
+    if header.batch_checksum != expected_checksum {
+        return Err(IggyError::InvalidBatchChecksum(
+            header.batch_checksum,
+            expected_checksum,
+            header.base_offset,
+        ));
+    }
+
+    Ok(SendMessages2Ref { header, blob })
+}
+
 pub fn decode_prepare_slice(bytes: &[u8]) -> Result<SendMessages2Ref<'_>, 
IggyError> {
     let header_size = std::mem::size_of::<PrepareHeader>();
     if bytes.len() < header_size {
diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs
index bf9a52412..6cc841d0e 100644
--- a/core/consensus/src/impls.rs
+++ b/core/consensus/src/impls.rs
@@ -1576,6 +1576,6 @@ where
     }
 
     fn is_syncing(&self) -> bool {
-        self.is_syncing()
+        VsrConsensus::is_syncing(self)
     }
 }
diff --git a/core/partitions/src/iggy_partitions.rs 
b/core/partitions/src/iggy_partitions.rs
index 94614d715..2fdc461c2 100644
--- a/core/partitions/src/iggy_partitions.rs
+++ b/core/partitions/src/iggy_partitions.rs
@@ -478,7 +478,6 @@ where
         // In metadata layer we assume that when an `on_request` or 
`on_replicate` is called, it's called from the correct shard.
         // I think we need to do the same here, which means that the code from 
below is infallible; the partition should always exist by now!
         let message = self.replicate(message).await;
-
         if let Err(error) = self.apply_replicated_operation(&namespace, 
message).await {
             warn!(
                 target: "iggy.partitions.diag",
@@ -493,22 +492,6 @@ where
             return;
         }
 
-        if header.operation == Operation::SendMessages
-            && let Err(error) = self.commit_messages(&namespace, true).await
-        {
-            warn!(
-                target: "iggy.partitions.diag",
-                plane = "partitions",
-                replica_id = consensus.replica(),
-                op = header.op,
-                namespace_raw = namespace.inner(),
-                operation = ?header.operation,
-                %error,
-                "failed to durably persist replicated partition operation"
-            );
-            return;
-        }
-
         // TODO: Make those assertions be toggleable through a feature flag, 
so they can be used only by simulator/tests.
         debug_assert_eq!(header.op, current_op + 1);
         consensus.sequencer().set_sequence(header.op);
@@ -772,11 +755,7 @@ where
     /// Updates segment metadata, stats, flushes journal to disk if thresholds
     /// are exceeded, and advances the durable offset last.
     #[allow(clippy::too_many_lines)]
-    async fn commit_messages(
-        &self,
-        namespace: &IggyNamespace,
-        force_persist: bool,
-    ) -> Result<(), IggyError> {
+    async fn commit_messages(&self, namespace: &IggyNamespace) -> Result<(), 
IggyError> {
         let write_lock = self
             .get_by_ns(namespace)
             .expect("commit_messages: partition not found for namespace")
@@ -802,10 +781,8 @@ where
             journal_info.messages_count >= 
self.config.messages_required_to_save;
         let unsaved_messages_size_exceeded = journal_info.size.as_bytes_u64()
             >= self.config.size_of_messages_required_to_save.as_bytes_u64();
-        let should_persist = force_persist
-            || is_full
-            || unsaved_messages_count_exceeded
-            || unsaved_messages_size_exceeded;
+        let should_persist =
+            is_full || unsaved_messages_count_exceeded || 
unsaved_messages_size_exceeded;
 
         if !should_persist {
             return Ok(());
@@ -1002,7 +979,7 @@ where
         match prepare_header.operation {
             Operation::SendMessages => {
                 if committed_ns.insert(entry_namespace)
-                    && let Err(error) = self.commit_messages(&entry_namespace, 
false).await
+                    && let Err(error) = 
self.commit_messages(&entry_namespace).await
                 {
                     failed_ns.insert(entry_namespace);
                     warn!(
@@ -1074,15 +1051,37 @@ where
             .messages_writers()
             .last()
             .and_then(|writer| writer.as_ref())
-            .cloned()
-            .ok_or(IggyError::CannotSaveMessagesToSegment)?;
+            .cloned();
         let index_writer = partition
             .log
             .index_writers()
             .last()
             .and_then(|writer| writer.as_ref())
-            .cloned()
-            .ok_or(IggyError::CannotSaveIndexToSegment)?;
+            .cloned();
+
+        if messages_writer.is_none() || index_writer.is_none() {
+            let saved_bytes = 
stripped_batches.iter().map(Frozen::len).sum::<usize>();
+            debug!(
+                target: "iggy.partitions.diag",
+                plane = "partitions",
+                namespace_raw = namespace.inner(),
+                batch_count,
+                saved_bytes,
+                "simulated in-memory batch persistence"
+            );
+
+            let partition = self
+                .get_mut_by_ns(namespace)
+                .expect("persist: partition not found");
+            let segment_index = partition.log.segments().len() - 1;
+            let segment = &mut partition.log.segments_mut()[segment_index];
+            segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + 
saved_bytes as u64);
+            partition.log.clear_in_flight();
+            return Ok(());
+        }
+
+        let messages_writer = messages_writer.expect("checked above");
+        let index_writer = index_writer.expect("checked above");
 
         let saved = messages_writer
             .save_frozen_batches(&stripped_batches)
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 85e6242ee..fd0b0fceb 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -218,6 +218,17 @@ impl<S> PartitionJournal<S>
 where
     S: Storage<Buffer = JournalBuffer>,
 {
+    #[must_use]
+    pub fn with_storage(storage: S) -> Self {
+        Self {
+            op_to_storage_offset: UnsafeCell::new(BTreeMap::new()),
+            offset_to_op: UnsafeCell::new(BTreeMap::new()),
+            timestamp_to_op: UnsafeCell::new(BTreeMap::new()),
+            headers: UnsafeCell::new(Vec::new()),
+            inner: UnsafeCell::new(JournalInner { storage }),
+        }
+    }
+
     #[allow(dead_code)]
     fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
         match query {
diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml
index 7cfd90b43..7c3c93c74 100644
--- a/core/simulator/Cargo.toml
+++ b/core/simulator/Cargo.toml
@@ -22,6 +22,7 @@ edition = "2024"
 
 [dependencies]
 bytemuck = { workspace = true }
+bytes = { workspace = true }
 consensus = { path = "../consensus" }
 enumset = { workspace = true }
 futures = { workspace = true }
diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs
index 22c2024aa..0aac8dd57 100644
--- a/core/simulator/src/client.rs
+++ b/core/simulator/src/client.rs
@@ -15,10 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use bytes::Bytes;
 use iggy_binary_protocol::{Message, Operation, RequestHeader};
+use iggy_common::send_messages2::{
+    IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned,
+};
 use iggy_common::{
-    BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier,
-    create_stream::CreateStream, delete_stream::DeleteStream, 
sharding::IggyNamespace,
+    BytesSerializable, Identifier, create_stream::CreateStream, 
delete_stream::DeleteStream,
+    sharding::IggyNamespace,
 };
 use iobuf::Owned;
 use std::cell::Cell;
@@ -70,40 +74,25 @@ impl SimClient {
         namespace: IggyNamespace,
         messages: &[&[u8]],
     ) -> Message<RequestHeader> {
-        let count = messages.len() as u32;
-        let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE);
-        let mut messages_buf = Vec::new();
-
-        let mut current_position = 0u32;
-        for (i, msg) in messages.iter().enumerate() {
-            let msg_total_len = (IGGY_MESSAGE_HEADER_SIZE + msg.len()) as u32;
-
-            // Index: offset(u32) + position(u32) + timestamp(u64)
-            indexes.extend_from_slice(&(i as u32).to_le_bytes()); // offset 
(relative)
-            indexes.extend_from_slice(&current_position.to_le_bytes()); // 
position
-            indexes.extend_from_slice(&0u64.to_le_bytes()); // timestamp (set 
in prepare)
-
-            // Message header (64 bytes)
-            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // checksum
-            messages_buf.extend_from_slice(&0u128.to_le_bytes()); // id
-            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // offset
-            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // timestamp
-            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // 
origin_timestamp
-            messages_buf.extend_from_slice(&0u32.to_le_bytes()); // 
user_headers_length
-            messages_buf.extend_from_slice(&(msg.len() as u32).to_le_bytes()); 
// payload_length
-            messages_buf.extend_from_slice(&0u64.to_le_bytes()); // reserved
-
-            // Payload
-            messages_buf.extend_from_slice(msg);
-            current_position += msg_total_len;
+        let mut batch = IggyMessages2::with_capacity(messages.len());
+        for message in messages {
+            batch.push(IggyMessage2 {
+                header: IggyMessage2Header {
+                    payload_length: message.len() as u32,
+                    ..Default::default()
+                },
+                payload: Bytes::copy_from_slice(message),
+                user_headers: None,
+            });
         }
 
-        let mut payload = Vec::with_capacity(4 + indexes.len() + 
messages_buf.len());
-        payload.extend_from_slice(&count.to_le_bytes());
-        payload.extend_from_slice(&indexes);
-        payload.extend_from_slice(&messages_buf);
-
-        self.build_request_with_namespace(Operation::SendMessages, &payload, 
namespace)
+        let batch = SendMessages2Owned::from_messages(namespace, &batch)
+            .expect("simulator must build a valid send_messages2 batch");
+        let total_size = std::mem::size_of::<RequestHeader>() + 
batch.header.total_size();
+        let request_header = self.request_header(Operation::SendMessages, 
namespace, total_size);
+        batch
+            .encode_request(request_header)
+            .expect("simulator must build a valid send_messages2 request")
     }
 
     pub fn store_consumer_offset(
@@ -131,24 +120,7 @@ impl SimClient {
         let header_size = std::mem::size_of::<RequestHeader>();
         let total_size = header_size + payload.len();
 
-        let header = RequestHeader {
-            command: iggy_binary_protocol::Command2::Request,
-            operation,
-            size: total_size as u32,
-            cluster: 0,
-            checksum: 0,
-            checksum_body: 0,
-            view: 0,
-            release: 0,
-            replica: 0,
-            reserved_frame: [0; 66],
-            client: self.client_id,
-            request_checksum: 0,
-            timestamp: 0,
-            request: self.next_request_number(),
-            namespace: namespace.inner(),
-            ..Default::default()
-        };
+        let header = self.request_header(operation, namespace, total_size);
 
         let header_bytes = bytemuck::bytes_of(&header);
         let mut buffer = Vec::with_capacity(total_size);
@@ -190,4 +162,31 @@ impl SimClient {
         Message::try_from(Owned::<4096>::copy_from_slice(&buffer))
             .expect("request buffer must contain a valid request message")
     }
+
+    #[allow(clippy::cast_possible_truncation)]
+    fn request_header(
+        &self,
+        operation: Operation,
+        namespace: IggyNamespace,
+        total_size: usize,
+    ) -> RequestHeader {
+        RequestHeader {
+            command: iggy_binary_protocol::Command2::Request,
+            operation,
+            size: total_size as u32,
+            cluster: 0,
+            checksum: 0,
+            checksum_body: 0,
+            view: 0,
+            release: 0,
+            replica: 0,
+            reserved_frame: [0; 66],
+            client: self.client_id,
+            request_checksum: 0,
+            timestamp: 0,
+            request: self.next_request_number(),
+            namespace: namespace.inner(),
+            ..Default::default()
+        }
+    }
 }

Reply via email to