This is an automated email from the ASF dual-hosted git repository.
gkoszyk pushed a commit to branch store_consumer_offset
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/store_consumer_offset by this push:
new b76946421 fix tests
b76946421 is described below
commit b7694642160b48ad0e40a8c4cc2295e49dadfd24
Author: numinex <[email protected]>
AuthorDate: Thu Apr 16 18:42:05 2026 +0200
fix tests
---
core/partitions/src/iggy_partition.rs | 53 ++++++++-------
core/partitions/src/journal.rs | 5 ++
core/shard/src/lib.rs | 117 ++++++++++++++++++++++++++++++++++
3 files changed, 153 insertions(+), 22 deletions(-)
diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs
index aa790f781..5f8a97c18 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -659,7 +659,7 @@ where
let previous_commit = self.consensus.commit_max();
let current_op = {
let consensus = self.consensus();
- let current_op = match replicate_preflight(consensus, &header) {
+ match replicate_preflight(consensus, &header) {
Ok(current_op) => current_op,
Err(reason) => {
emit_partition_diag(
@@ -674,35 +674,44 @@ where
);
return;
}
- };
-
- if fence_old_prepare_by_commit(consensus, &header) {
- emit_partition_diag(
- tracing::Level::WARN,
- &PartitionDiagEvent::new(
- ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
- "received old prepare, skipping replication",
- )
- .with_operation(header.operation)
- .with_op(header.op),
- );
- return;
}
-
- current_op
+ };
+ #[allow(clippy::cast_possible_truncation)]
+ let is_old_prepare = {
+ let consensus = self.consensus();
+ fence_old_prepare_by_commit(consensus, &header)
+ || self.log.journal().inner.header_by_op(header.op).is_some()
+ };
+ let message = if is_old_prepare {
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(
+ self.diag_ctx(),
+ "received old prepare, skipping replication",
+ )
+ .with_operation(header.operation)
+ .with_op(header.op),
+ );
+ message
+ } else {
+ let consensus = self.consensus();
+ replicate_to_next_in_chain(consensus, message).await
};
- debug_assert_eq!(header.op, current_op + 1);
+ if header.op != current_op + 1 {
+ emit_partition_diag(
+ tracing::Level::WARN,
+ &PartitionDiagEvent::new(self.diag_ctx(), "dropping out-of-order prepare (gap)")
+ .with_operation(header.operation)
+ .with_op(header.op),
+ );
+ return;
+ }
{
let consensus = self.consensus();
consensus.sequencer().set_sequence(header.op);
consensus.set_last_prepare_checksum(header.checksum);
}
-
- let message = {
- let consensus = self.consensus();
- replicate_to_next_in_chain(consensus, message).await
- };
let replicated_result = self.apply_replicated_operation(message).await;
let commit = self.consensus.commit_max();
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 8c45d3630..bfa713d34 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -229,6 +229,11 @@ where
}
}
+ pub fn header_by_op(&self, op: u64) -> Option<PrepareHeader> {
+ let headers = unsafe { &*self.headers.get() };
+ headers.iter().find(|header| header.op == op).copied()
+ }
+
#[allow(dead_code)]
fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
match query {
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index f4dc1aaca..47bd5a9b8 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -454,6 +454,7 @@ where
let actions =
consensus.handle_start_view_change(PlaneKind::Partitions, &header);
dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+ dispatch_partition_journal_actions(consensus, partition, &actions).await;
return;
}
@@ -512,6 +513,7 @@ where
let actions =
consensus.handle_do_view_change(PlaneKind::Partitions, &header);
dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+ dispatch_partition_journal_actions(consensus, partition, &actions).await;
if actions
.iter()
.any(|action| matches!(action, VsrAction::CommitJournal))
@@ -564,6 +566,7 @@ where
let actions = consensus.handle_start_view(PlaneKind::Partitions, &header);
dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+ dispatch_partition_journal_actions(consensus, partition, &actions).await;
return;
}
@@ -655,6 +658,7 @@ where
let current_commit = consensus.commit_min();
let actions = consensus.tick(PlaneKind::Partitions, current_op, current_commit);
dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+ dispatch_partition_journal_actions(consensus, partition, &actions).await;
}
}
@@ -899,3 +903,116 @@ async fn dispatch_vsr_actions<B, P, J>(
}
}
}
+
+#[allow(
+ clippy::future_not_send,
+ clippy::too_many_lines,
+ clippy::cast_possible_truncation
+)]
+async fn dispatch_partition_journal_actions<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ partition: &IggyPartition<B>,
+ actions: &[VsrAction],
+) where
+ B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+ P: Pipeline<Entry = consensus::PipelineEntry>,
+{
+ use std::mem::size_of;
+
+ let bus = consensus.message_bus();
+ let self_id = consensus.replica();
+ let cluster = consensus.cluster();
+ let journal = &partition.log.journal().inner;
+
+ let send = |target: u8, msg: Message<GenericHeader>| async move {
+ if let Err(e) = bus.send_to_replica(target, msg).await {
+ tracing::debug!(replica = self_id, target, "bus send failed: {e}");
+ }
+ };
+
+ for action in actions {
+ match action {
+ VsrAction::SendPrepareOk {
+ view,
+ from_op,
+ to_op,
+ target,
+ namespace,
+ } => {
+ for op in *from_op..=*to_op {
+ let Some(prepare_header) = journal.header_by_op(op) else {
+ continue;
+ };
+ let msg = Message::<PrepareOkHeader>::new(size_of::<PrepareOkHeader>())
+ .transmute_header(|_, h: &mut PrepareOkHeader| {
+ h.command = Command2::PrepareOk;
+ h.cluster = cluster;
+ h.replica = self_id;
+ h.view = *view;
+ h.op = op;
+ h.commit = consensus.commit_max();
+ h.timestamp = prepare_header.timestamp;
+ h.parent = prepare_header.parent;
+ h.prepare_checksum = prepare_header.checksum;
+ h.request = prepare_header.request;
+ h.operation = prepare_header.operation;
+ h.namespace = *namespace;
+ h.size = size_of::<PrepareOkHeader>() as u32;
+ });
+ send(*target, msg.into_generic()).await;
+ }
+ }
+ VsrAction::RetransmitPrepares { targets } => {
+ for (header, replicas) in targets {
+ let Some(prepare) = journal.entry(header).await else {
+ continue;
+ };
+ let prepare = Message::<PrepareHeader>::try_from(
+ iggy_binary_protocol::consensus::iobuf::Owned::<4096>::copy_from_slice(
+ prepare.as_slice(),
+ ),
+ )
+ .expect("partition journal entry must contain valid prepare");
+ for replica in replicas {
+ send(*replica, prepare.deep_copy().into_generic()).await;
+ }
+ }
+ }
+ VsrAction::RebuildPipeline { from_op, to_op } => {
+ let mut gap_at = None;
+ let entries: Vec<_> = (*from_op..=*to_op)
+ .map_while(|op| {
+ let Some(header) = journal.header_by_op(op) else {
+ gap_at = Some(op);
+ return None;
+ };
+ let mut entry = consensus::PipelineEntry::new(header);
+ entry.add_ack(self_id);
+ Some(entry)
+ })
+ .collect();
+ if let Some(missing_op) = gap_at {
+ let rebuilt_up_to = missing_op.saturating_sub(1);
+ tracing::warn!(
+ replica = self_id,
+ missing_op,
+ range_start = from_op,
+ range_end = to_op,
+ rebuilt = entries.len(),
+ "RebuildPipeline: journal gap at op {missing_op}, \
+ truncating sequencer from {to_op} to {rebuilt_up_to} \
+ ({}/{} ops rebuilt)",
+ entries.len(),
+ to_op - from_op + 1,
+ );
+ consensus.sequencer().set_sequence(rebuilt_up_to);
+ }
+ let mut pipeline = consensus.pipeline().borrow_mut();
+ for entry in entries {
+ pipeline.push(entry);
+ }
+ }
+ _ => {}
+ }
+ }
+}
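
For readers skimming the patch: below is a minimal, self-contained sketch of
the gap-handling idea behind the RebuildPipeline arm above, which walks a
journal range and truncates at the first missing op. All names here (Header,
JournalView, rebuild_range) are hypothetical stand-ins for illustration, not
the crate's actual types.

#[derive(Clone, Copy, Debug)]
struct Header {
    op: u64,
}

struct JournalView {
    headers: Vec<Header>,
}

impl JournalView {
    // Linear scan by op, mirroring the new header_by_op in journal.rs.
    fn header_by_op(&self, op: u64) -> Option<Header> {
        self.headers.iter().find(|h| h.op == op).copied()
    }
}

// Walk [from_op, to_op], stopping at the first op missing from the journal.
// Returns the contiguous prefix that could be rebuilt plus the gap position.
fn rebuild_range(journal: &JournalView, from_op: u64, to_op: u64) -> (Vec<Header>, Option<u64>) {
    let mut gap_at = None;
    let entries: Vec<_> = (from_op..=to_op)
        .map_while(|op| {
            let header = journal.header_by_op(op);
            if header.is_none() {
                gap_at = Some(op);
            }
            header
        })
        .collect();
    (entries, gap_at)
}

fn main() {
    // Ops 1 and 2 are journaled, op 3 is missing, and op 4 exists but is
    // unreachable past the gap.
    let journal = JournalView {
        headers: vec![Header { op: 1 }, Header { op: 2 }, Header { op: 4 }],
    };
    let (rebuilt, gap) = rebuild_range(&journal, 1, 4);
    assert_eq!(rebuilt.len(), 2);
    assert_eq!(gap, Some(3));
    // As in the patch, the sequencer would be truncated to gap - 1 here.
    println!("rebuilt {} ops, gap at op {:?}", rebuilt.len(), gap);
}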