hubcio commented on code in PR #3020:
URL: https://github.com/apache/iggy/pull/3020#discussion_r2983101218


##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -350,6 +398,17 @@ where
             .expect("on_request: consensus not initialized");
 
         debug!(?namespace, "handling partition request");
+        let message = if message.header().operation == Operation::SendMessages {
+            match convert_request_message(namespace, message) {
+                Ok(message) => message,
+                Err(error) => {
+                    warn!(?namespace, %error, "on_request: failed to convert SendMessages");
+                    return;
+                }
+            }
+        } else {
+            message
+        };
         let prepare = message.project(consensus);
         pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await;
     }

Review Comment:
   re: line 449 - `apply_replicated_operation` is called unconditionally, even 
when `fence_old_prepare_by_commit` returned `true` at line 434 (which only 
gates `self.replicate()`, not apply). `append_messages` is not idempotent - it 
assigns a new `dirty_offset` and fresh `batch_timestamp` on every call. after a 
VSR view change, the new primary replays from `max_commit+1` and re-executes, 
producing duplicate partition data with different offsets.
   
   the comment at lines 487-496 acknowledges this and lists two required guards:
   1. op-based dedup: check 
`journal.op_to_storage_offset.contains_key(&header.op)` before appending
   2. client reply dedup by `(client_id, request_id)`
   
   the `op_to_storage_offset` BTreeMap already exists - the dedup check is 
straightforward.
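
   a minimal sketch of guard 1, assuming the journal keeps an op -> offset map shaped like `op_to_storage_offset`. `JournalSketch`, `apply_once` and `next_offset` are hypothetical names for illustration, not the PR's types:

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the partition journal; only the dedup map is modeled.
#[derive(Default)]
pub struct JournalSketch {
    /// Maps a consensus op number to the storage offset assigned on first apply.
    pub op_to_storage_offset: BTreeMap<u64, u64>,
    /// Next offset to hand out (stands in for `dirty_offset`).
    pub next_offset: u64,
}

impl JournalSketch {
    /// Applies `op` at most once and returns the offset assigned to it,
    /// whether it was appended now or during an earlier apply.
    pub fn apply_once(&mut self, op: u64, message_count: u64) -> u64 {
        if let Some(&offset) = self.op_to_storage_offset.get(&op) {
            // Replayed prepare: keep the original offset and append nothing.
            return offset;
        }
        let offset = self.next_offset;
        self.next_offset += message_count;
        self.op_to_storage_offset.insert(op, offset);
        offset
    }
}
```

   replaying op 7 after a view change then returns the offset from the first apply and leaves `next_offset` untouched. guard 2 (client reply dedup) is not covered by this sketch.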



##########
core/partitions/src/journal.rs:
##########
@@ -373,26 +395,14 @@ where
 

Review Comment:
   re: line 387 - `BTreeMap::insert` at `timestamp_to_op.insert(timestamp, op)` 
overwrites the previous value. two batches arriving in the same microsecond get 
the same `base_timestamp` (assigned from `IggyTimestamp::now().as_micros()` at 
iggy_partition.rs:91). the earlier batch's op mapping is silently lost.
   
   `candidate_start_op` for timestamp queries uses 
`range(..=timestamp).next_back()` - if the overwritten mapping pointed to an 
earlier op, a timestamp query could start iteration past it, making the earlier 
batch unreachable via timestamp-based polling.
   
   offset-based queries are unaffected (keyed by unique offset). fix with 
composite key `(timestamp, op)` or `BTreeMap<u64, SmallVec<[u64; 1]>>`.
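
   to illustrate the overwrite and the composite-key option, a small sketch using plain `(timestamp, op)` pairs. both function names are hypothetical, and the lookup rule (latest timestamp at or before the query, then the earliest op at that timestamp) is an assumption about what `candidate_start_op` should return:

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Reproduces the overwrite: one op per timestamp, so the later insert wins.
pub fn start_op_single_key(entries: &[(u64, u64)], query_ts: u64) -> Option<u64> {
    let mut index: BTreeMap<u64, u64> = BTreeMap::new();
    for &(timestamp, op) in entries {
        index.insert(timestamp, op);
    }
    index.range(..=query_ts).next_back().map(|(_, &op)| op)
}

/// Composite key: every (timestamp, op) pair survives in the index.
pub fn start_op_composite_key(entries: &[(u64, u64)], query_ts: u64) -> Option<u64> {
    let index: BTreeSet<(u64, u64)> = entries.iter().copied().collect();
    // Latest timestamp at or before the query.
    let &(ts, _) = index.range(..=(query_ts, u64::MAX)).next_back()?;
    // Earliest op recorded for that timestamp.
    index
        .range((ts, 0)..=(ts, u64::MAX))
        .next()
        .map(|&(_, op)| op)
}
```

   with two batches stamped at the same microsecond (ops 5 and 6), the single-key index starts at op 6 while the composite index starts at op 5.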



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -67,6 +73,12 @@ pub struct IggyPartitions<C> {
     consensus: Option<C>,
 }
 
+fn freeze_client_reply(message: Message<GenericHeader>) -> ClientBuffers {
+    let owned = unsafe { message.into_inner().try_merge() }
+        .expect("client reply expects a mergeable message buffer");
+    vec![owned.into()]
+}
+
 impl<C> IggyPartitions<C> {
     #[must_use]
     pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {

Review Comment:
   re: `partitions_mut()` at line 116 - returns `&mut Vec<IggyPartition>` from 
`&self` via `UnsafeCell`. the `#[allow(clippy::mut_from_ref)]` suppresses the 
lint but doesn't make it sound - under Rust's aliasing rules, creating multiple 
`&mut` references to the same data is UB regardless of threading model.
   
   current usage is sequential (traced all call sites - borrows don't overlap 
across `.await` points), so it works in practice. but the API permits aliasing, 
and any future code adding a second `get_mut_by_ns` call while a first `&mut` 
is alive would silently introduce UB.
   
   consider `RefCell` for runtime borrow checking (at least in debug builds) or 
restructure so mutation requires `&mut self`.
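
   a rough sketch of the `RefCell` option, assuming a single-threaded shard. `PartitionSketch`, `PartitionsSketch` and `try_partitions_mut` are hypothetical names, not the PR's API:

```rust
use std::cell::{BorrowMutError, RefCell, RefMut};

/// Hypothetical stand-in for `IggyPartition`.
#[derive(Debug, Default)]
pub struct PartitionSketch {
    pub dirty_offset: u64,
}

/// Hypothetical container mirroring the shape of `IggyPartitions`.
#[derive(Default)]
pub struct PartitionsSketch {
    partitions: RefCell<Vec<PartitionSketch>>,
}

impl PartitionsSketch {
    pub fn with_len(len: usize) -> Self {
        let partitions = (0..len).map(|_| PartitionSketch::default()).collect();
        Self {
            partitions: RefCell::new(partitions),
        }
    }

    /// Checked counterpart of `partitions_mut()`: a second live mutable
    /// borrow yields an error instead of an aliased `&mut`.
    pub fn try_partitions_mut(
        &self,
    ) -> Result<RefMut<'_, Vec<PartitionSketch>>, BorrowMutError> {
        self.partitions.try_borrow_mut()
    }
}
```

   the guard must still be dropped before any `.await` that can re-enter the same container, otherwise the runtime check fires there; that turns silent UB into a visible failure.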



##########
core/partitions/src/journal.rs:
##########
@@ -335,18 +346,29 @@ where
     }
 
     async fn append(&self, entry: Self::Entry) {
-        let first_offset_and_timestamp = Self::message_to_batch(&entry)
-            .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?)));
-
         let header = *entry.header();
         let op = header.op;
+        let owned = unsafe { entry.into_inner().try_merge() }

Review Comment:
   also at `iggy_partitions.rs:766` (commit loop).
   
   `try_merge()` reconstitutes `Owned` from `(Prefix, Frozen)` on every append 
and every commit entry. the fields being read (`base_offset` at byte 264, 
`base_timestamp` at byte 272) are within the 512-byte prefix - they could be 
read directly without merging.
   
   in the common case (same control block, refcount==2) the merge is just 
atomic ops + pointer gymnastics, but when the prefix was COW-detached it 
triggers a `copy_from_slice` of up to 512 bytes. redesigning the journal to 
store `(Prefix, Frozen)` pairs and decode from the split representation would 
eliminate this.
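
   reading the two fields straight from the prefix could look roughly like this. the byte positions are the ones quoted above; little-endian encoding is an assumption (it matches `encode_into`), and `peek_base_fields` is a hypothetical helper:

```rust
/// Byte positions quoted in the review comment.
const BASE_OFFSET_AT: usize = 264;
const BASE_TIMESTAMP_AT: usize = 272;

/// Reads a little-endian u64 at `at`, or `None` if the slice is too short.
fn read_u64_le(bytes: &[u8], at: usize) -> Option<u64> {
    let end = at.checked_add(8)?;
    let mut raw = [0u8; 8];
    raw.copy_from_slice(bytes.get(at..end)?);
    Some(u64::from_le_bytes(raw))
}

/// Returns `(base_offset, base_timestamp)` from the prefix alone,
/// without reconstituting the full buffer.
pub fn peek_base_fields(prefix: &[u8]) -> Option<(u64, u64)> {
    Some((
        read_u64_le(prefix, BASE_OFFSET_AT)?,
        read_u64_le(prefix, BASE_TIMESTAMP_AT)?,
    ))
}
```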



##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -847,20 +931,43 @@ where
         )
         .await
         .expect("Failed to create segment storage");
-
-        // Clear old segment's indexes.
-        // TODO: Waiting for issue to move server config to shared module.
-        // Once complete, conditionally clear based on cache_indexes config:
-        // if !matches!(self.config.cache_indexes, CacheIndexesConfig::All) {
-        //     partition.log.indexes_mut()[old_segment_index] = None;
-        // }
-        partition.log.indexes_mut()[old_segment_index] = None;
+        let messages_size_bytes = storage
+            .messages_writer
+            .as_ref()
+            .expect("Messages writer not initialized")
+            .size_counter();
+        let messages_writer = Rc::new(
+            MessagesWriter::new(
+                &messages_path,
+                messages_size_bytes,
+                self.config.enforce_fsync,
+                false,
+            )
+            .await
+            .expect("Failed to create messages writer"),
+        );
+        let index_writer = Rc::new(
+            IggyIndexWriter::new(
+                &index_path,
+                Rc::new(std::sync::atomic::AtomicU64::new(0)),
+                false,
+            )
+            .await
+            .expect("Failed to create sparse index writer"),
+        );
 
         // Close writers for the sealed segment.
         let old_storage = &mut partition.log.storages_mut()[old_segment_index];
         let _ = old_storage.shutdown();
-
-        partition.log.add_persisted_segment(segment, storage);
+        partition.log.messages_writers_mut()[old_segment_index] = None;
+        partition.log.index_writers_mut()[old_segment_index] = None;
+
+        partition.log.add_persisted_segment(
+            segment,
+            storage,
+            Some(messages_writer),
+            Some(index_writer),
+        );
         partition.stats.increment_segments_count(1);
 
         debug!(?namespace, start_offset, "rotated to new segment");

Review Comment:
   re: `send_prepare_ok` at line 976 - the TODO at line 980 acknowledges the 
journal isn't verified durable before acking. in VSR, a replica must persist 
the prepare entry before sending PrepareOk - otherwise a power loss after ack 
but before disk flush means the replica promised durability it can't deliver.
   
   concrete failure: client told "committed" after quorum ack, power kills 2 of 
3 replicas before disk flush, data is gone. the fix is either per-prepare 
fdatasync (correct but expensive) or batched fsync with a flush-before-ack 
barrier.
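
   a sketch of the batched variant, with the sync call hidden behind a hypothetical `SyncJournal` trait so the ordering is visible. `AckBarrier` and `CountingJournal` are illustration-only names; a real implementation would issue `fdatasync` (or the runtime's equivalent) inside `sync_data`:

```rust
use std::io;

/// Hypothetical durability hook; a real journal would fdatasync here.
pub trait SyncJournal {
    fn sync_data(&mut self) -> io::Result<()>;
}

/// Holds back PrepareOk for ops that are written but not yet durable.
pub struct AckBarrier<J: SyncJournal> {
    journal: J,
    pending: Vec<u64>,
}

impl<J: SyncJournal> AckBarrier<J> {
    pub fn new(journal: J) -> Self {
        Self {
            journal,
            pending: Vec::new(),
        }
    }

    /// Records that `op` reached the journal but has not been flushed.
    pub fn written(&mut self, op: u64) {
        self.pending.push(op);
    }

    /// One sync covers every pending op; only ops returned from here
    /// may be acknowledged. On a sync error nothing is released.
    pub fn flush(&mut self) -> io::Result<Vec<u64>> {
        if self.pending.is_empty() {
            return Ok(Vec::new());
        }
        self.journal.sync_data()?;
        Ok(std::mem::take(&mut self.pending))
    }

    pub fn journal(&self) -> &J {
        &self.journal
    }
}

/// Test double that only counts sync calls.
#[derive(Default)]
pub struct CountingJournal {
    pub syncs: usize,
}

impl SyncJournal for CountingJournal {
    fn sync_data(&mut self) -> io::Result<()> {
        self.syncs += 1;
        Ok(())
    }
}
```

   two prepares written back to back are released by a single sync, which is where the batching saves cost compared with per-prepare fdatasync.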



##########
core/partitions/src/send_messages2.rs:
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_common::{
+    INDEX_SIZE, IggyError, calculate_checksum,
+    header::{PrepareHeader, RequestHeader},
+    message::Message,
+    random_id,
+    sharding::IggyNamespace,
+};
+use iobuf::{Owned, TryMerge};
+
+const MESSAGE_ALIGN: usize = 4096;
+pub const COMMAND_HEADER_SIZE: usize = 256;
+pub const PREPARE_SPLIT_POINT: usize = 512;
+const MESSAGE_HEADER_SIZE: usize = 48;
+const LEGACY_MESSAGE_HEADER_SIZE: usize = 64;
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SendMessages2Header {
+    pub partition_id: u64,
+    pub base_offset: u64,
+    pub base_timestamp: u64,
+    pub origin_timestamp: u64,
+    pub batch_length: u64,
+}
+
+impl SendMessages2Header {
+    pub const fn new(partition_id: u64, origin_timestamp: u64, batch_length: u64) -> Self {
+        Self {
+            partition_id,
+            base_offset: 0,
+            base_timestamp: 0,
+            origin_timestamp,
+            batch_length,
+        }
+    }
+
+    pub fn decode(bytes: &[u8]) -> Result<Self, IggyError> {
+        if bytes.len() < COMMAND_HEADER_SIZE {
+            return Err(IggyError::InvalidCommand);
+        }
+
+        let batch_length = read_u64(bytes, 32)?;
+        if batch_length < COMMAND_HEADER_SIZE as u64 {
+            return Err(IggyError::InvalidCommand);
+        }
+
+        Ok(Self {
+            partition_id: read_u64(bytes, 0)?,
+            base_offset: read_u64(bytes, 8)?,
+            base_timestamp: read_u64(bytes, 16)?,
+            origin_timestamp: read_u64(bytes, 24)?,
+            batch_length,
+        })
+    }
+
+    pub fn encode_into(&self, bytes: &mut [u8]) {

Review Comment:
   per-message checksums (line 155) cover the message header tail + 
user_headers + payload, but `base_offset` and `base_timestamp` in the batch 
header are not covered by any checksum. a single bit flip in `base_offset` 
silently corrupts every message's absolute offset (since per-message offsets 
are delta-encoded from it), and no checksum would detect it.
   
   add a batch-level CRC32C covering the full batch header (bytes 0-39 at 
minimum) before the first message. particularly important because the batch 
header is mutated in-place during `stamp_prepare_for_persistence` (lines 
509-511) - any bug in that stamping path would also go undetected.
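
   for illustration, a dependency-free sketch of such a check. the bitwise CRC32C below is a plain reference implementation (a real build would more likely use a hardware-accelerated crate), `header_checksum` is a hypothetical helper, and where the resulting value would be stored in the 256-byte header is left open:

```rust
/// Batch-header bytes in use: five u64 fields, as written by `encode_into`.
const HEADER_FIELDS_LEN: usize = 40;

/// Bitwise CRC32C (Castagnoli), reflected polynomial 0x82F63B78.
pub fn crc32c(bytes: &[u8]) -> u32 {
    let mut crc = !0u32;
    for &byte in bytes {
        crc ^= u32::from(byte);
        for _ in 0..8 {
            crc = if crc & 1 == 1 {
                (crc >> 1) ^ 0x82F6_3B78
            } else {
                crc >> 1
            };
        }
    }
    !crc
}

/// Checksum over bytes 0..40 of the batch header, or `None` if too short.
pub fn header_checksum(header: &[u8]) -> Option<u32> {
    header.get(..HEADER_FIELDS_LEN).map(crc32c)
}
```

   the checksum would have to be recomputed after `base_offset` and `base_timestamp` are stamped, since both fall inside the covered range.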



##########
core/partitions/src/send_messages2.rs:
##########
@@ -0,0 +1,633 @@
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_common::{
+    INDEX_SIZE, IggyError, calculate_checksum,
+    header::{PrepareHeader, RequestHeader},
+    message::Message,
+    random_id,
+    sharding::IggyNamespace,
+};
+use iobuf::{Owned, TryMerge};
+
+const MESSAGE_ALIGN: usize = 4096;
+pub const COMMAND_HEADER_SIZE: usize = 256;
+pub const PREPARE_SPLIT_POINT: usize = 512;
+const MESSAGE_HEADER_SIZE: usize = 48;
+const LEGACY_MESSAGE_HEADER_SIZE: usize = 64;
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SendMessages2Header {
+    pub partition_id: u64,
+    pub base_offset: u64,
+    pub base_timestamp: u64,
+    pub origin_timestamp: u64,
+    pub batch_length: u64,
+}
+
+impl SendMessages2Header {
+    pub const fn new(partition_id: u64, origin_timestamp: u64, batch_length: u64) -> Self {
+        Self {
+            partition_id,
+            base_offset: 0,
+            base_timestamp: 0,
+            origin_timestamp,
+            batch_length,
+        }
+    }
+
+    pub fn decode(bytes: &[u8]) -> Result<Self, IggyError> {
+        if bytes.len() < COMMAND_HEADER_SIZE {
+            return Err(IggyError::InvalidCommand);
+        }
+
+        let batch_length = read_u64(bytes, 32)?;
+        if batch_length < COMMAND_HEADER_SIZE as u64 {
+            return Err(IggyError::InvalidCommand);
+        }
+
+        Ok(Self {
+            partition_id: read_u64(bytes, 0)?,
+            base_offset: read_u64(bytes, 8)?,
+            base_timestamp: read_u64(bytes, 16)?,
+            origin_timestamp: read_u64(bytes, 24)?,
+            batch_length,
+        })
+    }
+
+    pub fn encode_into(&self, bytes: &mut [u8]) {
+        assert!(bytes.len() >= COMMAND_HEADER_SIZE);
+        bytes[..COMMAND_HEADER_SIZE].fill(0);
+        bytes[0..8].copy_from_slice(&self.partition_id.to_le_bytes());
+        bytes[8..16].copy_from_slice(&self.base_offset.to_le_bytes());
+        bytes[16..24].copy_from_slice(&self.base_timestamp.to_le_bytes());
+        bytes[24..32].copy_from_slice(&self.origin_timestamp.to_le_bytes());
+        bytes[32..40].copy_from_slice(&self.batch_length.to_le_bytes());
+    }
+
+    pub fn total_size(&self) -> usize {
+        usize::try_from(self.batch_length).expect("batch length exceeds usize::MAX")
+    }
+
+    pub fn blob_len(&self) -> Result<usize, IggyError> {
+        usize::try_from(
+            self.batch_length
+                .checked_sub(COMMAND_HEADER_SIZE as u64)
+                .ok_or(IggyError::InvalidCommand)?,
+        )
+        .map_err(|_| IggyError::InvalidCommand)
+    }
+
+    pub fn into_frozen(self) -> FrozenBatchHeader {
+        let mut buffer = Owned::<MESSAGE_ALIGN>::zeroed(COMMAND_HEADER_SIZE);
+        self.encode_into(buffer.as_mut_slice());
+        buffer.into()
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct SendMessages2Owned {
+    pub header: SendMessages2Header,
+    pub blob: Bytes,
+}
+
+impl SendMessages2Owned {
+    pub fn from_legacy_request(namespace: IggyNamespace, body: &[u8]) -> Result<Self, IggyError> {
+        let (message_count, messages) = legacy_messages_slice(body)?;
+        let mut parsed = Vec::with_capacity(message_count as usize);
+        let mut origin_timestamp = u64::MAX;
+        let mut cursor = 0usize;
+
+        while cursor < messages.len() && parsed.len() < message_count as usize {
+            let legacy = LegacyMessageRef::decode(&messages[cursor..])?;
+            origin_timestamp = origin_timestamp.min(legacy.origin_timestamp);
+            cursor += legacy.total_size;
+            parsed.push(legacy);
+        }
+
+        if parsed.len() != message_count as usize || cursor != messages.len() {
+            return Err(IggyError::InvalidCommand);
+        }
+
+        if origin_timestamp == u64::MAX {
+            origin_timestamp = 0;
+        }
+
+        let mut blob = BytesMut::with_capacity(messages.len());
+        for (index, legacy) in parsed.iter().enumerate() {
+            let id = if legacy.id == 0 {
+                random_id::get_uuid()
+            } else {
+                legacy.id
+            };
+            let offset_delta = u32::try_from(index).map_err(|_| IggyError::InvalidCommand)?;
+            let timestamp_delta = legacy

Review Comment:
   `timestamp_delta` is `u32` representing microseconds delta from 
`origin_timestamp`. `u32::MAX` = ~4295 seconds = ~71.6 minutes. a batch 
spanning more than ~72 minutes of client-side `origin_timestamp` is rejected 
with a generic `IggyError::InvalidCommand` at line 142.
   
   this is a wire format constant - once deployed, it can't be changed without 
protocol versioning. the error is graceful (not silent), but the limit is 
undocumented and the error gives zero indication of what went wrong. at minimum 
document the ~72 minute maximum batch span on `timestamp_delta`, and return a 
more specific error variant.
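
   the arithmetic and a more specific error could be sketched as below. `TimestampDeltaError`, `MAX_BATCH_SPAN_MICROS` and `timestamp_delta` are hypothetical names; the PR currently maps this failure to `IggyError::InvalidCommand`:

```rust
/// Hypothetical error type naming the two ways the delta can fail.
#[derive(Debug, PartialEq, Eq)]
pub enum TimestampDeltaError {
    /// The message timestamp precedes the batch origin timestamp.
    BeforeOrigin { timestamp: u64, origin: u64 },
    /// The batch spans more microseconds than a u32 delta can hold.
    SpanTooLarge { span_micros: u64, max_micros: u64 },
}

/// Largest encodable span: 4_294_967_295 us, about 71.6 minutes.
pub const MAX_BATCH_SPAN_MICROS: u64 = u32::MAX as u64;

/// Computes the per-message delta from the batch origin, in microseconds.
pub fn timestamp_delta(timestamp: u64, origin: u64) -> Result<u32, TimestampDeltaError> {
    let span = timestamp
        .checked_sub(origin)
        .ok_or(TimestampDeltaError::BeforeOrigin { timestamp, origin })?;
    if span > MAX_BATCH_SPAN_MICROS {
        return Err(TimestampDeltaError::SpanTooLarge {
            span_micros: span,
            max_micros: MAX_BATCH_SPAN_MICROS,
        });
    }
    // Lossless: span was just checked against u32::MAX.
    Ok(span as u32)
}
```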



##########
core/partitions/src/journal.rs:
##########
@@ -401,3 +411,85 @@ where
         }
     }
 }
+
+fn select_batch_slice(
+    batch: &crate::send_messages2::SendMessages2Ref<'_>,
+    query: MessageLookup,
+    already_matched: u32,
+) -> Option<SelectedBatchSlice> {
+    let remaining = query.count().saturating_sub(already_matched);
+    let batch_message_count = batch.message_count().ok()?;
+    if remaining == 0 || batch_message_count == 0 {
+        return None;
+    }
+
+    if matches!(query, MessageLookup::Timestamp { timestamp, .. } if batch.header.base_timestamp < timestamp)

Review Comment:
   the timestamp filter at lines 426-429 skips the entire batch if 
`batch.header.base_timestamp < timestamp`, then line 444 returns `true` for all 
remaining messages without per-message timestamp filtering. messages within a 
kept batch may have `origin_timestamp` (via `timestamp_delta`) earlier than the 
query timestamp.
   
   for offset-based queries this doesn't matter (offset_delta is monotonic 
within a batch). but for timestamp queries, this can return messages with 
timestamps earlier than requested.
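
   a per-message filter might look roughly like this, assuming a message's timestamp is reconstructed as `origin_timestamp + timestamp_delta`. `MessageSketch` and `select_by_timestamp` are hypothetical, and which timestamp the query should compare against is a design decision this sketch does not settle:

```rust
/// Hypothetical decoded view of one message inside a batch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct MessageSketch {
    pub offset_delta: u32,
    pub timestamp_delta: u32,
}

/// Keeps messages whose reconstructed timestamp is at or after `query_ts`,
/// stopping once `remaining` messages have been selected.
pub fn select_by_timestamp(
    origin_timestamp: u64,
    messages: &[MessageSketch],
    query_ts: u64,
    remaining: usize,
) -> Vec<MessageSketch> {
    messages
        .iter()
        .copied()
        .filter(|m| origin_timestamp.saturating_add(u64::from(m.timestamp_delta)) >= query_ts)
        .take(remaining)
        .collect()
}
```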



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
