hubcio commented on code in PR #3020:
URL: https://github.com/apache/iggy/pull/3020#discussion_r2983101218
##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -350,6 +398,17 @@ where
.expect("on_request: consensus not initialized");
debug!(?namespace, "handling partition request");
+        let message = if message.header().operation == Operation::SendMessages {
+            match convert_request_message(namespace, message) {
+                Ok(message) => message,
+                Err(error) => {
+                    warn!(?namespace, %error, "on_request: failed to convert SendMessages");
+                    return;
+                }
+            }
+        } else {
+            message
+        };
        let prepare = message.project(consensus);
        pipeline_prepare_common(consensus, prepare, |prepare| self.on_replicate(prepare)).await;
    }
Review Comment:
re: line 449 - `apply_replicated_operation` is called unconditionally, even when `fence_old_prepare_by_commit` returned `true` at line 434 (which only gates `self.replicate()`, not apply). `append_messages` is not idempotent: it assigns a new `dirty_offset` and a fresh `batch_timestamp` on every call. After a VSR view change, the new primary replays from `max_commit+1` and re-executes, producing duplicate partition data with different offsets.
The comment at lines 487-496 acknowledges this and lists two required guards:
1. Op-based dedup: check `journal.op_to_storage_offset.contains_key(&header.op)` before appending.
2. Client reply dedup by `(client_id, request_id)`.
The `op_to_storage_offset` BTreeMap already exists, so the dedup check is straightforward.
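A minimal sketch of guard 1, assuming stand-in types (`Journal` here is a hypothetical reduction of the real journal; only the `op_to_storage_offset` map is from the PR):

```rust
use std::collections::BTreeMap;

/// Stand-in for the journal's op -> storage offset index.
struct Journal {
    op_to_storage_offset: BTreeMap<u64, u64>,
}

impl Journal {
    /// Returns None when `op` was already applied, so a replay from
    /// `max_commit+1` after a view change becomes a no-op instead of a
    /// duplicate `append_messages` with fresh offsets.
    fn apply_if_new(&mut self, op: u64, next_offset: u64) -> Option<u64> {
        if self.op_to_storage_offset.contains_key(&op) {
            return None; // already applied - skip re-execution
        }
        self.op_to_storage_offset.insert(op, next_offset);
        Some(next_offset)
    }
}
```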
##########
core/partitions/src/journal.rs:
##########
@@ -373,26 +395,14 @@ where
Review Comment:
re: line 387 - `BTreeMap::insert` at `timestamp_to_op.insert(timestamp, op)` overwrites the previous value. Two batches arriving in the same microsecond get the same `base_timestamp` (assigned from `IggyTimestamp::now().as_micros()` at iggy_partition.rs:91), so the earlier batch's op mapping is silently lost.
`candidate_start_op` for timestamp queries uses `range(..=timestamp).next_back()` - if the overwritten mapping pointed to an earlier op, a timestamp query could start iteration past it, making the earlier batch unreachable via timestamp-based polling.
Offset-based queries are unaffected (they are keyed by unique offset). Fix with a composite key `(timestamp, op)` or `BTreeMap<u64, SmallVec<[u64; 1]>>`.
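The composite-key variant could look like this (a sketch; `TimestampIndex` is a stand-in wrapper, and the "smallest op in the matched microsecond" rule is one way to keep iteration from starting past an earlier batch):

```rust
use std::collections::BTreeMap;

/// Composite key (timestamp, op): two batches sharing a microsecond
/// both survive, instead of the later insert overwriting the earlier.
struct TimestampIndex {
    timestamp_to_op: BTreeMap<(u64, u64), ()>,
}

impl TimestampIndex {
    fn insert(&mut self, timestamp: u64, op: u64) {
        self.timestamp_to_op.insert((timestamp, op), ());
    }

    /// Op to start iterating from for a timestamp query: find the last
    /// timestamp at or before the query, then take the *smallest* op
    /// stamped in that microsecond so no batch is skipped.
    fn candidate_start_op(&self, timestamp: u64) -> Option<u64> {
        let (&(ts, _), _) = self
            .timestamp_to_op
            .range(..=(timestamp, u64::MAX))
            .next_back()?;
        self.timestamp_to_op
            .range((ts, 0)..=(ts, u64::MAX))
            .next()
            .map(|(&(_, op), _)| op)
    }
}
```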
##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -67,6 +73,12 @@ pub struct IggyPartitions<C> {
consensus: Option<C>,
}
+fn freeze_client_reply(message: Message<GenericHeader>) -> ClientBuffers {
+ let owned = unsafe { message.into_inner().try_merge() }
+ .expect("client reply expects a mergeable message buffer");
+ vec![owned.into()]
+}
+
impl<C> IggyPartitions<C> {
#[must_use]
pub fn new(shard_id: ShardId, config: PartitionsConfig) -> Self {
Review Comment:
re: `partitions_mut()` at line 116 - it returns `&mut Vec<IggyPartition>` from `&self` via `UnsafeCell`. The `#[allow(clippy::mut_from_ref)]` suppresses the lint but does not make the code sound: under Rust's aliasing rules, creating two live `&mut` references to the same data is UB regardless of the threading model.
Current usage is sequential (I traced all call sites - borrows don't overlap across `.await` points), so it works in practice. But the API permits aliasing, and any future code that makes a second `get_mut_by_ns` call while a first `&mut` is alive would silently introduce UB.
Consider `RefCell` for runtime borrow checking (at least in debug builds), or restructure so mutation requires `&mut self`.
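A sketch of the `RefCell` option (the scoped-closure accessor and the `IggyPartition` stub are hypothetical; the point is that an overlapping borrow becomes a deterministic panic instead of UB):

```rust
use std::cell::RefCell;

/// Hypothetical stand-in for the real IggyPartition.
struct IggyPartition {
    id: u32,
}

struct IggyPartitions {
    /// RefCell instead of UnsafeCell: interior mutability with runtime
    /// borrow checking, so aliasing &mut access panics rather than
    /// silently violating the aliasing rules.
    partitions: RefCell<Vec<IggyPartition>>,
}

impl IggyPartitions {
    /// Scoped access keeps the borrow from leaking across .await points.
    fn with_partitions_mut<R>(&self, f: impl FnOnce(&mut Vec<IggyPartition>) -> R) -> R {
        f(&mut self.partitions.borrow_mut())
    }
}
```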
##########
core/partitions/src/journal.rs:
##########
@@ -335,18 +346,29 @@ where
}
async fn append(&self, entry: Self::Entry) {
-        let first_offset_and_timestamp = Self::message_to_batch(&entry)
-            .and_then(|batch| Some((batch.first_offset()?, batch.first_timestamp()?)));
-
let header = *entry.header();
let op = header.op;
+ let owned = unsafe { entry.into_inner().try_merge() }
Review Comment:
Also applies at `iggy_partitions.rs:766` (commit loop).
`try_merge()` reconstitutes an `Owned` from `(Prefix, Frozen)` on every append and on every commit entry, yet the fields being read (`base_offset` at byte 264, `base_timestamp` at byte 272) sit within the 512-byte prefix - they could be read directly without merging.
In the common case (same control block, refcount == 2) the merge is just atomic ops plus pointer gymnastics, but when the prefix was COW-detached it triggers a `copy_from_slice` of up to 512 bytes. Redesigning the journal to store `(Prefix, Frozen)` pairs and decode from the split representation would eliminate this cost.
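Decoding from the split representation could be as simple as reading the two fields straight out of the prefix slice (a sketch; the byte offsets 264/272 are the ones cited above, and the plain `&[u8]` stands in for the real prefix buffer type):

```rust
/// Reads base_offset (bytes 264..272) and base_timestamp (bytes
/// 272..280) directly from the prefix, with no (Prefix, Frozen) merge
/// and no COW copy. Returns None if the prefix is too short.
fn batch_fields_from_prefix(prefix: &[u8]) -> Option<(u64, u64)> {
    let base_offset = u64::from_le_bytes(prefix.get(264..272)?.try_into().ok()?);
    let base_timestamp = u64::from_le_bytes(prefix.get(272..280)?.try_into().ok()?);
    Some((base_offset, base_timestamp))
}
```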
##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -847,20 +931,43 @@ where
)
.await
.expect("Failed to create segment storage");
-
- // Clear old segment's indexes.
- // TODO: Waiting for issue to move server config to shared module.
- // Once complete, conditionally clear based on cache_indexes config:
- // if !matches!(self.config.cache_indexes, CacheIndexesConfig::All) {
- // partition.log.indexes_mut()[old_segment_index] = None;
- // }
- partition.log.indexes_mut()[old_segment_index] = None;
+ let messages_size_bytes = storage
+ .messages_writer
+ .as_ref()
+ .expect("Messages writer not initialized")
+ .size_counter();
+ let messages_writer = Rc::new(
+ MessagesWriter::new(
+ &messages_path,
+ messages_size_bytes,
+ self.config.enforce_fsync,
+ false,
+ )
+ .await
+ .expect("Failed to create messages writer"),
+ );
+ let index_writer = Rc::new(
+ IggyIndexWriter::new(
+ &index_path,
+ Rc::new(std::sync::atomic::AtomicU64::new(0)),
+ false,
+ )
+ .await
+ .expect("Failed to create sparse index writer"),
+ );
// Close writers for the sealed segment.
let old_storage = &mut partition.log.storages_mut()[old_segment_index];
let _ = old_storage.shutdown();
-
- partition.log.add_persisted_segment(segment, storage);
+ partition.log.messages_writers_mut()[old_segment_index] = None;
+ partition.log.index_writers_mut()[old_segment_index] = None;
+
+ partition.log.add_persisted_segment(
+ segment,
+ storage,
+ Some(messages_writer),
+ Some(index_writer),
+ );
partition.stats.increment_segments_count(1);
debug!(?namespace, start_offset, "rotated to new segment");
Review Comment:
re: `send_prepare_ok` at line 976 - the TODO at line 980 acknowledges that the journal entry isn't verified durable before acking. In VSR, a replica must persist the prepare entry before sending PrepareOk; otherwise a power loss after the ack but before the disk flush means the replica promised durability it cannot deliver.
Concrete failure: the client is told "committed" after a quorum ack, power kills 2 of 3 replicas before their disk flush, and the data is gone. The fix is either a per-prepare fdatasync (correct but expensive) or a batched fsync with a flush-before-ack barrier.
##########
core/partitions/src/send_messages2.rs:
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_common::{
+ INDEX_SIZE, IggyError, calculate_checksum,
+ header::{PrepareHeader, RequestHeader},
+ message::Message,
+ random_id,
+ sharding::IggyNamespace,
+};
+use iobuf::{Owned, TryMerge};
+
+const MESSAGE_ALIGN: usize = 4096;
+pub const COMMAND_HEADER_SIZE: usize = 256;
+pub const PREPARE_SPLIT_POINT: usize = 512;
+const MESSAGE_HEADER_SIZE: usize = 48;
+const LEGACY_MESSAGE_HEADER_SIZE: usize = 64;
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SendMessages2Header {
+ pub partition_id: u64,
+ pub base_offset: u64,
+ pub base_timestamp: u64,
+ pub origin_timestamp: u64,
+ pub batch_length: u64,
+}
+
+impl SendMessages2Header {
+    pub const fn new(partition_id: u64, origin_timestamp: u64, batch_length: u64) -> Self {
+ Self {
+ partition_id,
+ base_offset: 0,
+ base_timestamp: 0,
+ origin_timestamp,
+ batch_length,
+ }
+ }
+
+ pub fn decode(bytes: &[u8]) -> Result<Self, IggyError> {
+ if bytes.len() < COMMAND_HEADER_SIZE {
+ return Err(IggyError::InvalidCommand);
+ }
+
+ let batch_length = read_u64(bytes, 32)?;
+ if batch_length < COMMAND_HEADER_SIZE as u64 {
+ return Err(IggyError::InvalidCommand);
+ }
+
+ Ok(Self {
+ partition_id: read_u64(bytes, 0)?,
+ base_offset: read_u64(bytes, 8)?,
+ base_timestamp: read_u64(bytes, 16)?,
+ origin_timestamp: read_u64(bytes, 24)?,
+ batch_length,
+ })
+ }
+
+ pub fn encode_into(&self, bytes: &mut [u8]) {
Review Comment:
The per-message checksums (line 155) cover the message header tail + user_headers + payload, but `base_offset` and `base_timestamp` in the batch header are not covered by any checksum. A single bit flip in `base_offset` silently corrupts every message's absolute offset (per-message offsets are delta-encoded from it), and no checksum would detect it.
Add a batch-level CRC32C covering the full batch header (bytes 0-39 at minimum) before the first message. This is particularly important because the batch header is mutated in place during `stamp_prepare_for_persistence` (lines 509-511) - any bug in that stamping path would also go undetected.
##########
core/partitions/src/send_messages2.rs:
##########
@@ -0,0 +1,633 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use bytes::{BufMut, Bytes, BytesMut};
+use iggy_common::{
+ INDEX_SIZE, IggyError, calculate_checksum,
+ header::{PrepareHeader, RequestHeader},
+ message::Message,
+ random_id,
+ sharding::IggyNamespace,
+};
+use iobuf::{Owned, TryMerge};
+
+const MESSAGE_ALIGN: usize = 4096;
+pub const COMMAND_HEADER_SIZE: usize = 256;
+pub const PREPARE_SPLIT_POINT: usize = 512;
+const MESSAGE_HEADER_SIZE: usize = 48;
+const LEGACY_MESSAGE_HEADER_SIZE: usize = 64;
+#[derive(Debug, Clone, Copy, Default)]
+pub struct SendMessages2Header {
+ pub partition_id: u64,
+ pub base_offset: u64,
+ pub base_timestamp: u64,
+ pub origin_timestamp: u64,
+ pub batch_length: u64,
+}
+
+impl SendMessages2Header {
+    pub const fn new(partition_id: u64, origin_timestamp: u64, batch_length: u64) -> Self {
+ Self {
+ partition_id,
+ base_offset: 0,
+ base_timestamp: 0,
+ origin_timestamp,
+ batch_length,
+ }
+ }
+
+ pub fn decode(bytes: &[u8]) -> Result<Self, IggyError> {
+ if bytes.len() < COMMAND_HEADER_SIZE {
+ return Err(IggyError::InvalidCommand);
+ }
+
+ let batch_length = read_u64(bytes, 32)?;
+ if batch_length < COMMAND_HEADER_SIZE as u64 {
+ return Err(IggyError::InvalidCommand);
+ }
+
+ Ok(Self {
+ partition_id: read_u64(bytes, 0)?,
+ base_offset: read_u64(bytes, 8)?,
+ base_timestamp: read_u64(bytes, 16)?,
+ origin_timestamp: read_u64(bytes, 24)?,
+ batch_length,
+ })
+ }
+
+ pub fn encode_into(&self, bytes: &mut [u8]) {
+ assert!(bytes.len() >= COMMAND_HEADER_SIZE);
+ bytes[..COMMAND_HEADER_SIZE].fill(0);
+ bytes[0..8].copy_from_slice(&self.partition_id.to_le_bytes());
+ bytes[8..16].copy_from_slice(&self.base_offset.to_le_bytes());
+ bytes[16..24].copy_from_slice(&self.base_timestamp.to_le_bytes());
+ bytes[24..32].copy_from_slice(&self.origin_timestamp.to_le_bytes());
+ bytes[32..40].copy_from_slice(&self.batch_length.to_le_bytes());
+ }
+
+ pub fn total_size(&self) -> usize {
+        usize::try_from(self.batch_length).expect("batch length exceeds usize::MAX")
+ }
+
+ pub fn blob_len(&self) -> Result<usize, IggyError> {
+ usize::try_from(
+ self.batch_length
+ .checked_sub(COMMAND_HEADER_SIZE as u64)
+ .ok_or(IggyError::InvalidCommand)?,
+ )
+ .map_err(|_| IggyError::InvalidCommand)
+ }
+
+ pub fn into_frozen(self) -> FrozenBatchHeader {
+ let mut buffer = Owned::<MESSAGE_ALIGN>::zeroed(COMMAND_HEADER_SIZE);
+ self.encode_into(buffer.as_mut_slice());
+ buffer.into()
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct SendMessages2Owned {
+ pub header: SendMessages2Header,
+ pub blob: Bytes,
+}
+
+impl SendMessages2Owned {
+    pub fn from_legacy_request(namespace: IggyNamespace, body: &[u8]) -> Result<Self, IggyError> {
+ let (message_count, messages) = legacy_messages_slice(body)?;
+ let mut parsed = Vec::with_capacity(message_count as usize);
+ let mut origin_timestamp = u64::MAX;
+ let mut cursor = 0usize;
+
+        while cursor < messages.len() && parsed.len() < message_count as usize {
+ let legacy = LegacyMessageRef::decode(&messages[cursor..])?;
+ origin_timestamp = origin_timestamp.min(legacy.origin_timestamp);
+ cursor += legacy.total_size;
+ parsed.push(legacy);
+ }
+
+ if parsed.len() != message_count as usize || cursor != messages.len() {
+ return Err(IggyError::InvalidCommand);
+ }
+
+ if origin_timestamp == u64::MAX {
+ origin_timestamp = 0;
+ }
+
+ let mut blob = BytesMut::with_capacity(messages.len());
+ for (index, legacy) in parsed.iter().enumerate() {
+ let id = if legacy.id == 0 {
+ random_id::get_uuid()
+ } else {
+ legacy.id
+ };
+            let offset_delta = u32::try_from(index).map_err(|_| IggyError::InvalidCommand)?;
+ let timestamp_delta = legacy
Review Comment:
`timestamp_delta` is a `u32` holding a microsecond delta from `origin_timestamp`. `u32::MAX` microseconds is ~4295 seconds, i.e. ~71.6 minutes, so a batch spanning more than ~72 minutes of client-side `origin_timestamp` is rejected with a generic `IggyError::InvalidCommand` at line 142.
This is a wire-format constant: once deployed, it cannot be changed without protocol versioning. The rejection is graceful (not silent), but the limit is undocumented and the error gives zero indication of what went wrong. At minimum, document the ~72-minute maximum batch span on `timestamp_delta` and return a more specific error variant.
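Something along these lines (the error variant name is hypothetical - the real fix would add an equivalent to `IggyError`):

```rust
/// Hypothetical specific error for an over-wide batch, carrying the
/// offending span so the client can see what actually went wrong.
#[derive(Debug, PartialEq)]
enum ConvertError {
    /// Batch spans more than u32::MAX microseconds (~71.6 minutes)
    /// between the batch minimum and a message's origin_timestamp.
    BatchTimestampSpanTooLarge { span_micros: u64 },
}

/// timestamp_delta is a u32 in the wire format, so the span must fit
/// in 32 bits; saturating_sub clamps any message stamped before the
/// batch minimum (which should not happen) to delta 0.
fn timestamp_delta(origin: u64, message_ts: u64) -> Result<u32, ConvertError> {
    let span = message_ts.saturating_sub(origin);
    u32::try_from(span).map_err(|_| ConvertError::BatchTimestampSpanTooLarge { span_micros: span })
}
```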
##########
core/partitions/src/journal.rs:
##########
@@ -401,3 +411,85 @@ where
}
}
}
+
+fn select_batch_slice(
+ batch: &crate::send_messages2::SendMessages2Ref<'_>,
+ query: MessageLookup,
+ already_matched: u32,
+) -> Option<SelectedBatchSlice> {
+ let remaining = query.count().saturating_sub(already_matched);
+ let batch_message_count = batch.message_count().ok()?;
+ if remaining == 0 || batch_message_count == 0 {
+ return None;
+ }
+
+    if matches!(query, MessageLookup::Timestamp { timestamp, .. } if batch.header.base_timestamp < timestamp)
Review Comment:
The timestamp filter at lines 426-429 skips the entire batch if `batch.header.base_timestamp < timestamp`, and line 444 then returns `true` for all remaining messages with no per-message timestamp filtering. Messages within a kept batch may have an `origin_timestamp` (via `timestamp_delta`) earlier than the query timestamp.
For offset-based queries this doesn't matter (`offset_delta` is monotonic within a batch), but timestamp queries can return messages with timestamps earlier than requested.
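The per-message check is cheap - each message's timestamp is just the batch's origin plus its delta (a sketch; it assumes the message timestamp is reconstructed as `origin_timestamp + timestamp_delta`, matching the delta encoding described above):

```rust
/// A message passes a timestamp query only if its own reconstructed
/// timestamp meets the bound, not merely because the batch's header
/// timestamp did.
fn message_matches_timestamp(origin_timestamp: u64, timestamp_delta: u32, query_ts: u64) -> bool {
    origin_timestamp + u64::from(timestamp_delta) >= query_ts
}
```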
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]