This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch store_consumer_offset
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/store_consumer_offset by this 
push:
     new b76946421 fix tests
b76946421 is described below

commit b7694642160b48ad0e40a8c4cc2295e49dadfd24
Author: numinex <[email protected]>
AuthorDate: Thu Apr 16 18:42:05 2026 +0200

    fix tests
---
 core/partitions/src/iggy_partition.rs |  53 ++++++++-------
 core/partitions/src/journal.rs        |   5 ++
 core/shard/src/lib.rs                 | 117 ++++++++++++++++++++++++++++++++++
 3 files changed, 153 insertions(+), 22 deletions(-)

diff --git a/core/partitions/src/iggy_partition.rs b/core/partitions/src/iggy_partition.rs
index aa790f781..5f8a97c18 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -659,7 +659,7 @@ where
         let previous_commit = self.consensus.commit_max();
         let current_op = {
             let consensus = self.consensus();
-            let current_op = match replicate_preflight(consensus, &header) {
+            match replicate_preflight(consensus, &header) {
                 Ok(current_op) => current_op,
                 Err(reason) => {
                     emit_partition_diag(
@@ -674,35 +674,44 @@ where
                     );
                     return;
                 }
-            };
-
-            if fence_old_prepare_by_commit(consensus, &header) {
-                emit_partition_diag(
-                    tracing::Level::WARN,
-                    &PartitionDiagEvent::new(
-                        ReplicaLogContext::from_consensus(consensus, PlaneKind::Partitions),
-                        "received old prepare, skipping replication",
-                    )
-                    .with_operation(header.operation)
-                    .with_op(header.op),
-                );
-                return;
             }
-
-            current_op
+        };
+        #[allow(clippy::cast_possible_truncation)]
+        let is_old_prepare = {
+            let consensus = self.consensus();
+            fence_old_prepare_by_commit(consensus, &header)
+                || self.log.journal().inner.header_by_op(header.op).is_some()
+        };
+        let message = if is_old_prepare {
+            emit_partition_diag(
+                tracing::Level::WARN,
+                &PartitionDiagEvent::new(
+                    self.diag_ctx(),
+                    "received old prepare, skipping replication",
+                )
+                .with_operation(header.operation)
+                .with_op(header.op),
+            );
+            message
+        } else {
+            let consensus = self.consensus();
+            replicate_to_next_in_chain(consensus, message).await
         };
 
-        debug_assert_eq!(header.op, current_op + 1);
+        if header.op != current_op + 1 {
+            emit_partition_diag(
+                tracing::Level::WARN,
+                &PartitionDiagEvent::new(self.diag_ctx(), "dropping out-of-order prepare (gap)")
+                    .with_operation(header.operation)
+                    .with_op(header.op),
+            );
+            return;
+        }
         {
             let consensus = self.consensus();
             consensus.sequencer().set_sequence(header.op);
             consensus.set_last_prepare_checksum(header.checksum);
         }
-
-        let message = {
-            let consensus = self.consensus();
-            replicate_to_next_in_chain(consensus, message).await
-        };
         let replicated_result = self.apply_replicated_operation(message).await;
 
         let commit = self.consensus.commit_max();
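
A minimal, self-contained sketch of the reordered checks above, assuming
fence_old_prepare_by_commit compares the prepare's op against the replica's
commit_max; Outcome, handle_prepare, and all parameters here are illustrative
stand-ins, not iggy types:

// Hypothetical model of the reordered path: preflight has already produced
// current_op; a prepare at or below commit_max, or one already journaled,
// is skipped, and anything but the immediate successor of current_op is
// dropped as a gap.
#[derive(Debug, PartialEq)]
enum Outcome {
    Skipped,    // old or duplicate prepare: not replicated again
    Dropped,    // out-of-order prepare (gap)
    Replicated, // sequenced, forwarded down the chain, and applied
}

fn handle_prepare(op: u64, commit_max: u64, already_journaled: bool, current_op: u64) -> Outcome {
    if op <= commit_max || already_journaled {
        return Outcome::Skipped;
    }
    if op != current_op + 1 {
        return Outcome::Dropped;
    }
    Outcome::Replicated
}

fn main() {
    assert_eq!(handle_prepare(5, 7, false, 7), Outcome::Skipped);   // behind commit_max
    assert_eq!(handle_prepare(8, 7, true, 7), Outcome::Skipped);    // already in the journal
    assert_eq!(handle_prepare(10, 7, false, 7), Outcome::Dropped);  // gap: op 8 expected
    assert_eq!(handle_prepare(8, 7, false, 7), Outcome::Replicated);
}

The gap case replaces the previous debug_assert_eq!, so an out-of-order
prepare is now dropped with a WARN diagnostic in release builds instead of
passing through unchecked.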
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 8c45d3630..bfa713d34 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -229,6 +229,11 @@ where
         }
     }
 
+    pub fn header_by_op(&self, op: u64) -> Option<PrepareHeader> {
+        let headers = unsafe { &*self.headers.get() };
+        headers.iter().find(|header| header.op == op).copied()
+    }
+
     #[allow(dead_code)]
     fn candidate_start_op(&self, query: &MessageLookup) -> Option<u64> {
         match query {
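
The header_by_op lookup added above is a plain linear scan over the journal's
in-memory prepare headers (read through an UnsafeCell, which presumably relies
on the shard's single-threaded execution model). A standalone sketch with a
stubbed PrepareHeader, not the real protocol type:

// Scan the in-memory prepare headers for a matching op.
#[derive(Clone, Copy, Debug, PartialEq)]
struct PrepareHeader {
    op: u64,
    checksum: u64,
}

fn header_by_op(headers: &[PrepareHeader], op: u64) -> Option<PrepareHeader> {
    headers.iter().find(|header| header.op == op).copied()
}

fn main() {
    let headers = [
        PrepareHeader { op: 1, checksum: 0xaa },
        PrepareHeader { op: 2, checksum: 0xbb },
    ];
    assert_eq!(header_by_op(&headers, 2).map(|h| h.checksum), Some(0xbb));
    assert_eq!(header_by_op(&headers, 9), None); // not journaled
}

The duplicate check in iggy_partition.rs uses exactly this shape: a Some
result for header.op means the prepare is already journaled and replication
is skipped.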
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index f4dc1aaca..47bd5a9b8 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -454,6 +454,7 @@ where
 
             let actions = consensus.handle_start_view_change(PlaneKind::Partitions, &header);
             dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+            dispatch_partition_journal_actions(consensus, partition, &actions).await;
             return;
         }
 
@@ -512,6 +513,7 @@ where
 
             let actions = consensus.handle_do_view_change(PlaneKind::Partitions, &header);
             dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+            dispatch_partition_journal_actions(consensus, partition, &actions).await;
             if actions
                 .iter()
                 .any(|action| matches!(action, VsrAction::CommitJournal))
@@ -564,6 +566,7 @@ where
 
             let actions = consensus.handle_start_view(PlaneKind::Partitions, &header);
             dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+            dispatch_partition_journal_actions(consensus, partition, &actions).await;
             return;
         }
 
@@ -655,6 +658,7 @@ where
             let current_commit = consensus.commit_min();
             let actions = consensus.tick(PlaneKind::Partitions, current_op, current_commit);
             dispatch_vsr_actions::<B, _, MJ>(consensus, None, &actions).await;
+            dispatch_partition_journal_actions(consensus, partition, &actions).await;
         }
     }
 
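
Each hunk above adds the same call after the existing VSR dispatch, so every
protocol entry point (start-view-change, do-view-change, start-view, and the
tick) now fans its actions out to both dispatchers. A schematic sketch of
that shape, with hypothetical names:

// The generic dispatcher handles protocol side effects; the partition-journal
// dispatcher serves the actions that need journal contents (PrepareOk
// replies, retransmits, pipeline rebuilds).
enum Action {
    SendPrepareOk,
    RetransmitPrepares,
    RebuildPipeline,
    Other,
}

fn dispatch_vsr(actions: &[Action]) {
    // Generic VSR side effects (messaging, timers, ...) would go here.
    let _ = actions;
}

fn dispatch_partition_journal(actions: &[Action]) {
    for action in actions {
        match action {
            Action::SendPrepareOk | Action::RetransmitPrepares | Action::RebuildPipeline => {
                // Journal-backed handling, as in the new function below.
            }
            Action::Other => {}
        }
    }
}

fn main() {
    let actions = vec![Action::SendPrepareOk, Action::Other];
    dispatch_vsr(&actions);
    dispatch_partition_journal(&actions);
}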
@@ -899,3 +903,116 @@ async fn dispatch_vsr_actions<B, P, J>(
         }
     }
 }
+
+#[allow(
+    clippy::future_not_send,
+    clippy::too_many_lines,
+    clippy::cast_possible_truncation
+)]
+async fn dispatch_partition_journal_actions<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    partition: &IggyPartition<B>,
+    actions: &[VsrAction],
+) where
+    B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
+    P: Pipeline<Entry = consensus::PipelineEntry>,
+{
+    use std::mem::size_of;
+
+    let bus = consensus.message_bus();
+    let self_id = consensus.replica();
+    let cluster = consensus.cluster();
+    let journal = &partition.log.journal().inner;
+
+    let send = |target: u8, msg: Message<GenericHeader>| async move {
+        if let Err(e) = bus.send_to_replica(target, msg).await {
+            tracing::debug!(replica = self_id, target, "bus send failed: {e}");
+        }
+    };
+
+    for action in actions {
+        match action {
+            VsrAction::SendPrepareOk {
+                view,
+                from_op,
+                to_op,
+                target,
+                namespace,
+            } => {
+                for op in *from_op..=*to_op {
+                    let Some(prepare_header) = journal.header_by_op(op) else {
+                        continue;
+                    };
+                    let msg = Message::<PrepareOkHeader>::new(size_of::<PrepareOkHeader>())
+                        .transmute_header(|_, h: &mut PrepareOkHeader| {
+                            h.command = Command2::PrepareOk;
+                            h.cluster = cluster;
+                            h.replica = self_id;
+                            h.view = *view;
+                            h.op = op;
+                            h.commit = consensus.commit_max();
+                            h.timestamp = prepare_header.timestamp;
+                            h.parent = prepare_header.parent;
+                            h.prepare_checksum = prepare_header.checksum;
+                            h.request = prepare_header.request;
+                            h.operation = prepare_header.operation;
+                            h.namespace = *namespace;
+                            h.size = size_of::<PrepareOkHeader>() as u32;
+                        });
+                    send(*target, msg.into_generic()).await;
+                }
+            }
+            VsrAction::RetransmitPrepares { targets } => {
+                for (header, replicas) in targets {
+                    let Some(prepare) = journal.entry(header).await else {
+                        continue;
+                    };
+                    let prepare = Message::<PrepareHeader>::try_from(
+                        iggy_binary_protocol::consensus::iobuf::Owned::<4096>::copy_from_slice(
+                            prepare.as_slice(),
+                        ),
+                    )
+                    .expect("partition journal entry must contain valid prepare");
+                    for replica in replicas {
+                        send(*replica, prepare.deep_copy().into_generic()).await;
+                    }
+                }
+            }
+            VsrAction::RebuildPipeline { from_op, to_op } => {
+                let mut gap_at = None;
+                let entries: Vec<_> = (*from_op..=*to_op)
+                    .map_while(|op| {
+                        let Some(header) = journal.header_by_op(op) else {
+                            gap_at = Some(op);
+                            return None;
+                        };
+                        let mut entry = consensus::PipelineEntry::new(header);
+                        entry.add_ack(self_id);
+                        Some(entry)
+                    })
+                    .collect();
+                if let Some(missing_op) = gap_at {
+                    let rebuilt_up_to = missing_op.saturating_sub(1);
+                    tracing::warn!(
+                        replica = self_id,
+                        missing_op,
+                        range_start = from_op,
+                        range_end = to_op,
+                        rebuilt = entries.len(),
+                        "RebuildPipeline: journal gap at op {missing_op}, \
+                         truncating sequencer from {to_op} to {rebuilt_up_to} \
+                         ({}/{} ops rebuilt)",
+                        entries.len(),
+                        to_op - from_op + 1,
+                    );
+                    consensus.sequencer().set_sequence(rebuilt_up_to);
+                }
+                let mut pipeline = consensus.pipeline().borrow_mut();
+                for entry in entries {
+                    pipeline.push(entry);
+                }
+            }
+            _ => {}
+        }
+    }
+}
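
In the RebuildPipeline arm above, map_while stops at the first op missing
from the journal, so only a contiguous prefix of the requested range is
rebuilt and the sequencer is wound back to the last rebuilt op. A toy model
of that truncation, with the journal faked as a slice of ops and all names
illustrative:

fn rebuild(from_op: u64, to_op: u64, journal: &[u64]) -> (Vec<u64>, u64) {
    let mut gap_at = None;
    let entries: Vec<u64> = (from_op..=to_op)
        .map_while(|op| {
            if journal.contains(&op) {
                Some(op)
            } else {
                gap_at = Some(op); // remember where the journal ran dry
                None
            }
        })
        .collect();
    // On a gap, the sequencer would be truncated to the op just before it;
    // with no gap it is left alone (modelled here as to_op).
    let sequence = gap_at.map_or(to_op, |missing| missing.saturating_sub(1));
    (entries, sequence)
}

fn main() {
    // Ops 3..=7 requested, but 6 is missing: the rebuild stops at 5.
    let (entries, sequence) = rebuild(3, 7, &[3, 4, 5, 7]);
    assert_eq!(entries, vec![3, 4, 5]);
    assert_eq!(sequence, 5);

    // No gap: the full range is rebuilt.
    let (entries, sequence) = rebuild(3, 5, &[3, 4, 5]);
    assert_eq!(entries, vec![3, 4, 5]);
    assert_eq!(sequence, 5);
}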
