This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch partition_remaster in repository https://gitbox.apache.org/repos/asf/iggy.git
commit 021cec27d0eb64c918a774bf5ce8794c57257b72 Author: numinex <[email protected]> AuthorDate: Mon Mar 30 15:13:21 2026 +0200 fix simulator --- Cargo.lock | 1 + core/common/src/types/send_messages2.rs | 93 ++++++++++++++++++++++++++++ core/consensus/src/impls.rs | 2 +- core/partitions/src/iggy_partitions.rs | 61 ++++++++++--------- core/partitions/src/journal.rs | 11 ++++ core/simulator/Cargo.toml | 1 + core/simulator/src/client.rs | 103 ++++++++++++++++---------------- 7 files changed, 188 insertions(+), 84 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 1276be916..0b570d0b6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -9952,6 +9952,7 @@ name = "simulator" version = "0.1.0" dependencies = [ "bytemuck", + "bytes", "consensus", "enumset", "futures", diff --git a/core/common/src/types/send_messages2.rs b/core/common/src/types/send_messages2.rs index 1b23adb52..e714936da 100644 --- a/core/common/src/types/send_messages2.rs +++ b/core/common/src/types/send_messages2.rs @@ -127,6 +127,72 @@ pub struct SendMessages2Owned { } impl SendMessages2Owned { + pub fn from_messages( + namespace: IggyNamespace, + messages: &IggyMessages2, + ) -> Result<Self, IggyError> { + let message_count = messages.count(); + let mut origin_timestamp = u64::MAX; + for message in messages { + origin_timestamp = origin_timestamp.min(message.header.origin_timestamp); + } + + if origin_timestamp == u64::MAX { + origin_timestamp = 0; + } + + let mut blob = BytesMut::new(); + for (index, message) in messages.iter().enumerate() { + let id = if message.header.id == 0 { + random_id::get_uuid() + } else { + message.header.id + }; + let offset_delta = u32::try_from(index).map_err(|_| IggyError::InvalidCommand)?; + let timestamp_delta = message + .header + .origin_timestamp + .checked_sub(origin_timestamp) + .ok_or(IggyError::InvalidCommand)?; + if timestamp_delta > MAX_TIMESTAMP_DELTA_MICROS { + return Err(IggyError::InvalidMessageTimestampDelta(timestamp_delta)); + } + let timestamp_delta = + u32::try_from(timestamp_delta).map_err(|_| IggyError::InvalidCommand)?; + let user_headers = message.user_headers.as_deref().unwrap_or_default(); + let user_headers_length = + u32::try_from(user_headers.len()).map_err(|_| IggyError::InvalidCommand)?; + let payload_length = + u32::try_from(message.payload.len()).map_err(|_| IggyError::InvalidCommand)?; + + let mut header = [0u8; MESSAGE_HEADER_SIZE]; + header[8..24].copy_from_slice(&id.to_le_bytes()); + header[24..28].copy_from_slice(&offset_delta.to_le_bytes()); + header[28..32].copy_from_slice(×tamp_delta.to_le_bytes()); + header[32..36].copy_from_slice(&user_headers_length.to_le_bytes()); + header[36..40].copy_from_slice(&payload_length.to_le_bytes()); + + let checksum = calculate_checksum_parts(&header[8..], user_headers, &message.payload); + header[0..8].copy_from_slice(&checksum.to_le_bytes()); + + blob.extend_from_slice(&header); + blob.extend_from_slice(user_headers); + blob.extend_from_slice(&message.payload); + } + + let blob = blob.freeze(); + let mut header = SendMessages2Header::new( + namespace.partition_id() as u64, + origin_timestamp, + u64::try_from(COMMAND_HEADER_SIZE + blob.len()) + .map_err(|_| IggyError::InvalidCommand)?, + message_count, + ); + header.batch_checksum = calculate_batch_checksum(&header, &blob); + + Ok(Self { header, blob }) + } + pub fn from_legacy_request(namespace: IggyNamespace, body: &[u8]) -> Result<Self, IggyError> { let (message_count, messages) = legacy_messages_slice(body)?; let mut parsed = Vec::with_capacity(message_count as usize); @@ -487,9 +553,36 @@ pub fn convert_request_message( let request_header = *message.header(); let total_size = request_header.size as usize; let body = &message.as_slice()[std::mem::size_of::<RequestHeader>()..total_size]; + if decode_request_slice(body).is_ok() { + return Ok(message); + } SendMessages2Owned::from_legacy_request(namespace, body)?.encode_request(request_header) } +fn decode_request_slice(body: &[u8]) -> Result<SendMessages2Ref<'_>, IggyError> { + if body.len() < COMMAND_HEADER_SIZE { + return Err(IggyError::InvalidCommand); + } + + let header = SendMessages2Header::decode(&body[..COMMAND_HEADER_SIZE])?; + let blob_len = header.blob_len()?; + if body.len() < header.total_size() { + return Err(IggyError::InvalidCommand); + } + + let blob = &body[COMMAND_HEADER_SIZE..COMMAND_HEADER_SIZE + blob_len]; + let expected_checksum = calculate_batch_checksum(&header, blob); + if header.batch_checksum != expected_checksum { + return Err(IggyError::InvalidBatchChecksum( + header.batch_checksum, + expected_checksum, + header.base_offset, + )); + } + + Ok(SendMessages2Ref { header, blob }) +} + pub fn decode_prepare_slice(bytes: &[u8]) -> Result<SendMessages2Ref<'_>, IggyError> { let header_size = std::mem::size_of::<PrepareHeader>(); if bytes.len() < header_size { diff --git a/core/consensus/src/impls.rs b/core/consensus/src/impls.rs index bf9a52412..6cc841d0e 100644 --- a/core/consensus/src/impls.rs +++ b/core/consensus/src/impls.rs @@ -1576,6 +1576,6 @@ where } fn is_syncing(&self) -> bool { - self.is_syncing() + VsrConsensus::is_syncing(self) } } diff --git a/core/partitions/src/iggy_partitions.rs b/core/partitions/src/iggy_partitions.rs index 94614d715..2fdc461c2 100644 --- a/core/partitions/src/iggy_partitions.rs +++ b/core/partitions/src/iggy_partitions.rs @@ -478,7 +478,6 @@ where // In metadata layer we assume that when an `on_request` or `on_replicate` is called, it's called from correct shard. // I think we need to do the same here, which means that the code from below is unfallable, the partition should always exist by now! let message = self.replicate(message).await; - if let Err(error) = self.apply_replicated_operation(&namespace, message).await { warn!( target: "iggy.partitions.diag", @@ -493,22 +492,6 @@ where return; } - if header.operation == Operation::SendMessages - && let Err(error) = self.commit_messages(&namespace, true).await - { - warn!( - target: "iggy.partitions.diag", - plane = "partitions", - replica_id = consensus.replica(), - op = header.op, - namespace_raw = namespace.inner(), - operation = ?header.operation, - %error, - "failed to durably persist replicated partition operation" - ); - return; - } - // TODO: Make those assertions be toggleable through a feature flag, so they can be used only by simulator/tests. debug_assert_eq!(header.op, current_op + 1); consensus.sequencer().set_sequence(header.op); @@ -772,11 +755,7 @@ where /// Updates segment metadata, stats, flushes journal to disk if thresholds /// are exceeded, and advances the durable offset last. #[allow(clippy::too_many_lines)] - async fn commit_messages( - &self, - namespace: &IggyNamespace, - force_persist: bool, - ) -> Result<(), IggyError> { + async fn commit_messages(&self, namespace: &IggyNamespace) -> Result<(), IggyError> { let write_lock = self .get_by_ns(namespace) .expect("commit_messages: partition not found for namespace") @@ -802,10 +781,8 @@ where journal_info.messages_count >= self.config.messages_required_to_save; let unsaved_messages_size_exceeded = journal_info.size.as_bytes_u64() >= self.config.size_of_messages_required_to_save.as_bytes_u64(); - let should_persist = force_persist - || is_full - || unsaved_messages_count_exceeded - || unsaved_messages_size_exceeded; + let should_persist = + is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded; if !should_persist { return Ok(()); @@ -1002,7 +979,7 @@ where match prepare_header.operation { Operation::SendMessages => { if committed_ns.insert(entry_namespace) - && let Err(error) = self.commit_messages(&entry_namespace, false).await + && let Err(error) = self.commit_messages(&entry_namespace).await { failed_ns.insert(entry_namespace); warn!( @@ -1074,15 +1051,37 @@ where .messages_writers() .last() .and_then(|writer| writer.as_ref()) - .cloned() - .ok_or(IggyError::CannotSaveMessagesToSegment)?; + .cloned(); let index_writer = partition .log .index_writers() .last() .and_then(|writer| writer.as_ref()) - .cloned() - .ok_or(IggyError::CannotSaveIndexToSegment)?; + .cloned(); + + if messages_writer.is_none() || index_writer.is_none() { + let saved_bytes = stripped_batches.iter().map(Frozen::len).sum::<usize>(); + debug!( + target: "iggy.partitions.diag", + plane = "partitions", + namespace_raw = namespace.inner(), + batch_count, + saved_bytes, + "simulated in-memory batch persistence" + ); + + let partition = self + .get_mut_by_ns(namespace) + .expect("persist: partition not found"); + let segment_index = partition.log.segments().len() - 1; + let segment = &mut partition.log.segments_mut()[segment_index]; + segment.size = IggyByteSize::from(segment.size.as_bytes_u64() + saved_bytes as u64); + partition.log.clear_in_flight(); + return Ok(()); + } + + let messages_writer = messages_writer.expect("checked above"); + let index_writer = index_writer.expect("checked above"); let saved = messages_writer .save_frozen_batches(&stripped_batches) diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs index 85e6242ee..fd0b0fceb 100644 --- a/core/partitions/src/journal.rs +++ b/core/partitions/src/journal.rs @@ -218,6 +218,17 @@ impl<S> PartitionJournal<S> where S: Storage<Buffer = JournalBuffer>, { + #[must_use] + pub fn with_storage(storage: S) -> Self { + Self { + op_to_storage_offset: UnsafeCell::new(BTreeMap::new()), + offset_to_op: UnsafeCell::new(BTreeMap::new()), + timestamp_to_op: UnsafeCell::new(BTreeMap::new()), + headers: UnsafeCell::new(Vec::new()), + inner: UnsafeCell::new(JournalInner { storage }), + } + } + #[allow(dead_code)] fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> { match query { diff --git a/core/simulator/Cargo.toml b/core/simulator/Cargo.toml index 7cfd90b43..7c3c93c74 100644 --- a/core/simulator/Cargo.toml +++ b/core/simulator/Cargo.toml @@ -22,6 +22,7 @@ edition = "2024" [dependencies] bytemuck = { workspace = true } +bytes = { workspace = true } consensus = { path = "../consensus" } enumset = { workspace = true } futures = { workspace = true } diff --git a/core/simulator/src/client.rs b/core/simulator/src/client.rs index 22c2024aa..0aac8dd57 100644 --- a/core/simulator/src/client.rs +++ b/core/simulator/src/client.rs @@ -15,10 +15,14 @@ // specific language governing permissions and limitations // under the License. +use bytes::Bytes; use iggy_binary_protocol::{Message, Operation, RequestHeader}; +use iggy_common::send_messages2::{ + IggyMessage2, IggyMessage2Header, IggyMessages2, SendMessages2Owned, +}; use iggy_common::{ - BytesSerializable, IGGY_MESSAGE_HEADER_SIZE, INDEX_SIZE, Identifier, - create_stream::CreateStream, delete_stream::DeleteStream, sharding::IggyNamespace, + BytesSerializable, Identifier, create_stream::CreateStream, delete_stream::DeleteStream, + sharding::IggyNamespace, }; use iobuf::Owned; use std::cell::Cell; @@ -70,40 +74,25 @@ impl SimClient { namespace: IggyNamespace, messages: &[&[u8]], ) -> Message<RequestHeader> { - let count = messages.len() as u32; - let mut indexes = Vec::with_capacity(count as usize * INDEX_SIZE); - let mut messages_buf = Vec::new(); - - let mut current_position = 0u32; - for (i, msg) in messages.iter().enumerate() { - let msg_total_len = (IGGY_MESSAGE_HEADER_SIZE + msg.len()) as u32; - - // Index: offset(u32) + position(u32) + timestamp(u64) - indexes.extend_from_slice(&(i as u32).to_le_bytes()); // offset (relative) - indexes.extend_from_slice(¤t_position.to_le_bytes()); // position - indexes.extend_from_slice(&0u64.to_le_bytes()); // timestamp (set in prepare) - - // Message header (64 bytes) - messages_buf.extend_from_slice(&0u64.to_le_bytes()); // checksum - messages_buf.extend_from_slice(&0u128.to_le_bytes()); // id - messages_buf.extend_from_slice(&0u64.to_le_bytes()); // offset - messages_buf.extend_from_slice(&0u64.to_le_bytes()); // timestamp - messages_buf.extend_from_slice(&0u64.to_le_bytes()); // origin_timestamp - messages_buf.extend_from_slice(&0u32.to_le_bytes()); // user_headers_length - messages_buf.extend_from_slice(&(msg.len() as u32).to_le_bytes()); // payload_length - messages_buf.extend_from_slice(&0u64.to_le_bytes()); // reserved - - // Payload - messages_buf.extend_from_slice(msg); - current_position += msg_total_len; + let mut batch = IggyMessages2::with_capacity(messages.len()); + for message in messages { + batch.push(IggyMessage2 { + header: IggyMessage2Header { + payload_length: message.len() as u32, + ..Default::default() + }, + payload: Bytes::copy_from_slice(message), + user_headers: None, + }); } - let mut payload = Vec::with_capacity(4 + indexes.len() + messages_buf.len()); - payload.extend_from_slice(&count.to_le_bytes()); - payload.extend_from_slice(&indexes); - payload.extend_from_slice(&messages_buf); - - self.build_request_with_namespace(Operation::SendMessages, &payload, namespace) + let batch = SendMessages2Owned::from_messages(namespace, &batch) + .expect("simulator must build a valid send_messages2 batch"); + let total_size = std::mem::size_of::<RequestHeader>() + batch.header.total_size(); + let request_header = self.request_header(Operation::SendMessages, namespace, total_size); + batch + .encode_request(request_header) + .expect("simulator must build a valid send_messages2 request") } pub fn store_consumer_offset( @@ -131,24 +120,7 @@ impl SimClient { let header_size = std::mem::size_of::<RequestHeader>(); let total_size = header_size + payload.len(); - let header = RequestHeader { - command: iggy_binary_protocol::Command2::Request, - operation, - size: total_size as u32, - cluster: 0, - checksum: 0, - checksum_body: 0, - view: 0, - release: 0, - replica: 0, - reserved_frame: [0; 66], - client: self.client_id, - request_checksum: 0, - timestamp: 0, - request: self.next_request_number(), - namespace: namespace.inner(), - ..Default::default() - }; + let header = self.request_header(operation, namespace, total_size); let header_bytes = bytemuck::bytes_of(&header); let mut buffer = Vec::with_capacity(total_size); @@ -190,4 +162,31 @@ impl SimClient { Message::try_from(Owned::<4096>::copy_from_slice(&buffer)) .expect("request buffer must contain a valid request message") } + + #[allow(clippy::cast_possible_truncation)] + fn request_header( + &self, + operation: Operation, + namespace: IggyNamespace, + total_size: usize, + ) -> RequestHeader { + RequestHeader { + command: iggy_binary_protocol::Command2::Request, + operation, + size: total_size as u32, + cluster: 0, + checksum: 0, + checksum_body: 0, + view: 0, + release: 0, + replica: 0, + reserved_frame: [0; 66], + client: self.client_id, + request_checksum: 0, + timestamp: 0, + request: self.next_request_number(), + namespace: namespace.inner(), + ..Default::default() + } + } }
