hubcio commented on code in PR #3020:
URL: https://github.com/apache/iggy/pull/3020#discussion_r3000777488
##########
core/partitions/src/iggy_partitions.rs:
##########
@@ -656,136 +703,276 @@ where
let journal_info = journal.info;
if journal_info.messages_count == 0 {
- return;
- }
-
- // 1. Update segment metadata from journal state.
-    // Note: segment.current_position is already updated in append_batch (prepare phase).
- let segment_index = partition.log.segments().len() - 1;
- let segment = &mut partition.log.segments_mut()[segment_index];
-
- if segment.end_offset == 0 && journal_info.first_timestamp != 0 {
- segment.start_timestamp = journal_info.first_timestamp;
+ return Ok(());
}
- segment.end_timestamp = journal_info.end_timestamp;
- segment.end_offset = journal_info.current_offset;
-
- // 2. Update stats.
- partition
- .stats
- .increment_size_bytes(journal_info.size.as_bytes_u64());
- partition
- .stats
- .increment_messages_count(u64::from(journal_info.messages_count));
- // 3. Check flush thresholds.
- let is_full = segment.is_full();
+ // 1. Check flush thresholds.
+ let is_full = partition.log.active_segment().is_full();
let unsaved_messages_count_exceeded =
            journal_info.messages_count >= self.config.messages_required_to_save;
let unsaved_messages_size_exceeded = journal_info.size.as_bytes_u64()
>= self.config.size_of_messages_required_to_save.as_bytes_u64();
+ let should_persist = force_persist
+ || is_full
+ || unsaved_messages_count_exceeded
+ || unsaved_messages_size_exceeded;
+
+ if !should_persist {
+ return Ok(());
+ }
-        if is_full || unsaved_messages_count_exceeded || unsaved_messages_size_exceeded {
+ if is_full
Review Comment:
   Also at lines 726-728.
   Lines 716-719 compute `should_persist` and return early at 721-723 when it is false, so this `if` block re-checks the exact same four conditions and is always true at this point. It looks like a refactoring artifact from when the early return was added; the inner `if` can be removed (keep its body).
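   A minimal sketch of the suggested shape, reusing the names from the hunk above (the `persist_journal` call at the end is only an illustrative placeholder for whatever the removed inner `if` body actually does):

```rust
let should_persist = force_persist
    || is_full
    || unsaved_messages_count_exceeded
    || unsaved_messages_size_exceeded;

if !should_persist {
    return Ok(());
}

// Past the early return `should_persist` is always true, so the body of the
// former inner `if` runs unconditionally here.
self.persist_journal(partition, journal_info).await?; // illustrative placeholder
```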
##########
core/partitions/src/journal.rs:
##########
@@ -383,44 +406,116 @@ where
offset_to_op.insert(offset, op);
let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
- timestamp_to_op.insert(timestamp, op);
+ timestamp_to_op.insert((timestamp, op), op);
}
Ok(())
}
async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
- self.message_by_op(header.op).await
+ self.bytes_by_op(header.op).await
}
}
-impl<S> QueryableJournal<S> for PartitionJournal<S>
-where
- S: Storage<Buffer = Bytes>,
-{
+impl QueryableJournal<PartitionJournalMemStorage> for PartitionJournal<PartitionJournalMemStorage> {
type Query = MessageLookup;
- async fn get(&self, query: &Self::Query) -> Option<IggyMessagesBatchSet> {
+ async fn get(&self, query: &Self::Query) -> Option<PollQueryResult<4096>> {
let query = *query;
let start_op = self.candidate_start_op(&query)?;
- let count = match query {
-            MessageLookup::Offset { count, .. } | MessageLookup::Timestamp { count, .. } => count,
- };
+        let result = self.load_polled_batches_from_storage(start_op, query).await;
- let messages = self.load_messages_from_storage(start_op, count).await;
+ if result.0.is_empty() {
+ None
+ } else {
+ Some(result)
+ }
+ }
+}
+
+fn select_batch_slice(
+ batch: &SendMessages2Ref<'_>,
+ query: MessageLookup,
+ already_matched: u32,
+) -> Option<SelectedBatchSlice> {
+ let remaining = query.count().saturating_sub(already_matched);
+ let batch_message_count = batch.message_count();
+ if remaining == 0 || batch_message_count == 0 {
+ return None;
+ }
- let batch_set = Self::messages_to_batch_set(&messages);
- let result = match query {
-            MessageLookup::Offset { offset, count } => batch_set.get_by_offset(offset, count),
- MessageLookup::Timestamp { timestamp, count } => {
- batch_set.get_by_timestamp(timestamp, count)
+ let mut start = None;
+ let mut end = 0usize;
+ let mut matched = 0u32;
+ let mut last_matching_offset = None;
+
+ for record in batch.iter_with_offsets() {
+        let offset = batch.header.base_offset + u64::from(record.message.header.offset_delta);
+
+ let selected = match query {
+ MessageLookup::Offset {
+ offset: query_offset,
+ ..
+ } => offset >= query_offset,
+ MessageLookup::Timestamp { timestamp, .. } => {
+                batch.header.origin_timestamp + u64::from(record.message.header.timestamp_delta)
+ >= timestamp
}
};
+ if !selected {
+ continue;
+ }
- if result.is_empty() {
- None
- } else {
- Some(result)
+ start.get_or_insert(record.start);
+ end = record.end;
+ matched += 1;
+ last_matching_offset = Some(offset);
+
+ if matched == remaining {
+ break;
}
}
+
+ Some(SelectedBatchSlice {
+ start: start?,
+ end,
+ matched_messages: matched,
+ last_matching_offset: last_matching_offset?,
+ })
+}
+
+fn push_selected_batch_fragments(
+ fragments: &mut PollFragments<4096>,
+ last_matching_offset: &mut Option<u64>,
+ matched_messages: &mut u32,
+ prepare: &Frozen<4096>,
+ prepare_header: &PrepareHeader,
+ batch: &SendMessages2Ref<'_>,
+ selection: SelectedBatchSlice,
+) {
+ let prepare_header_size = std::mem::size_of::<PrepareHeader>();
+ let prepare_size = prepare_header.size as usize;
+    let full_body_selected = selection.start == 0 && selection.end == batch.blob().len();
+
+ if full_body_selected {
+ fragments.push(Fragment::slice(
+ prepare.clone(),
+ prepare_header_size,
+ prepare_size,
+ ));
+ } else {
+ let mut rewritten = batch.header;
Review Comment:
   Also at lines 508-511.
   When a partial batch slice is returned (the `else` branch), `rewritten` copies the full `SendMessages2Header`, including `batch_checksum` and `message_count`, and then only `batch_length` is overwritten. The serialized partial-batch header therefore retains the original full-batch checksum and message count, which are incorrect for the sliced data; any consumer that validates checksums on poll responses will reject these fragments.
   Fix: set `rewritten.message_count = selection.matched_messages` and recompute `batch_checksum` over the new header plus the sliced blob.
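   A rough sketch of that fix under stated assumptions: `batch`, `selection`, and `rewritten` come from the hunk above, while the `batch_length` width and the `compute_batch_checksum` helper are placeholders for whatever the codebase actually uses:

```rust
let sliced_blob = &batch.blob()[selection.start..selection.end];

let mut rewritten = batch.header;
rewritten.batch_length = sliced_blob.len() as u32; // assumed field width
rewritten.message_count = selection.matched_messages;
// If the checksum covers the header itself, clear it before hashing.
rewritten.batch_checksum = 0;
// Hypothetical helper: recompute the checksum over the rewritten header + sliced blob
// so consumers that validate poll responses accept the fragment.
rewritten.batch_checksum = compute_batch_checksum(&rewritten, sliced_blob);
```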
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]