This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
     new 298cd3282  fix(server): prevent consumer offset skip during concurrent produce+consume (#2958)
298cd3282 is described below
commit 298cd3282b617bef8cfb266fd5463d7b186868a3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 23 14:09:07 2026 +0100
    fix(server): prevent consumer offset skip during concurrent produce+consume (#2958)
---
.../common/src/types/message/messages_batch_mut.rs | 2 +-
.../common/src/types/message/messages_batch_set.rs | 18 +-
core/common/src/types/segment.rs | 29 +-
core/integration/tests/server/message_cleanup.rs | 7 +
.../concurrent_produce_consume_scenario.rs | 196 +++++++++++
.../server/scenarios/message_cleanup_scenario.rs | 140 ++++++++
core/integration/tests/server/scenarios/mod.rs | 2 +
.../scenarios/reconnect_after_restart_scenario.rs | 300 ++++++++++++++++-
.../scenarios/restart_offset_skip_scenario.rs | 272 ++++++++++++++++
core/integration/tests/server/specific.rs | 39 ++-
core/server/src/bootstrap.rs | 4 +-
core/server/src/shard/mod.rs | 82 ++++-
core/server/src/shard/system/messages.rs | 2 +-
core/server/src/shard/system/partitions.rs | 74 ++++-
core/server/src/shard/system/segments.rs | 124 +++++--
core/server/src/streaming/partitions/journal.rs | 93 +++++-
.../src/streaming/partitions/local_partition.rs | 4 +-
core/server/src/streaming/partitions/log.rs | 12 +
core/server/src/streaming/partitions/mod.rs | 2 +
core/server/src/streaming/partitions/ops.rs | 297 +++++++++--------
core/server/src/streaming/partitions/ops_tests.rs | 359 +++++++++++++++++++++
21 files changed, 1856 insertions(+), 202 deletions(-)
diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs
index 231ba00a2..320f2f1ea 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -324,7 +324,7 @@ impl IggyMessagesBatchMut {
let first_offset = self.first_offset()?;
if start_offset < first_offset {
- return self.slice_by_index(0, count);
+ return None;
}
let last_offset = self.last_offset()?;
diff --git a/core/common/src/types/message/messages_batch_set.rs b/core/common/src/types/message/messages_batch_set.rs
index 8dd47043e..41183ae83 100644
--- a/core/common/src/types/message/messages_batch_set.rs
+++ b/core/common/src/types/message/messages_batch_set.rs
@@ -237,22 +237,30 @@ impl IggyMessagesBatchSet {
let mut result = Self::with_capacity(self.containers_count());
let mut remaining_count = count;
+ let mut current_offset = start_offset;
for container in self.iter() {
if remaining_count == 0 {
break;
}
- let first_offset = container.first_offset();
- if first_offset.is_none()
-            || first_offset.unwrap() + container.count() as u64 <= start_offset
- {
+ let Some(batch_first) = container.first_offset() else {
+ continue;
+ };
+ if batch_first + container.count() as u64 <= current_offset {
continue;
}
-        if let Some(sliced) = container.slice_by_offset(start_offset, remaining_count)
+ // When current_offset is below this batch's range (cross-batch
+ // reads), start from the batch's first offset instead.
+ let effective_start = current_offset.max(batch_first);
+
+        if let Some(sliced) = container.slice_by_offset(effective_start, remaining_count)
&& sliced.count() > 0
{
+ if let Some(last) = sliced.last_offset() {
+ current_offset = last + 1;
+ }
remaining_count -= sliced.count();
result.add_batch(sliced);
}
diff --git a/core/common/src/types/segment.rs b/core/common/src/types/segment.rs
index 13f650f5e..23abe4500 100644
--- a/core/common/src/types/segment.rs
+++ b/core/common/src/types/segment.rs
@@ -66,7 +66,7 @@ impl Segment {
}
pub fn is_expired(&self, now: IggyTimestamp, expiry: IggyExpiry) -> bool {
- if !self.sealed {
+ if !self.sealed || self.end_timestamp == 0 {
return false;
}
@@ -79,3 +79,30 @@ impl Segment {
}
}
}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use crate::{IggyDuration, IggyTimestamp};
+ use std::time::Duration;
+
+ #[test]
+ fn zero_timestamp_segment_should_not_appear_expired() {
+ // Reproduce Bug 3 from #2924: during bootstrap, segments with empty
+ // indexes get end_timestamp = 0. is_expired() then evaluates
+ // 0 + expiry <= now, which is always true, causing immediate deletion.
+        let mut seg = Segment::new(5000, IggyByteSize::from(128 * 1024 * 1024u64));
+ seg.sealed = true;
+ seg.end_timestamp = 0; // simulates bootstrap with empty indexes
+
+ let now = IggyTimestamp::now();
+        let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(600)));
+
+ // A segment with unknown timestamp (0) must NOT be considered expired.
+        // Currently this FAILS - is_expired returns true because 0 + 600s <= now.
+ assert!(
+ !seg.is_expired(now, expiry),
+ "BUG: segment with end_timestamp=0 appears instantly expired"
+ );
+ }
+}
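The guard added above can be pictured with a small standalone sketch (simplified types and plain u64 microsecond timestamps, not the project's Segment/IggyExpiry API):

    // A segment whose end_timestamp is unknown (0) must never look expired,
    // because 0 + expiry <= now holds for any realistic clock value.
    struct Seg {
        sealed: bool,
        end_timestamp: u64, // 0 = unknown (bootstrap with empty indexes)
    }

    impl Seg {
        fn is_expired(&self, now: u64, expiry_us: u64) -> bool {
            if !self.sealed || self.end_timestamp == 0 {
                return false;
            }
            self.end_timestamp + expiry_us <= now
        }
    }

    fn main() {
        let seg = Seg { sealed: true, end_timestamp: 0 };
        assert!(!seg.is_expired(1_700_000_000_000_000, 600_000_000));
    }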
diff --git a/core/integration/tests/server/message_cleanup.rs b/core/integration/tests/server/message_cleanup.rs
index c61855d83..3af365fd7 100644
--- a/core/integration/tests/server/message_cleanup.rs
+++ b/core/integration/tests/server/message_cleanup.rs
@@ -73,6 +73,12 @@ fn fair_size_cleanup_multipartition() -> CleanupScenarioFn {
}
}
+fn expiry_respects_consumer_offset() -> CleanupScenarioFn {
+ |client, path| {
+        Box::pin(message_cleanup_scenario::run_expiry_respects_consumer_offset(client, path))
+ }
+}
+
async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
let mut harness = TestHarness::builder()
.server(
@@ -116,6 +122,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
combined_retention(),
expiry_multipartition(),
fair_size_cleanup_multipartition(),
+ expiry_respects_consumer_offset(),
])]
#[tokio::test]
#[parallel]
diff --git a/core/integration/tests/server/scenarios/concurrent_produce_consume_scenario.rs b/core/integration/tests/server/scenarios/concurrent_produce_consume_scenario.rs
new file mode 100644
index 000000000..3bffb8749
--- /dev/null
+++ b/core/integration/tests/server/scenarios/concurrent_produce_consume_scenario.rs
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Regression test for issue #2715: consumer offset skip during concurrent produce+consume.
+//!
+//! Root cause ("State C"):
+//! 1. Journal commit() moves data to in-flight buffer (async persist begins)
+//! 2. New send arrives before persist completes - journal is non-empty again
+//! 3. Consumer polls with Next + auto_commit:
+//! Old code only checked in-flight when journal was empty (Case 0).
+//! With journal non-empty (Case 1-3), in-flight was invisible.
+//!    auto_commit stored the journal offset, permanently skipping the in-flight range.
+//!
+//! Trigger: `messages_required_to_save = "1"` forces an inline journal commit on every
+//! batch. With concurrent sends, the next batch arrives while the previous is persisting,
+//! reliably producing State C.
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::iggy_harness;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "issue-2715-stream";
+const TOPIC_NAME: &str = "issue-2715-topic";
+const TOTAL_MESSAGES: u64 = 500;
+const PRODUCER_BATCH_SIZE: u32 = 5;
+const CONSUMER_BATCH_SIZE: u32 = 10;
+const MAX_TEST_DURATION: Duration = Duration::from_secs(60);
+
+/// Regression test for issue #2715: consumer must not skip offsets during
+/// concurrent produce+consume when the journal commits while in-flight data exists.
+///
+/// `messages_required_to_save = "1"` forces every batch to trigger an inline
+/// journal commit. The next send arrives before async persist completes, creating
+/// State C (journal non-empty + in-flight non-empty simultaneously).
+#[iggy_harness(server(
+ partition.messages_required_to_save = "1",
+ partition.enforce_fsync = false,
+ message_saver.enabled = true,
+ message_saver.interval = "100ms"
+))]
+async fn concurrent_produce_consume_no_offset_skip(harness: &TestHarness) {
+ let stream_id = Identifier::named(STREAM_NAME).unwrap();
+ let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+
+ let setup = harness.tcp_root_client().await.unwrap();
+ setup.create_stream(STREAM_NAME).await.unwrap();
+ setup
+ .create_topic(
+ &stream_id,
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+ drop(setup);
+
+ let producer_client = harness.tcp_root_client().await.unwrap();
+ let consumer_client = harness.tcp_root_client().await.unwrap();
+
+ let producer_done = Arc::new(AtomicBool::new(false));
+ let producer_done_clone = producer_done.clone();
+
+ // Consumer: polls with Next + auto_commit concurrently with the producer.
+ // Checks that received offsets are strictly contiguous (no gaps).
+ let consumer_task = tokio::spawn(async move {
+ let stream = Identifier::named(STREAM_NAME).unwrap();
+ let topic = Identifier::named(TOPIC_NAME).unwrap();
+ let consumer = Consumer::default();
+ let mut last_offset: Option<u64> = None;
+ let mut total_received = 0u64;
+ let mut consecutive_empty = 0u32;
+ let deadline = Instant::now() + MAX_TEST_DURATION;
+
+ loop {
+ if Instant::now() >= deadline {
+ panic!(
+ "Consumer timed out after {MAX_TEST_DURATION:?}. \
+ Received {total_received}/{TOTAL_MESSAGES}, \
+ last_offset: {last_offset:?}"
+ );
+ }
+
+ let result = consumer_client
+ .poll_messages(
+ &stream,
+ &topic,
+ Some(0),
+ &consumer,
+ &PollingStrategy::next(),
+ CONSUMER_BATCH_SIZE,
+                    true, // auto_commit - this is what makes the skip permanent
+ )
+ .await;
+
+ match result {
+ Ok(polled) if polled.messages.is_empty() => {
+ if producer_done_clone.load(Ordering::Relaxed) {
+ consecutive_empty += 1;
+ if consecutive_empty >= 20 {
+ break;
+ }
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ } else {
+ tokio::time::sleep(Duration::from_millis(5)).await;
+ }
+ }
+ Ok(polled) => {
+ consecutive_empty = 0;
+ for msg in &polled.messages {
+ let offset = msg.header.offset;
+ if let Some(last) = last_offset {
+ assert_eq!(
+ offset,
+ last + 1,
+                                "Offset gap! Expected {}, got {} (skipped {} messages). \
+                                 Issue #2715: in-flight buffer invisible when journal non-empty.",
+ last + 1,
+ offset,
+ offset.saturating_sub(last + 1),
+ );
+ }
+ last_offset = Some(offset);
+ total_received += 1;
+ }
+ }
+ Err(e) => {
+                    eprintln!("Consumer poll error (transient under load): {e:?}");
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+ }
+ }
+
+ total_received
+ });
+
+    // Producer: sends TOTAL_MESSAGES in a tight loop to maximize State C window.
+ let mut sent = 0u64;
+ while sent < TOTAL_MESSAGES {
+        let batch_count = PRODUCER_BATCH_SIZE.min((TOTAL_MESSAGES - sent) as u32);
+ let mut messages: Vec<IggyMessage> = (0..batch_count)
+ .map(|i| {
+ IggyMessage::builder()
+ .payload(Bytes::from(format!("msg-{}", sent + i as u64)))
+ .build()
+ .unwrap()
+ })
+ .collect();
+
+ producer_client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+            .unwrap_or_else(|e| panic!("Producer send failed at sent={sent}: {e}"));
+
+ sent += batch_count as u64;
+ }
+
+ producer_done.store(true, Ordering::Relaxed);
+
+ let total_received = consumer_task.await.unwrap();
+ assert_eq!(
+ total_received,
+ TOTAL_MESSAGES,
+ "Consumer received {total_received}/{TOTAL_MESSAGES} - \
+ missing {} messages (issue #2715 offset skip regression).",
+ TOTAL_MESSAGES - total_received,
+ );
+
+ producer_client.delete_stream(&stream_id).await.unwrap();
+}
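The "State C" described in the module comment can be modeled with a toy sketch (hypothetical helper where each tier is just a vector of offsets, not the server's actual read path): a correct read walks disk, in-flight, and journal in ascending offset order instead of consulting only one tier.

    fn read(disk: &[u64], in_flight: &[u64], journal: &[u64], start: u64, count: usize) -> Vec<u64> {
        // Tiers hold contiguous, ascending offset ranges; chain them in order.
        disk.iter()
            .chain(in_flight.iter())
            .chain(journal.iter())
            .copied()
            .filter(|&o| o >= start)
            .take(count)
            .collect()
    }

    fn main() {
        // State C: 0..=4 persisted, 5..=9 still in-flight, 10..=14 in the journal.
        let (disk, in_flight, journal): (Vec<u64>, Vec<u64>, Vec<u64>) =
            ((0..5).collect(), (5..10).collect(), (10..15).collect());
        // A poll from offset 5 must return the in-flight range, not jump to 10.
        assert_eq!(read(&disk, &in_flight, &journal, 5, 5), vec![5, 6, 7, 8, 9]);
    }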
diff --git a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
index 066953153..2292f353a 100644
--- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
@@ -563,6 +563,146 @@ pub async fn run_fair_size_based_cleanup_multipartition(client: &IggyClient, dat
.unwrap();
}
+/// Reproduces Bug 2 from #2924: the time-based cleaner deletes expired segments
+/// without checking whether consumers have read them. The size-based
+/// `delete_oldest_segments` checks `min_committed_offset` but the time-based
+/// `delete_expired_segments_for_partition` does not.
+///
+/// Scenario:
+/// 1. Send 300 messages (3 segments at 100KB each)
+/// 2. Consumer reads only 50 messages (stored offset ~49, within segment 0)
+/// 3. Wait for all segments to expire (2s expiry)
+/// 4. Verify consumer can still poll Next() and get contiguous offsets
+///
+/// On unfixed code: the cleaner deletes segments 0+1 (expired, consumer offset
+/// not checked), consumer's Next() jumps to segment 2, skipping ~250 messages.
+pub async fn run_expiry_respects_consumer_offset(client: &IggyClient, data_path: &Path) {
+ const TEST_STREAM: &str = "test_cleaner_barrier_stream";
+ const TEST_TOPIC: &str = "test_cleaner_barrier_topic";
+
+ let stream = client.create_stream(TEST_STREAM).await.unwrap();
+ let stream_id = stream.id;
+
+ let expiry = Duration::from_secs(2);
+ let topic = client
+ .create_topic(
+ &Identifier::named(TEST_STREAM).unwrap(),
+ TEST_TOPIC,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ let partition_path = data_path
+ .join(format!(
+ "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+ ))
+ .display()
+ .to_string();
+
+ // Send 300 messages (1KB each) -> 3 sealed segments + active
+ let payload = make_payload('B');
+ let total_messages = 300u32;
+ for i in 0..total_messages {
+ let message = IggyMessage::builder()
+ .id(i as u128)
+ .payload(payload.clone())
+ .build()
+ .unwrap();
+ client
+ .send_messages(
+ &Identifier::named(TEST_STREAM).unwrap(),
+ &Identifier::named(TEST_TOPIC).unwrap(),
+ &Partitioning::partition_id(PARTITION_ID),
+ &mut [message],
+ )
+ .await
+ .unwrap();
+ }
+
+ let initial_segments = get_segment_paths_for_partition(&partition_path);
+ assert!(
+ initial_segments.len() >= 3,
+ "Need at least 3 segments, got {}",
+ initial_segments.len()
+ );
+
+ // Consumer reads only 50 messages with auto_commit, storing offset ~49.
+ // This means the consumer has NOT read segments 1, 2, etc.
+ let consumer = Consumer::new(Identifier::numeric(42).unwrap());
+ let mut consumed_offsets = Vec::new();
+ let mut remaining = 50u32;
+ while remaining > 0 {
+ let batch_size = remaining.min(10);
+ let polled = client
+ .poll_messages(
+ &Identifier::named(TEST_STREAM).unwrap(),
+ &Identifier::named(TEST_TOPIC).unwrap(),
+ Some(PARTITION_ID),
+ &consumer,
+ &PollingStrategy::next(),
+ batch_size,
+ true, // auto_commit
+ )
+ .await
+ .unwrap();
+ for msg in &polled.messages {
+ consumed_offsets.push(msg.header.offset);
+ }
+ remaining -= polled.messages.len() as u32;
+ }
+ let last_committed = *consumed_offsets.last().unwrap();
+ assert_eq!(
+ last_committed, 49,
+ "Consumer should have read through offset 49"
+ );
+
+ // Wait for expiry + cleaner buffer
+ tokio::time::sleep(expiry + CLEANER_BUFFER + CLEANER_BUFFER).await;
+
+ // Now poll Next() - consumer should continue from offset 50 without gaps.
+ // BUG: on unfixed code, the cleaner deleted the segment containing offsets
+    // 50-99 (expired, no consumer barrier check), so Next() jumps to offset 100+.
+ let polled = client
+ .poll_messages(
+ &Identifier::named(TEST_STREAM).unwrap(),
+ &Identifier::named(TEST_TOPIC).unwrap(),
+ Some(PARTITION_ID),
+ &consumer,
+ &PollingStrategy::next(),
+ 10,
+ false,
+ )
+ .await
+ .unwrap();
+
+ assert!(
+ !polled.messages.is_empty(),
+ "Consumer should still be able to poll messages after expiry"
+ );
+
+ let first_offset = polled.messages[0].header.offset;
+ assert_eq!(
+ first_offset,
+ last_committed + 1,
+ "BUG #2924: cleaner deleted unconsumed segments! \
+ Expected next offset {}, got {} (skipped {} messages)",
+ last_committed + 1,
+ first_offset,
+ first_offset - last_committed - 1,
+ );
+
+ client
+ .delete_stream(&Identifier::named(TEST_STREAM).unwrap())
+ .await
+ .unwrap();
+}
+
fn get_segment_paths_for_partition(partition_path: &str) -> Vec<DirEntry> {
read_dir(partition_path)
.map(|read_dir| {
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index fb510234f..234572887 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -18,6 +18,7 @@
pub mod authentication_scenario;
pub mod bench_scenario;
+pub mod concurrent_produce_consume_scenario;
pub mod concurrent_scenario;
pub mod consumer_group_auto_commit_reconnection_scenario;
pub mod consumer_group_join_scenario;
@@ -39,6 +40,7 @@ pub mod permissions_scenario;
pub mod purge_delete_scenario;
pub mod read_during_persistence_scenario;
pub mod reconnect_after_restart_scenario;
+pub mod restart_offset_skip_scenario;
pub mod segment_rotation_race_scenario;
pub mod single_message_per_batch_scenario;
pub mod snapshot_scenario;
diff --git a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
index 2d2a967f7..b770c23f6 100644
--- a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
+++ b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
@@ -143,11 +143,19 @@ pub async fn run_consumer(harness: &mut TestHarness) {
.await
.expect("Failed to initialize consumer");
+    let pre_payloads = consume_messages_validated(&mut consumer, 3, Duration::from_secs(10)).await;
assert_eq!(
- consume_messages(&mut consumer, 3, Duration::from_secs(10)).await,
+ pre_payloads.len(),
3,
"Should consume all pre-restart messages"
);
+ for (i, payload) in pre_payloads.iter().enumerate() {
+ assert_eq!(
+ payload,
+ &format!("pre-restart-{i}"),
+ "Pre-restart message {i} has wrong payload"
+ );
+ }
harness.server_mut().stop().expect("Failed to stop server");
sleep(Duration::from_secs(2)).await;
@@ -162,11 +170,19 @@ pub async fn run_consumer(harness: &mut TestHarness) {
.expect("Failed to create post-restart client");
send_messages(&post_client, "post-restart", 3).await;
+    let post_payloads = consume_messages_validated(&mut consumer, 3, Duration::from_secs(30)).await;
assert_eq!(
- consume_messages(&mut consumer, 3, Duration::from_secs(30)).await,
+ post_payloads.len(),
3,
"Should consume all post-restart messages after reconnect"
);
+ for (i, payload) in post_payloads.iter().enumerate() {
+ assert_eq!(
+ payload,
+ &format!("post-restart-{i}"),
+ "Post-restart message {i} has wrong payload"
+ );
+ }
}
fn create_client(harness: &TestHarness) -> IggyClient {
@@ -209,18 +225,286 @@ async fn send_messages(client: &IggyClient, prefix: &str, count: u32) {
}
}
-async fn consume_messages(consumer: &mut IggyConsumer, expected: u32, max_wait: Duration) -> u32 {
- let mut consumed = 0u32;
+/// Consumes up to `expected` messages, returning their payloads in order.
+async fn consume_messages_validated(
+ consumer: &mut IggyConsumer,
+ expected: u32,
+ max_wait: Duration,
+) -> Vec<String> {
+ let mut payloads = Vec::with_capacity(expected as usize);
let deadline = tokio::time::Instant::now() + max_wait;
- while consumed < expected && tokio::time::Instant::now() < deadline {
+    while (payloads.len() as u32) < expected && tokio::time::Instant::now() < deadline {
let poll_result: Option<Result<ReceivedMessage, IggyError>> =
tokio::time::timeout(Duration::from_millis(500), consumer.next())
.await
.ok()
.flatten();
- if let Some(Ok(_)) = poll_result {
- consumed += 1;
+ if let Some(Ok(msg)) = poll_result {
+            let payload = String::from_utf8_lossy(&msg.message.payload).to_string();
+ payloads.push(payload);
}
}
- consumed
+ payloads
+}
+
+/// Regression test: a partition with exactly one message at offset 0 must not
+/// reassign offset 0 to the next message after server restart.
+///
+/// Root cause: `should_increment_offset = current_offset > 0` is false when
+/// current_offset == 0, but a segment with 1 message at offset 0 has
+/// end_offset = 0. The next append after restart reuses offset 0.
+pub async fn run_single_message_offset_zero_restart(harness: &mut TestHarness) {
+ const STREAM: &str = "offset-zero-restart-stream";
+ const TOPIC: &str = "offset-zero-restart-topic";
+ const CONSUMER_ID: u32 = 77;
+
+ let client = harness
+ .root_client()
+ .await
+ .expect("Failed to create client");
+
+ client.create_stream(STREAM).await.unwrap();
+ client
+ .create_topic(
+ &Identifier::named(STREAM).unwrap(),
+ TOPIC,
+ 1,
+ Default::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ // Send exactly 1 message (gets offset 0)
+ let mut msg = [IggyMessage::from_str("single-msg-0").unwrap()];
+ client
+ .send_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ &Partitioning::partition_id(0),
+ &mut msg,
+ )
+ .await
+ .unwrap();
+
+ // Consumer polls to commit offset 0
+ let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ Some(0),
+ &consumer,
+ &PollingStrategy::next(),
+ 10,
+ true,
+ )
+ .await
+ .unwrap();
+ assert_eq!(polled.messages.len(), 1, "Should get the single message");
+ assert_eq!(polled.messages[0].header.offset, 0);
+
+ // Wait for data to flush
+ sleep(Duration::from_secs(1)).await;
+ drop(client);
+
+ // Restart
+ harness.server_mut().stop().expect("Failed to stop server");
+ sleep(Duration::from_secs(2)).await;
+ harness
+ .server_mut()
+ .start()
+ .expect("Failed to start server");
+
+ let client = harness
+ .root_client()
+ .await
+ .expect("Failed to create post-restart client");
+
+ // Send a second message - it MUST get offset 1, not 0
+ let mut msg = [IggyMessage::from_str("single-msg-1").unwrap()];
+ client
+ .send_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ &Partitioning::partition_id(0),
+ &mut msg,
+ )
+ .await
+ .unwrap();
+
+ // Consumer polls with Next - should get offset 1
+ let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ Some(0),
+ &consumer,
+ &PollingStrategy::next(),
+ 10,
+ true,
+ )
+ .await
+ .unwrap();
+
+ assert!(
+ !polled.messages.is_empty(),
+ "BUG: consumer got empty result after restart - offset stuck"
+ );
+ assert_eq!(
+ polled.messages[0].header.offset, 1,
+ "BUG: second message got offset {} instead of 1. \
+ should_increment_offset was incorrectly false after restart \
+ with single message at offset 0",
+ polled.messages[0].header.offset,
+ );
+
+ let payload = String::from_utf8_lossy(&polled.messages[0].payload);
+ assert_eq!(payload, "single-msg-1");
+}
+
+/// Regression test: consumer offset persisted ahead of partition data (crash
+/// simulation). After restart the consumer must not be permanently stuck.
+///
+/// Simulates: OOM/kill-9 where consumer auto_commit wrote offset to disk but
+/// journal data was not flushed. On restart, consumer offset file says 999 but
+/// partition only has 10 messages (offsets 0-9).
+pub async fn run_consumer_offset_ahead_after_crash(harness: &mut TestHarness) {
+ const STREAM: &str = "offset-ahead-crash-stream";
+ const TOPIC: &str = "offset-ahead-crash-topic";
+ const CONSUMER_ID: u32 = 88;
+
+ let client = harness
+ .root_client()
+ .await
+ .expect("Failed to create client");
+
+ let stream = client.create_stream(STREAM).await.unwrap();
+ let stream_id = stream.id;
+
+ let topic = client
+ .create_topic(
+ &Identifier::named(STREAM).unwrap(),
+ TOPIC,
+ 1,
+ Default::default(),
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+ let topic_id = topic.id;
+
+ // Send 10 messages (offsets 0-9)
+ for i in 0..10u32 {
+        let mut msg = [IggyMessage::from_str(&format!("crash-msg-{i}")).unwrap()];
+ client
+ .send_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ &Partitioning::partition_id(0),
+ &mut msg,
+ )
+ .await
+ .unwrap();
+ }
+
+ // Consumer reads all 10 with auto_commit (stored offset = 9)
+ let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ Some(0),
+ &consumer,
+ &PollingStrategy::next(),
+ 100,
+ true,
+ )
+ .await
+ .unwrap();
+ assert_eq!(polled.messages.len(), 10);
+
+ // Wait for flush
+ sleep(Duration::from_secs(1)).await;
+ drop(client);
+
+ // Stop server
+ harness.server_mut().stop().expect("Failed to stop server");
+ sleep(Duration::from_secs(1)).await;
+
+ // Tamper with consumer offset file to simulate crash scenario:
+ // consumer offset persisted ahead of actual data
+ let data_path = harness.server().data_path();
+ let offset_file = data_path.join(format!(
+        "streams/{stream_id}/topics/{topic_id}/partitions/0/offsets/consumers/{CONSUMER_ID}"
+ ));
+ assert!(
+ offset_file.exists(),
+ "Consumer offset file should exist at {}",
+ offset_file.display()
+ );
+    std::fs::write(&offset_file, 999_u64.to_le_bytes()).expect("Failed to write offset file");
+
+ // Verify the tampered value
+ let bytes = std::fs::read(&offset_file).unwrap();
+ let stored = u64::from_le_bytes(bytes.try_into().unwrap());
+ assert_eq!(stored, 999, "Offset file should contain 999");
+
+ // Restart server
+ harness
+ .server_mut()
+ .start()
+ .expect("Failed to start server");
+
+ let client = harness
+ .root_client()
+ .await
+ .expect("Failed to create post-restart client");
+
+ // Send 5 more messages (should get offsets 10-14)
+ for i in 0..5u32 {
+        let mut msg = [IggyMessage::from_str(&format!("post-crash-{i}")).unwrap()];
+ client
+ .send_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ &Partitioning::partition_id(0),
+ &mut msg,
+ )
+ .await
+ .unwrap();
+ }
+
+ // Consumer polls with Next - must NOT be stuck at offset 1000 (999+1)
+ let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+ let polled = client
+ .poll_messages(
+ &Identifier::named(STREAM).unwrap(),
+ &Identifier::named(TOPIC).unwrap(),
+ Some(0),
+ &consumer,
+ &PollingStrategy::next(),
+ 100,
+ true,
+ )
+ .await
+ .unwrap();
+
+ assert!(
+ !polled.messages.is_empty(),
+        "BUG: consumer is permanently stuck - offset 999 is ahead of partition data. \
+         After crash recovery, consumer offsets must be clamped to partition offset."
+ );
+
+ // Verify we got the new messages
+ let first_offset = polled.messages[0].header.offset;
+ assert!(
+ first_offset <= 14,
+        "BUG: consumer skipped to offset {first_offset}, expected messages in range 10-14"
+ );
}
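The single-message-at-offset-zero case above boils down to one predicate; a standalone sketch (hypothetical helper, not the server's bootstrap code):

    // The next offset to assign depends on whether any data exists,
    // not on whether current_offset > 0.
    fn next_offset(current_offset: u64, has_data: bool) -> u64 {
        if has_data { current_offset + 1 } else { current_offset }
    }

    fn main() {
        // Empty partition: the first message gets offset 0.
        assert_eq!(next_offset(0, false), 0);
        // One message already at offset 0: the next one must get offset 1,
        // even though current_offset is still 0.
        assert_eq!(next_offset(0, true), 1);
    }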
diff --git a/core/integration/tests/server/scenarios/restart_offset_skip_scenario.rs b/core/integration/tests/server/scenarios/restart_offset_skip_scenario.rs
new file mode 100644
index 000000000..b451a68bc
--- /dev/null
+++ b/core/integration/tests/server/scenarios/restart_offset_skip_scenario.rs
@@ -0,0 +1,272 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Regression test: consumer offset skip after server restart during concurrent
+//! produce+consume.
+//!
+//! Root cause: after restart, `MemoryMessageJournal` defaults to `base_offset=0`
+//! instead of `partition.current_offset + 1`. This causes the three-tier lookup
+//! (`disk -> in-flight -> journal`) to skip disk reads, and `slice_by_offset`
+//! silently returns messages from wrong offsets. Combined with `auto_commit`,
+//! the skip is permanent.
+//!
+//! Reproduction (matches colleague's scenario):
+//! 1. Start server, create stream/topic
+//! 2. Send 100 messages
+//! 3. Restart server
+//! 4. Send more messages concurrently (every 10ms)
+//! 5. Consumer group polls with batch_size=10, Next strategy, auto_commit
+//! 6. Assert: no offset gaps (previously skipped 3-6 messages)
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::harness::{TestBinary, TestHarness};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "restart-skip-stream";
+const TOPIC_NAME: &str = "restart-skip-topic";
+const CONSUMER_GROUP_NAME: &str = "restart-skip-cg";
+// Enough pre-restart messages so the consumer is still reading disk data when
+// journal entries start appearing. With batch_size=10, the consumer needs many
+// poll cycles to drain these, creating the window for the bug.
+const PRE_RESTART_MESSAGES: u32 = 1000;
+const POST_RESTART_MESSAGES: u64 = 200;
+const PRODUCER_BATCH_SIZE: u32 = 5;
+const CONSUMER_BATCH_SIZE: u32 = 10;
+const MAX_TEST_DURATION: Duration = Duration::from_secs(120);
+
+pub async fn run(harness: &mut TestHarness) {
+ let stream_id = Identifier::named(STREAM_NAME).unwrap();
+ let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+
+ // Step 1-2: Create stream/topic, send pre-restart messages
+ let setup_client = harness.tcp_root_client().await.unwrap();
+ setup_client.create_stream(STREAM_NAME).await.unwrap();
+ setup_client
+ .create_topic(
+ &stream_id,
+ TOPIC_NAME,
+ 1,
+ CompressionAlgorithm::None,
+ None,
+ IggyExpiry::NeverExpire,
+ MaxTopicSize::ServerDefault,
+ )
+ .await
+ .unwrap();
+
+ setup_client
+ .create_consumer_group(&stream_id, &topic_id, CONSUMER_GROUP_NAME)
+ .await
+ .unwrap();
+
+ let mut pre_messages: Vec<IggyMessage> = (0..PRE_RESTART_MESSAGES)
+ .map(|i| {
+ IggyMessage::builder()
+ .payload(Bytes::from(format!("pre-{i}")))
+ .build()
+ .unwrap()
+ })
+ .collect();
+
+ setup_client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut pre_messages,
+ )
+ .await
+ .unwrap();
+
+ // Explicitly flush to disk, then wait for message_saver to persist
+ setup_client
+ .flush_unsaved_buffer(
+ &stream_id, &topic_id, 0, true, // fsync
+ )
+ .await
+ .unwrap();
+ tokio::time::sleep(Duration::from_secs(2)).await;
+ drop(setup_client);
+
+ // Step 3: Restart server
+ harness.server_mut().stop().expect("Failed to stop server");
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ harness
+ .server_mut()
+ .start()
+ .expect("Failed to start server");
+ tokio::time::sleep(Duration::from_secs(1)).await;
+
+ // Step 4-5: Concurrent produce + consume after restart.
+ // Key: producer sends a burst FIRST so the journal has data (with wrong
+ // base_offset=0) BEFORE the consumer starts reading disk offsets. This is
+ // the window that triggers the bug.
+ let producer_client = harness.tcp_root_client().await.unwrap();
+
+ // Send an initial burst so the journal is populated before consumer starts
+ for i in 0..20u32 {
+ let mut messages = vec![
+ IggyMessage::builder()
+ .payload(Bytes::from(format!("burst-{i}")))
+ .build()
+ .unwrap(),
+ ];
+ producer_client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+ .unwrap();
+ }
+
+ let consumer_client = harness.tcp_root_client().await.unwrap();
+
+ // Consumer joins the consumer group
+ consumer_client
+ .join_consumer_group(
+ &stream_id,
+ &topic_id,
+ &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+ )
+ .await
+ .unwrap();
+
+ let producer_done = Arc::new(AtomicBool::new(false));
+ let producer_done_clone = producer_done.clone();
+
+ // Consumer task: polls with Next + auto_commit, checks contiguous offsets
+ let consumer_task = tokio::spawn(async move {
+ let stream = Identifier::named(STREAM_NAME).unwrap();
+ let topic = Identifier::named(TOPIC_NAME).unwrap();
+        let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+ let mut last_offset: Option<u64> = None;
+ let mut total_received = 0u64;
+ let mut consecutive_empty = 0u32;
+ let deadline = Instant::now() + MAX_TEST_DURATION;
+
+ loop {
+ if Instant::now() >= deadline {
+ panic!(
+ "Consumer timed out after {MAX_TEST_DURATION:?}. \
+ Received {total_received}, last_offset: {last_offset:?}"
+ );
+ }
+
+ let result = consumer_client
+ .poll_messages(
+ &stream,
+ &topic,
+ Some(0),
+ &consumer,
+ &PollingStrategy::next(),
+ CONSUMER_BATCH_SIZE,
+ true, // auto_commit
+ )
+ .await;
+
+ match result {
+ Ok(polled) if polled.messages.is_empty() => {
+ if producer_done_clone.load(Ordering::Relaxed) {
+ consecutive_empty += 1;
+ if consecutive_empty >= 20 {
+ break;
+ }
+ tokio::time::sleep(Duration::from_millis(50)).await;
+ } else {
+ tokio::time::sleep(Duration::from_millis(5)).await;
+ }
+ }
+ Ok(polled) => {
+ consecutive_empty = 0;
+ for msg in &polled.messages {
+ let offset = msg.header.offset;
+ if let Some(last) = last_offset {
+ assert_eq!(
+ offset,
+ last + 1,
+                                "OFFSET SKIP after restart! Expected {}, got {} \
+                                 (skipped {} messages). Journal base_offset was \
+ not initialized after bootstrap.",
+ last + 1,
+ offset,
+ offset.saturating_sub(last + 1),
+ );
+ }
+ last_offset = Some(offset);
+ total_received += 1;
+ }
+ }
+ Err(e) => {
+ eprintln!("Consumer poll error: {e:?}");
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+ }
+ }
+
+ (total_received, last_offset)
+ });
+
+    // Producer: sends messages every 10ms after restart (matches colleague's scenario)
+ let mut sent = 0u64;
+ while sent < POST_RESTART_MESSAGES {
+        let batch_count = PRODUCER_BATCH_SIZE.min((POST_RESTART_MESSAGES - sent) as u32);
+ let mut messages: Vec<IggyMessage> = (0..batch_count)
+ .map(|i| {
+ IggyMessage::builder()
+ .payload(Bytes::from(format!("post-{}", sent + i as u64)))
+ .build()
+ .unwrap()
+ })
+ .collect();
+
+ producer_client
+ .send_messages(
+ &stream_id,
+ &topic_id,
+ &Partitioning::partition_id(0),
+ &mut messages,
+ )
+ .await
+            .unwrap_or_else(|e| panic!("Producer send failed at sent={sent}: {e}"));
+
+ sent += batch_count as u64;
+ tokio::time::sleep(Duration::from_millis(10)).await;
+ }
+
+ producer_done.store(true, Ordering::Relaxed);
+
+ let (total_received, last_offset) = consumer_task.await.unwrap();
+
+ let initial_burst = 20u64;
+    let expected_total = PRE_RESTART_MESSAGES as u64 + initial_burst + POST_RESTART_MESSAGES;
+ assert_eq!(
+ total_received,
+ expected_total,
+ "Consumer received {total_received}/{expected_total} messages. \
+ Last offset: {last_offset:?}. Missing {} messages.",
+ expected_total - total_received,
+ );
+
+ producer_client.delete_stream(&stream_id).await.unwrap();
+}
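The base-offset rule this scenario exercises reduces to a one-liner; a minimal sketch (hypothetical helper, with numbers matching the test's 1000 pre-restart messages):

    // After bootstrap, the journal must start at the next free offset on disk,
    // not at 0, or post-restart reads bypass the disk tier.
    fn journal_base_after_bootstrap(disk_last_offset: Option<u64>) -> u64 {
        disk_last_offset.map_or(0, |o| o + 1)
    }

    fn main() {
        // Offsets 0..=999 persisted before the restart.
        assert_eq!(journal_base_after_bootstrap(Some(999)), 1000);
        // Fresh partition with no data at all.
        assert_eq!(journal_base_after_bootstrap(None), 0);
    }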
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 2a105939d..49c224150 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -18,8 +18,9 @@
*/
use crate::server::scenarios::{
-    message_size_scenario, reconnect_after_restart_scenario, segment_rotation_race_scenario,
-    single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    message_size_scenario, reconnect_after_restart_scenario, restart_offset_skip_scenario,
+    segment_rotation_race_scenario, single_message_per_batch_scenario, tcp_tls_scenario,
+ websocket_tls_scenario,
};
use integration::iggy_harness;
@@ -86,6 +87,40 @@ async fn consumer_reconnect_after_server_restart(harness: &mut TestHarness) {
reconnect_after_restart_scenario::run_consumer(harness).await;
}
+#[iggy_harness(server(
+ partition.messages_required_to_save = "1",
+ partition.enforce_fsync = true
+))]
+async fn single_message_restart_offset_zero(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_single_message_offset_zero_restart(harness).await;
+}
+
+#[iggy_harness(server(
+ partition.messages_required_to_save = "1",
+ partition.enforce_fsync = true
+))]
+async fn consumer_offset_ahead_after_crash(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_consumer_offset_ahead_after_crash(harness).await;
+}
+
+/// Regression test: consumer offset skip after server restart during concurrent
+/// produce+consume. Reproduces the exact scenario from issue #2924/#2715:
+/// send messages, restart server, produce+consume concurrently, verify no offset
+/// gaps.
+///
+/// Config: high messages_required_to_save so post-restart messages accumulate in
+/// the journal (exposing the base_offset=0 bug). message_saver flushes pre-restart
+/// data before the restart.
+#[iggy_harness(server(
+ partition.messages_required_to_save = "10000",
+ partition.enforce_fsync = false,
+ message_saver.enabled = true,
+ message_saver.interval = "1s"
+))]
+async fn restart_offset_skip(harness: &mut TestHarness) {
+ restart_offset_skip_scenario::run(harness).await;
+}
+
/// This test configures the server to trigger frequent segment rotations and runs
/// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket)
/// to maximize the chance of hitting the race condition between persist_messages_to_disk
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 02bd742f2..a874fd068 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -237,7 +237,7 @@ pub async fn load_segments(
) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
let mut log_files = collect_log_files(&partition_path).await?;
log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
- let mut log = SegmentedLog::<MemoryMessageJournal>::default();
+ let mut log = SegmentedLog::new(MemoryMessageJournal::empty());
for entry in log_files {
let log_file_name = entry
.path
@@ -324,7 +324,7 @@ pub async fn load_segments(
};
let end_offset = if loaded_indexes.count() == 0 {
- 0
+ start_offset
} else {
        let last_index_offset = loaded_indexes.last().unwrap().offset() as u64;
start_offset + last_index_offset
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index e76a8cb5c..958a6fd8c 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,7 +37,7 @@ use builder::IggyShardBuilder;
use dashmap::DashMap;
use iggy_common::SemanticVersion;
use iggy_common::sharding::{IggyNamespace, PartitionLocation};
-use iggy_common::{EncryptorKind, IggyError};
+use iggy_common::{EncryptorKind, IggyByteSize, IggyError};
use std::{
cell::{Cell, RefCell},
net::SocketAddr,
@@ -48,7 +48,7 @@ use std::{
},
time::{Duration, Instant},
};
-use tracing::{debug, error, info, instrument};
+use tracing::{debug, error, info, instrument, warn};
use transmission::connector::{Receiver, ShardConnector, StopReceiver};
pub mod builder;
@@ -302,13 +302,81 @@ impl IggyShard {
stats.increment_segments_count(1);
}
-                    let current_offset = loaded_log.active_segment().end_offset;
+                    // Use the max end_offset across segments that have data,
+                    // not just the active segment. Handles the edge case where
+                    // the active segment is empty (rotated right before shutdown).
+ let current_offset = loaded_log
+ .segments()
+ .iter()
+ .filter(|s| s.size > IggyByteSize::default())
+ .map(|s| s.end_offset)
+ .max()
+ .unwrap_or(0);
stats.set_current_offset(current_offset);
-                    // Only increment offset if we have messages (current_offset > 0).
-                    // When current_offset is 0 and we have no messages, first message
-                    // should get offset 0.
-                    let should_increment_offset = current_offset > 0;
+                    // Check if ANY segment has data. Cannot use current_offset > 0
+                    // because a single message at offset 0 yields current_offset = 0
+ // yet must still increment on the next append.
+ let should_increment_offset = loaded_log
+ .segments()
+ .iter()
+ .any(|s| s.size > IggyByteSize::default());
+
+                    // After a crash (OOM, SIGKILL), auto_commit may have persisted
+                    // a consumer offset beyond what was flushed to disk. Clamp to
+                    // the partition's actual offset to prevent permanent empty polls.
+ {
+ let guard = consumer_offsets.pin();
+ for entry in guard.iter() {
+                            let stored = entry.1.offset.load(Ordering::Relaxed);
+ if stored > current_offset {
+ warn!(
+                                    "Consumer {} offset {} ahead of partition offset {} \
+                                     for stream {}, topic {}, partition {} - clamping \
+ (crash recovery)",
+ entry.0,
+ stored,
+ current_offset,
+ stream_id,
+ topic_id,
+ partition_id
+ );
+                            entry.1.offset.store(current_offset, Ordering::Relaxed);
+ }
+ }
+ }
+ {
+ let guard = consumer_group_offsets.pin();
+ for entry in guard.iter() {
+                            let stored = entry.1.offset.load(Ordering::Relaxed);
+ if stored > current_offset {
+ warn!(
+                                    "Consumer group {:?} offset {} ahead of partition \
+                                     offset {} for stream {}, topic {}, partition {} - \
+ clamping (crash recovery)",
+ entry.0,
+ stored,
+ current_offset,
+ stream_id,
+ topic_id,
+ partition_id
+ );
+                            entry.1.offset.store(current_offset, Ordering::Relaxed);
+ }
+ }
+ }
+
+ // Initialize journal base_offset so the three-tier
+ // routing in ops.rs computes correct in_memory_floor.
+                    // Without this, journal defaults to base_offset=0 which
+ // causes disk reads to be skipped after restart.
+ if should_increment_offset {
+                        use crate::streaming::partitions::journal::{Inner, Journal};
+ loaded_log.journal_mut().init(Inner {
+ base_offset: current_offset + 1,
+ ..Default::default()
+ });
+ }
let revision_id = init_info.revision_id;
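The crash-recovery clamp applied in this hunk is, taken on its own, just a min(); a simplified standalone sketch (not the shard's actual types):

    // A consumer offset persisted ahead of the data that survived the crash is
    // pulled back to the partition's real offset so polls can make progress.
    fn clamp_consumer_offset(stored: u64, partition_current_offset: u64) -> u64 {
        stored.min(partition_current_offset)
    }

    fn main() {
        // auto_commit wrote 999, but only offsets 0..=9 were flushed.
        assert_eq!(clamp_consumer_offset(999, 9), 9);
        // Offsets within the partition are left untouched.
        assert_eq!(clamp_consumer_offset(5, 9), 5);
    }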
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index d8ce2d891..a59f68d6d 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -364,7 +364,7 @@ impl IggyShard {
let segment = &mut partition.log.segments_mut()[segment_index];
- if segment.end_offset == 0 {
+ if segment.start_timestamp == 0 {
segment.start_timestamp = batch.first_timestamp().unwrap();
}
diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs
index e281544c1..f0831828a 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -260,7 +260,29 @@ impl IggyShard {
stats.increment_segments_count(1);
}
- let current_offset = loaded_log.active_segment().end_offset;
+ // Use the max end_offset across segments that have data, not just
+ // the active segment. Mirrors the fix in shard/mod.rs bootstrap.
+ let current_offset = loaded_log
+ .segments()
+ .iter()
+ .filter(|s| s.size > iggy_common::IggyByteSize::default())
+ .map(|s| s.end_offset)
+ .max()
+ .unwrap_or(0);
+
+ let should_increment_offset = loaded_log
+ .segments()
+ .iter()
+ .any(|s| s.size > iggy_common::IggyByteSize::default());
+
+        // Initialize journal base_offset so three-tier routing works correctly.
+ if should_increment_offset {
+ use crate::streaming::partitions::journal::{Inner, Journal};
+ loaded_log.journal_mut().init(Inner {
+ base_offset: current_offset + 1,
+ ..Default::default()
+ });
+ }
let (revision_id, consumer_offsets, consumer_group_offsets) = self
.metadata
@@ -280,6 +302,54 @@ impl IggyShard {
)
});
+        // Clamp consumer offsets that are ahead of partition offset (crash recovery).
+ {
+ let guard = consumer_offsets.pin();
+ for entry in guard.iter() {
+                let stored = entry.1.offset.load(std::sync::atomic::Ordering::Relaxed);
+ if stored > current_offset {
+ tracing::warn!(
+ "Consumer {} offset {} ahead of partition offset {} \
+ for stream {}, topic {}, partition {} - clamping \
+ (lazy init recovery)",
+ entry.0,
+ stored,
+ current_offset,
+ stream_id,
+ topic_id,
+ partition_id
+ );
+ entry
+ .1
+ .offset
+                    .store(current_offset, std::sync::atomic::Ordering::Relaxed);
+ }
+ }
+ }
+ {
+ let guard = consumer_group_offsets.pin();
+ for entry in guard.iter() {
+                let stored = entry.1.offset.load(std::sync::atomic::Ordering::Relaxed);
+ if stored > current_offset {
+ tracing::warn!(
+ "Consumer group {:?} offset {} ahead of partition \
+ offset {} for stream {}, topic {}, partition {} - \
+ clamping (lazy init recovery)",
+ entry.0,
+ stored,
+ current_offset,
+ stream_id,
+ topic_id,
+ partition_id
+ );
+ entry
+ .1
+ .offset
+                    .store(current_offset, std::sync::atomic::Ordering::Relaxed);
+ }
+ }
+ }
+
let partition = LocalPartition::with_log(
loaded_log,
stats,
@@ -289,7 +359,7 @@ impl IggyShard {
None,
created_at,
revision_id,
- current_offset > 0,
+ should_increment_offset,
);
self.local_partitions.borrow_mut().insert(*ns, partition);
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 43b634c73..b119bb5e3 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -19,7 +19,7 @@ use crate::configs::cache_indexes::CacheIndexesConfig;
use crate::shard::IggyShard;
use crate::streaming::segments::Segment;
use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
+use iggy_common::{ConsumerKind, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
impl IggyShard {
    /// Performs all cleanup for a topic's partitions: time-based expiry then size-based trimming.
@@ -122,15 +122,36 @@ impl IggyShard {
let Some(partition) = partitions.get(&ns) else {
return Ok((0, 0));
};
+
+ let min_committed = Self::min_committed_offset(
+ &partition.consumer_offsets,
+ &partition.consumer_group_offsets,
+ );
+
let segments = partition.log.segments();
let last_idx = segments.len().saturating_sub(1);
+ let mut offsets = Vec::new();
- segments
- .iter()
- .enumerate()
-            .filter(|(idx, seg)| *idx != last_idx && seg.is_expired(now, expiry))
- .map(|(_, seg)| seg.start_offset)
- .collect()
+ for (idx, seg) in segments.iter().enumerate() {
+ if idx == last_idx || !seg.is_expired(now, expiry) {
+ continue;
+ }
+ if let Some((barrier, kind, id)) = &min_committed
+ && seg.end_offset > *barrier
+ {
+ tracing::warn!(
+ "Segment [{}..{}] blocked from expiry-based deletion \
+ by {kind} (ID: {id}) at offset {barrier} \
+                     in partition {partition_id} (stream: {stream_id}, topic: {topic_id})",
+ seg.start_offset,
+ seg.end_offset,
+ );
+ continue;
+ }
+ offsets.push(seg.start_offset);
+ }
+
+ offsets
};
let mut total_segments = 0u64;
@@ -172,6 +193,23 @@ impl IggyShard {
continue;
}
+ let min_committed = Self::min_committed_offset(
+ &partition.consumer_offsets,
+ &partition.consumer_group_offsets,
+ );
+ if let Some((barrier, kind, id)) = &min_committed
+ && first.end_offset > *barrier
+ {
+ tracing::warn!(
+ "Segment [{}..{}] blocked from size-based deletion \
+ by {kind} (ID: {id}) at offset {barrier} \
+                    in partition {partition_id} (stream: {stream_id}, topic: {topic_id})",
+ first.start_offset,
+ first.end_offset,
+ );
+ continue;
+ }
+
match &oldest {
            None => oldest = Some((partition_id, first.start_offset, first.start_timestamp)),
Some((_, _, ts)) if first.start_timestamp < *ts => {
@@ -271,11 +309,11 @@ impl IggyShard {
}
    /// Deletes the N oldest **sealed** segments from a partition, preserving the active segment
-    /// and partition offset. Reuses `remove_segment_by_offset` — same logic as the message cleaner.
+    /// and partition offset. Reuses `remove_segment_by_offset` - same logic as the message cleaner.
///
-    /// Segments containing unconsumed messages are protected: a segment is only eligible for
-    /// deletion if `end_offset <= min_committed_offset` across all consumers and consumer groups.
-    /// If no consumers exist, there is no barrier.
+    /// Segments containing unconsumed messages are protected by a barrier: deletion is skipped
+    /// when `end_offset > min_committed_offset` and a warning is logged identifying the
+    /// blocking consumer. If no consumers exist, there is no barrier.
pub(crate) async fn delete_oldest_segments(
&self,
stream_id: usize,
@@ -298,18 +336,33 @@ impl IggyShard {
let segments = partition.log.segments();
let last_idx = segments.len().saturating_sub(1);
+ let mut offsets = Vec::new();
+ let mut collected = 0u32;
- segments
- .iter()
- .enumerate()
- .filter(|(idx, seg)| {
- *idx != last_idx
- && seg.sealed
- && min_committed.is_none_or(|barrier| seg.end_offset
<= barrier)
- })
- .map(|(_, seg)| seg.start_offset)
- .take(segments_count as usize)
- .collect()
+ for (idx, seg) in segments.iter().enumerate() {
+ if collected >= segments_count {
+ break;
+ }
+ if idx == last_idx || !seg.sealed {
+ continue;
+ }
+ if let Some((barrier, kind, id)) = &min_committed
+ && seg.end_offset > *barrier
+ {
+ tracing::warn!(
+ "Segment [{}..{}] blocked from size-based deletion \
+ by {kind} (ID: {id}) at offset {barrier} \
+                         in partition {partition_id} (stream: {stream_id}, topic: {topic_id})",
+ seg.start_offset,
+ seg.end_offset,
+ );
+ continue;
+ }
+ offsets.push(seg.start_offset);
+ collected += 1;
+ }
+
+ offsets
};
let mut total_segments = 0u64;
@@ -325,20 +378,29 @@ impl IggyShard {
}
    /// Returns the minimum committed offset across all consumers and consumer groups,
-    /// or `None` if no consumers exist (no barrier).
+    /// along with the identity of the consumer holding it. Returns `None` if no
+    /// consumers exist (no barrier).
fn min_committed_offset(
        consumer_offsets: &crate::streaming::partitions::consumer_offsets::ConsumerOffsets,
        consumer_group_offsets: &crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets,
- ) -> Option<u64> {
+ ) -> Option<(u64, ConsumerKind, u32)> {
let co_guard = consumer_offsets.pin();
let cg_guard = consumer_group_offsets.pin();
- let consumers = co_guard
- .iter()
-            .map(|(_, co)| co.offset.load(std::sync::atomic::Ordering::Relaxed));
- let groups = cg_guard
- .iter()
-            .map(|(_, co)| co.offset.load(std::sync::atomic::Ordering::Relaxed));
- consumers.chain(groups).min()
+ let consumers = co_guard.iter().map(|(_, co)| {
+ (
+ co.offset.load(std::sync::atomic::Ordering::Relaxed),
+ co.kind,
+ co.consumer_id,
+ )
+ });
+ let groups = cg_guard.iter().map(|(_, co)| {
+ (
+ co.offset.load(std::sync::atomic::Ordering::Relaxed),
+ co.kind,
+ co.consumer_id,
+ )
+ });
+ consumers.chain(groups).min_by_key(|(offset, _, _)| *offset)
}
    /// Drains all segments, deletes their files, and re-initializes the partition log at offset 0.
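The barrier logic above can be condensed into a standalone sketch (simplified segment struct, not the server's Segment type):

    struct Seg {
        end_offset: u64,
        sealed: bool,
    }

    // A sealed segment is deletable only if every consumer has committed past
    // its end_offset; with no consumers there is no barrier.
    fn deletable(seg: &Seg, min_committed: Option<u64>) -> bool {
        seg.sealed && min_committed.is_none_or(|barrier| seg.end_offset <= barrier)
    }

    fn main() {
        let seg = Seg { end_offset: 99, sealed: true };
        assert!(deletable(&seg, None));       // no consumers
        assert!(deletable(&seg, Some(150)));  // everyone is past offset 99
        assert!(!deletable(&seg, Some(49)));  // slowest consumer still inside
    }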
diff --git a/core/server/src/streaming/partitions/journal.rs b/core/server/src/streaming/partitions/journal.rs
index d16db4f6d..87bd9cfcb 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -19,11 +19,11 @@ use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
use iggy_common::{IggyByteSize, IggyError};
use std::fmt::Debug;
-// TODO: Will have to revisit this Journal abstraction....
-// I don't like that it has to leak impl detail via the `Inner` struct in order to be functional.
-
#[derive(Default, Debug)]
pub struct Inner {
+ /// Base offset for the next journal epoch. After commit(), set to
+ /// current_offset + 1. Used in `append()`: `current_offset = base_offset +
+ /// messages_count - 1`.
pub base_offset: u64,
pub current_offset: u64,
pub first_timestamp: u64,
@@ -32,17 +32,30 @@ pub struct Inner {
pub size: IggyByteSize,
}
-#[derive(Default, Debug)]
+#[derive(Debug)]
pub struct MemoryMessageJournal {
batches: IggyMessagesBatchSet,
inner: Inner,
}
-impl Clone for MemoryMessageJournal {
- fn clone(&self) -> Self {
+impl MemoryMessageJournal {
+ /// Create an empty journal for a fresh partition (no existing data).
+ pub fn empty() -> Self {
+ Self {
+ batches: IggyMessagesBatchSet::default(),
+ inner: Inner::default(),
+ }
+ }
+
+ /// Create an empty journal positioned at the given offset. Used after
+    /// bootstrap when the partition already has data on disk up to some offset.
+ pub fn at_offset(base_offset: u64) -> Self {
Self {
- batches: Default::default(),
- inner: Default::default(),
+ batches: IggyMessagesBatchSet::default(),
+ inner: Inner {
+ base_offset,
+ ..Default::default()
+ },
}
}
}
@@ -63,6 +76,26 @@ impl Journal for MemoryMessageJournal {
batch_messages_count
);
+ // Defense-in-depth: on first append after empty/default state, correct
+        // base_offset from the batch's actual first offset. Mirrors the existing
+        // first_timestamp initialization pattern below. Catches code paths that
+ // create a journal without calling init().
+ if self.inner.messages_count == 0
+ && let Some(first_offset) = entry.first_offset()
+ {
+ // Allow disagreement when either side is 0 (fresh partition or
+ // reset after purge). Only flag when both are non-zero and differ.
+ debug_assert!(
+ self.inner.base_offset == 0
+ || first_offset == 0
+ || self.inner.base_offset == first_offset,
+            "journal base_offset ({}) disagrees with batch first_offset ({})",
+ self.inner.base_offset,
+ first_offset
+ );
+ self.inner.base_offset = first_offset;
+ }
+
let batch_size = entry.size();
let first_timestamp = entry.first_timestamp().unwrap();
let last_timestamp = entry.last_timestamp().unwrap();
@@ -107,6 +140,38 @@ impl Journal for MemoryMessageJournal {
fn inner(&self) -> &Self::Inner {
&self.inner
}
+
+ fn first_offset(&self) -> Option<u64> {
+ if self.is_empty() {
+ None
+ } else {
+ Some(self.inner.base_offset)
+ }
+ }
+
+ fn last_offset(&self) -> Option<u64> {
+ if self.is_empty() {
+ None
+ } else {
+ Some(self.inner.current_offset)
+ }
+ }
+
+ fn first_timestamp(&self) -> Option<u64> {
+ if self.is_empty() || self.inner.first_timestamp == 0 {
+ None
+ } else {
+ Some(self.inner.first_timestamp)
+ }
+ }
+
+ fn last_timestamp(&self) -> Option<u64> {
+ if self.is_empty() || self.inner.end_timestamp == 0 {
+ None
+ } else {
+ Some(self.inner.end_timestamp)
+ }
+ }
}
pub trait Journal {
@@ -127,6 +192,18 @@ pub trait Journal {
fn inner(&self) -> &Self::Inner;
+ /// First offset of data in the journal, or None if empty.
+ fn first_offset(&self) -> Option<u64>;
+
+ /// Last offset of data in the journal, or None if empty.
+ fn last_offset(&self) -> Option<u64>;
+
+ /// Timestamp of first message in journal, or None if empty.
+ fn first_timestamp(&self) -> Option<u64>;
+
+ /// Timestamp of last message in journal, or None if empty.
+ fn last_timestamp(&self) -> Option<u64>;
+
// `flush` is only useful in case of an journal that has disk backed WAL.
    // This could be merged together with `append`, but not doing this for two reasons.
    // 1. In case of the `Journal` being used as part of structure that utilizes interior mutability, async with borrow_mut is not possible.
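The offset bookkeeping documented on `Inner` above, as a standalone sketch (hypothetical Epoch type, not the journal itself; assumes a non-empty epoch):

    struct Epoch {
        base_offset: u64,
        messages_count: u64,
    }

    impl Epoch {
        // current_offset = base_offset + messages_count - 1 within an epoch.
        fn current_offset(&self) -> u64 {
            self.base_offset + self.messages_count - 1
        }
        // commit() starts the next epoch at current_offset + 1.
        fn commit(&self) -> Epoch {
            Epoch { base_offset: self.current_offset() + 1, messages_count: 0 }
        }
    }

    fn main() {
        // Ten messages appended from offset 0 end at offset 9 ...
        let epoch = Epoch { base_offset: 0, messages_count: 10 };
        assert_eq!(epoch.current_offset(), 9);
        // ... and the next epoch begins at offset 10.
        assert_eq!(epoch.commit().base_offset, 10);
    }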
diff --git a/core/server/src/streaming/partitions/local_partition.rs b/core/server/src/streaming/partitions/local_partition.rs
index 14f5e63d9..de49720ef 100644
--- a/core/server/src/streaming/partitions/local_partition.rs
+++ b/core/server/src/streaming/partitions/local_partition.rs
@@ -56,7 +56,9 @@ impl LocalPartition {
should_increment_offset: bool,
) -> Self {
Self {
- log: SegmentedLog::default(),
+ log: SegmentedLog::new(
+
crate::streaming::partitions::journal::MemoryMessageJournal::empty(),
+ ),
offset,
consumer_offsets,
consumer_group_offsets,
diff --git a/core/server/src/streaming/partitions/log.rs b/core/server/src/streaming/partitions/log.rs
index ae7c1efaa..16318553e 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -64,6 +64,18 @@ impl<J> SegmentedLog<J>
where
J: Journal + Debug,
{
+ pub fn new(journal: J) -> Self {
+ Self {
+ journal,
+            _access_map: AllocRingBuffer::with_capacity_power_of_2(ACCESS_MAP_CAPACITY),
+ _cache: (),
+ segments: Vec::with_capacity(SEGMENTS_CAPACITY),
+ storage: Vec::with_capacity(SEGMENTS_CAPACITY),
+ indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
+ in_flight: IggyMessagesBatchSetInFlight::default(),
+ }
+ }
+
pub fn has_segments(&self) -> bool {
!self.segments.is_empty()
}
diff --git a/core/server/src/streaming/partitions/mod.rs b/core/server/src/streaming/partitions/mod.rs
index f8a3593ed..ee15942f9 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -26,6 +26,8 @@ pub mod local_partition;
pub mod local_partitions;
pub mod log;
pub mod ops;
+#[cfg(test)]
+mod ops_tests;
pub mod segments;
pub mod storage;
diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs
index aa481b7b6..489e35b8a 100644
--- a/core/server/src/streaming/partitions/ops.rs
+++ b/core/server/src/streaming/partitions/ops.rs
@@ -20,6 +20,20 @@
//!
//! This module provides the core logic for polling and loading messages from partitions,
//! avoiding code duplication between `IggyShard` and test harnesses.
+//!
+//! # Safety invariants
+//!
+//! The snapshot-then-read pattern in [`get_messages_by_offset`] and
+//! [`poll_messages_by_timestamp`] is safe only under **single-threaded shard
+//! execution** (compio runtime). Between the metadata snapshot and the actual
+//! reads, no other shard request can mutate the partition state because the
+//! message pump processes one request at a time.
+//!
+//! The poll + auto_commit sequence in the handler (`handlers.rs`) is likewise
+//! non-atomic but safe for the same reason.
+//!
+//! If the architecture ever moves to multi-threaded shard processing or adds
+//! compaction/message deletion, these invariants must be re-evaluated.
use super::journal::Journal;
use super::local_partitions::LocalPartitions;
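
A minimal sketch of the snapshot-then-read shape described in the safety invariants above (simplified; the real function below snapshots more fields):

    // Snapshot routing metadata in one short-lived borrow...
    let journal_first_offset = {
        let store = local_partitions.borrow();
        let partition = store.get(namespace).expect("partition must exist");
        partition.log.journal().first_offset()
    }; // ...borrow ends here.
    // Reads below re-borrow as needed. This is only safe because the shard's
    // message pump runs requests one at a time on a single thread, so nothing
    // can mutate the partition between the snapshot and the reads.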
@@ -131,100 +145,87 @@ pub async fn get_messages_by_offset(
return Ok(IggyMessagesBatchSet::empty());
}
- // Get journal and in_flight metadata for routing
- let (
- is_journal_empty,
- journal_first_offset,
- journal_last_offset,
- in_flight_empty,
- in_flight_first,
- in_flight_last,
- ) = {
+ // Snapshot journal and in-flight metadata for routing decisions.
+ let (journal_first_offset, in_flight_empty, in_flight_first, in_flight_last) = {
let store = local_partitions.borrow();
let partition = store
.get(namespace)
.expect("local_partitions: partition must exist for poll");
let journal = partition.log.journal();
- let journal_inner = journal.inner();
let in_flight = partition.log.in_flight();
(
- journal.is_empty(),
- journal_inner.base_offset,
- journal_inner.current_offset,
+ journal.first_offset(),
in_flight.is_empty(),
in_flight.first_offset(),
in_flight.last_offset(),
)
};
- let end_offset = start_offset + (count - 1).max(1) as u64;
-
- // Case 0: Journal is empty - check in_flight buffer or disk
- if is_journal_empty {
- if !in_flight_empty && start_offset >= in_flight_first && start_offset <= in_flight_last {
- let in_flight_batches = {
- let store = local_partitions.borrow();
- let partition = store
- .get(namespace)
- .expect("local_partitions: partition must exist for poll");
- partition
- .log
- .in_flight()
- .get_by_offset(start_offset, count)
- .to_vec()
- };
- if !in_flight_batches.is_empty() {
- let mut result = IggyMessagesBatchSet::empty();
- result.add_immutable_batches(&in_flight_batches);
- return Ok(result.get_by_offset(start_offset, count));
- }
+ // Lookup ordered by ascending offset: disk -> in-flight -> journal.
+ //
+ // Offsets are sequential: disk < in-flight < journal. A request may span
+ // multiple tiers, so we advance `current` through each sequentially.
+ // See issue #2715.
+
+ let mut combined = IggyMessagesBatchSet::empty();
+ let mut remaining = count;
+ let mut current = start_offset;
+
+ // Lowest in-memory tier boundary (if any). Disk handles offsets below this.
+ let in_memory_floor = if !in_flight_empty {
+ in_flight_first
+ } else {
+ journal_first_offset.unwrap_or(u64::MAX)
+ };
+
+ // Disk (pre-tier): offsets below the lowest in-memory tier.
+ if remaining > 0 && current < in_memory_floor {
+ let disk_count =
+ ((in_memory_floor.min(current + remaining as u64) - current) as u32).min(remaining);
+ let disk_messages =
+ load_messages_from_disk(local_partitions, namespace, current, disk_count).await?;
+ let loaded = disk_messages.count();
+ if loaded > 0 {
+ current += loaded as u64;
+ remaining = remaining.saturating_sub(loaded);
+ combined.add_batch_set(disk_messages);
}
- return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
}
- // Case 1: All messages are in journal
- if start_offset >= journal_first_offset && end_offset <= journal_last_offset {
- let batches = {
+ // In-flight: committed data being persisted to disk.
+ if remaining > 0 && !in_flight_empty && current >= in_flight_first && current <= in_flight_last
+ {
+ let in_flight_count = ((in_flight_last - current + 1) as u32).min(remaining);
+ let in_flight_batches = {
let store = local_partitions.borrow();
let partition = store
.get(namespace)
.expect("local_partitions: partition must exist for poll");
partition
.log
- .journal()
- .get(|batches| batches.get_by_offset(start_offset, count))
+ .in_flight()
+ .get_by_offset(current, in_flight_count)
+ .to_vec()
};
- return Ok(batches);
- }
-
- // Case 2: All messages on disk (end_offset < journal_first_offset)
- if end_offset < journal_first_offset {
- return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
- }
-
- // Case 3: Messages span disk and journal boundary
- let disk_count = if start_offset < journal_first_offset {
- ((journal_first_offset - start_offset) as u32).min(count)
- } else {
- 0
- };
-
- let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
- // Load messages from disk if needed
- if disk_count > 0 {
- let disk_messages =
- load_messages_from_disk(local_partitions, namespace, start_offset, disk_count).await?;
- if !disk_messages.is_empty() {
- combined_batch_set.add_batch_set(disk_messages);
+ if !in_flight_batches.is_empty() {
+ let mut result = IggyMessagesBatchSet::empty();
+ result.add_immutable_batches(&in_flight_batches);
+ let sliced = result.get_by_offset(current, in_flight_count);
+ let loaded = sliced.count();
+ if loaded > 0 {
+ current += loaded as u64;
+ remaining = remaining.saturating_sub(loaded);
+ combined.add_batch_set(sliced);
+ }
}
}
- // Get remaining messages from journal
- let remaining_count = count - combined_batch_set.count();
- if remaining_count > 0 {
- let journal_start_offset = std::cmp::max(start_offset, journal_first_offset);
+ // Journal: may hold data from recent appends.
+ if remaining > 0
+ && let Some(jfo) = journal_first_offset
+ && current >= jfo
+ {
let journal_messages = {
let store = local_partitions.borrow();
let partition = store
@@ -233,14 +234,14 @@ pub async fn get_messages_by_offset(
partition
.log
.journal()
- .get(|batches| batches.get_by_offset(journal_start_offset, remaining_count))
+ .get(|batches| batches.get_by_offset(current, remaining))
};
if !journal_messages.is_empty() {
- combined_batch_set.add_batch_set(journal_messages);
+ combined.add_batch_set(journal_messages);
}
}
- Ok(combined_batch_set)
+ Ok(combined)
}
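
A worked example of the tier walk above, using hypothetical offsets (disk holds 0..=9, in-flight 10..=19, journal 20..=29) and a poll for start_offset=5, count=20:

    // disk tier:      current=5,  in_memory_floor=10 -> reads 5  -> current=10, remaining=15
    // in-flight tier: current=10, in_flight_last=19  -> reads 10 -> current=20, remaining=5
    // journal tier:   current=20, journal_first=20   -> reads 5  -> remaining=0
    // Result: contiguous offsets 5..=24, no gaps and no duplicates.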
/// Poll messages by timestamp.
@@ -252,8 +253,15 @@ async fn poll_messages_by_timestamp(
) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
let partition_id = namespace.partition_id();
- // Get metadata and journal info
- let (metadata, is_journal_empty, journal_first_timestamp, journal_last_timestamp) = {
+ // Snapshot metadata from journal and in-flight for routing decisions.
+ let (
+ metadata,
+ journal_first_ts,
+ journal_last_ts,
+ in_flight_empty,
+ in_flight_first_ts,
+ in_flight_last_ts,
+ ) = {
let store = local_partitions.borrow();
let partition = store
.get(namespace)
@@ -263,12 +271,31 @@ async fn poll_messages_by_timestamp(
let metadata = IggyPollMetadata::new(partition_id as u32, current_offset);
let journal = partition.log.journal();
- let journal_inner = journal.inner();
+
+ let in_flight = partition.log.in_flight();
+ let (ife, ifts, ilts) = if in_flight.is_empty() {
+ (true, 0u64, 0u64)
+ } else {
+ let first_ts = in_flight
+ .batches()
+ .first()
+ .and_then(|b| b.first_timestamp())
+ .unwrap_or(0);
+ let last_ts = in_flight
+ .batches()
+ .last()
+ .and_then(|b| b.last_timestamp())
+ .unwrap_or(0);
+ (false, first_ts, last_ts)
+ };
+
(
metadata,
- journal.is_empty(),
- journal_inner.first_timestamp,
- journal_inner.end_timestamp,
+ journal.first_timestamp(),
+ journal.last_timestamp(),
+ ife,
+ ifts,
+ ilts,
)
};
@@ -276,22 +303,57 @@ async fn poll_messages_by_timestamp(
return Ok((metadata, IggyMessagesBatchSet::empty()));
}
- // Case 0: Journal is empty, all messages on disk
- if is_journal_empty {
- let batches =
- load_messages_from_disk_by_timestamp(local_partitions, namespace, timestamp, count)
+ // Three-tier timestamp lookup: disk -> in-flight -> journal.
+ // Same structure as offset-based polling (see issue #2715).
+
+ let mut combined = IggyMessagesBatchSet::empty();
+ let mut remaining = count;
+
+ // Phase 1: Disk - timestamps before in-flight range.
+ let disk_upper_ts = if !in_flight_empty {
+ in_flight_first_ts
+ } else {
+ journal_first_ts.unwrap_or(u64::MAX)
+ };
+
+ if timestamp < disk_upper_ts && remaining > 0 {
+ let disk_messages =
+ load_messages_from_disk_by_timestamp(local_partitions, namespace, timestamp, remaining)
.await?;
- return Ok((metadata, batches));
+ let loaded = disk_messages.count();
+ if loaded > 0 {
+ remaining = remaining.saturating_sub(loaded);
+ combined.add_batch_set(disk_messages);
+ }
}
- // Case 1: Timestamp is after journal's last timestamp - no messages
- if timestamp > journal_last_timestamp {
- return Ok((metadata, IggyMessagesBatchSet::empty()));
+ // Phase 2: In-flight - committed data being persisted.
+ if remaining > 0 && !in_flight_empty && timestamp <= in_flight_last_ts {
+ let in_flight_batches = {
+ let store = local_partitions.borrow();
+ let partition = store
+ .get(namespace)
+ .expect("local_partitions: partition must exist for poll");
+ partition.log.in_flight().batches().to_vec()
+ };
+ if !in_flight_batches.is_empty() {
+ let mut batch_set = IggyMessagesBatchSet::empty();
+ batch_set.add_immutable_batches(&in_flight_batches);
+ let filtered = batch_set.get_by_timestamp(timestamp, remaining);
+ let loaded = filtered.count();
+ if loaded > 0 {
+ remaining = remaining.saturating_sub(loaded);
+ combined.add_batch_set(filtered);
+ }
+ }
}
- // Case 2: Timestamp is within journal range - get from journal
- if timestamp >= journal_first_timestamp {
- let batches = {
+ // Phase 3: Journal - newest appends (post-commit).
+ if remaining > 0
+ && let Some(jlts) = journal_last_ts
+ && timestamp <= jlts
+ {
+ let journal_messages = {
let store = local_partitions.borrow();
let partition = store
.get(namespace)
@@ -299,37 +361,14 @@ async fn poll_messages_by_timestamp(
partition
.log
.journal()
- .get(|batches| batches.get_by_timestamp(timestamp, count))
+ .get(|batches| batches.get_by_timestamp(timestamp, remaining))
};
- return Ok((metadata, batches));
- }
-
- // Case 3: Timestamp is before journal - need disk + possibly journal
- let disk_messages =
- load_messages_from_disk_by_timestamp(local_partitions, namespace, timestamp, count).await?;
-
- if disk_messages.count() >= count {
- return Ok((metadata, disk_messages));
+ if !journal_messages.is_empty() {
+ combined.add_batch_set(journal_messages);
+ }
}
- // Case 4: Messages span disk and journal
- let remaining_count = count - disk_messages.count();
- let journal_messages = {
- let store = local_partitions.borrow();
- let partition = store
- .get(namespace)
- .expect("local_partitions: partition must exist for poll");
- partition
- .log
- .journal()
- .get(|batches| batches.get_by_timestamp(timestamp, remaining_count))
- };
-
- let mut combined_batch_set = disk_messages;
- if !journal_messages.is_empty() {
- combined_batch_set.add_batch_set(journal_messages);
- }
- Ok((metadata, combined_batch_set))
+ Ok((metadata, combined))
}
/// Load messages from disk by offset.
@@ -388,7 +427,7 @@ pub async fn load_messages_from_disk(
current_offset
};
- let mut end_offset = offset + (remaining_count - 1).max(1) as u64;
+ let mut end_offset = offset + (remaining_count - 1) as u64;
if end_offset > segment_end_offset {
end_offset = segment_end_offset;
}
@@ -429,7 +468,7 @@ async fn load_segment_messages(
) -> Result<IggyMessagesBatchSet, IggyError> {
let relative_start_offset = (start_offset - segment_start_offset) as u32;
- // Check journal first for this segment's data
+ // Check journal for this segment's data (handles callers outside get_messages_by_offset).
let journal_data = {
let store = local_partitions.borrow();
let partition = store
@@ -437,14 +476,10 @@ async fn load_segment_messages(
.expect("local_partitions: partition must exist");
let journal = partition.log.journal();
- let is_journal_empty = journal.is_empty();
- let journal_inner = journal.inner();
- let journal_first_offset = journal_inner.base_offset;
- let journal_last_offset = journal_inner.current_offset;
-
- if !is_journal_empty
- && start_offset >= journal_first_offset
- && end_offset <= journal_last_offset
+
+ if let (Some(jfo), Some(jlo)) = (journal.first_offset(), journal.last_offset())
+ && start_offset >= jfo
+ && end_offset <= jlo
{
Some(journal.get(|batches| batches.get_by_offset(start_offset, count)))
} else {
@@ -617,14 +652,10 @@ async fn load_segment_messages_by_timestamp(
.expect("local_partitions: partition must exist");
let journal = partition.log.journal();
- let is_journal_empty = journal.is_empty();
- let journal_inner = journal.inner();
- let journal_first_timestamp = journal_inner.first_timestamp;
- let journal_last_timestamp = journal_inner.end_timestamp;
-
- if !is_journal_empty
- && timestamp >= journal_first_timestamp
- && timestamp <= journal_last_timestamp
+
+ if let (Some(jfts), Some(jlts)) = (journal.first_timestamp(), journal.last_timestamp())
+ && timestamp >= jfts
+ && timestamp <= jlts
{
Some(journal.get(|batches| batches.get_by_timestamp(timestamp, count)))
} else {
diff --git a/core/server/src/streaming/partitions/ops_tests.rs b/core/server/src/streaming/partitions/ops_tests.rs
new file mode 100644
index 000000000..fbed5c2ed
--- /dev/null
+++ b/core/server/src/streaming/partitions/ops_tests.rs
@@ -0,0 +1,359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the in-flight buffer visibility gap fix (issue #2715).
+//!
+//! These tests set up "State C" directly in memory:
+//! - in-flight holds offsets [0..N-1] (committed journal, not yet on disk)
+//! - journal holds offsets [N..N+M-1] (new appends after commit)
+//! - no actual disk data
+//!
+//! Before the fix, Cases 1-3 in get_messages_by_offset never checked
+//! in-flight, causing the consumer to miss committed data and either
+//! get empty results or skip directly to journal offsets.
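+//!
+//! A concrete instance of State C, as built by setup_state_c(10, 5) further
+//! down (numbers are just that call's parameters):
+//!
+//!   in-flight:        offsets 0..=9   (10 messages awaiting persistence)
+//!   journal:          offsets 10..=14 (5 messages appended after commit)
+//!   partition.offset: 14
+//!
+//! A poll for (start_offset=0, count=15) must return all 15 messages in order.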
+
+#[cfg(test)]
+mod tests {
+ use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets;
+ use crate::streaming::partitions::consumer_offsets::ConsumerOffsets;
+ use crate::streaming::partitions::journal::{Inner, Journal};
+ use crate::streaming::partitions::local_partition::LocalPartition;
+ use crate::streaming::partitions::local_partitions::LocalPartitions;
+ use crate::streaming::partitions::ops;
+ use crate::streaming::polling_consumer::PollingConsumer;
+ use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+ use iggy_common::sharding::IggyNamespace;
+ use iggy_common::{
+ IggyByteSize, IggyMessage, MemoryPool, MemoryPoolConfigOther, PollingStrategy, Sizeable,
+ };
+ use std::cell::RefCell;
+ use std::sync::Arc;
+ use std::sync::atomic::AtomicU64;
+
+ fn init_memory_pool() {
+ static INIT: std::sync::Once = std::sync::Once::new();
+ INIT.call_once(|| {
+ let config = MemoryPoolConfigOther {
+ enabled: false,
+ size: IggyByteSize::from(64 * 1024 * 1024u64),
+ bucket_capacity: 256,
+ };
+ MemoryPool::init_pool(&config);
+ });
+ }
+
+ fn create_test_partition(current_offset: u64) -> LocalPartition {
+ let stream_stats = Arc::new(StreamStats::default());
+ let topic_stats = Arc::new(TopicStats::new(stream_stats));
+ let partition_stats = Arc::new(PartitionStats::new(topic_stats));
+
+ LocalPartition::new(
+ partition_stats,
+ Arc::new(AtomicU64::new(current_offset)),
+ Arc::new(ConsumerOffsets::with_capacity(10)),
+ Arc::new(ConsumerGroupOffsets::with_capacity(10)),
+ None,
+ iggy_common::IggyTimestamp::now(),
+ 1,
+ true,
+ )
+ }
+
+ fn create_batch(count: u32) -> iggy_common::IggyMessagesBatchMut {
+ let messages: Vec<IggyMessage> = (0..count)
+ .map(|_| {
+ IggyMessage::builder()
+ .payload(bytes::Bytes::from("test-payload"))
+ .build()
+ .unwrap()
+ })
+ .collect();
+
+ let messages_size: u32 = messages
+ .iter()
+ .map(|m| m.get_size_bytes().as_bytes_u32())
+ .sum();
+ iggy_common::IggyMessagesBatchMut::from_messages(&messages, messages_size)
+ }
+
+ /// Sets up "State C": in-flight holds committed data, journal holds new
+ /// appends that arrived after commit but before persist completes.
+ ///
+ /// Layout:
+ /// segment metadata: [0..journal_end] (no actual disk data)
+ /// in-flight: [0..in_flight_count-1]
+ /// journal: [in_flight_count..in_flight_count+journal_count-1]
+ /// partition.offset: in_flight_count + journal_count - 1
+ async fn setup_state_c(
+ in_flight_count: u32,
+ journal_count: u32,
+ ) -> (RefCell<LocalPartitions>, IggyNamespace) {
+ init_memory_pool();
+ let ns = IggyNamespace::new(1, 1, 0);
+
+ let in_flight_end = in_flight_count as u64 - 1;
+ let journal_base = in_flight_end + 1;
+ let journal_end = journal_base + journal_count as u64 - 1;
+
+ let mut partition = create_test_partition(journal_end);
+
+ let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+ let storage = iggy_common::SegmentStorage::default();
+ partition.log.add_persisted_segment(segment, storage);
+
+ let seg = &mut partition.log.segments_mut()[0];
+ seg.end_offset = journal_end;
+ seg.start_timestamp = 1;
+ seg.end_timestamp = 2;
+
+ let mut in_flight_batch = create_batch(in_flight_count);
+ in_flight_batch.prepare_for_persistence(0, 0, 0, None).await;
+ let in_flight_size = in_flight_batch.size();
+ partition.log.set_in_flight(vec![in_flight_batch.freeze()]);
+
+ let journal_inner = Inner {
+ base_offset: journal_base,
+ current_offset: 0,
+ first_timestamp: 0,
+ end_timestamp: 0,
+ messages_count: 0,
+ size: IggyByteSize::default(),
+ };
+ partition.log.journal_mut().init(journal_inner);
+
+ let mut journal_batch = create_batch(journal_count);
+ journal_batch
+ .prepare_for_persistence(0, journal_base, in_flight_size as u32, None)
+ .await;
+ partition.log.journal_mut().append(journal_batch).unwrap();
+
+ let mut store = LocalPartitions::new();
+ store.insert(ns, partition);
+ (RefCell::new(store), ns)
+ }
+
+ // -----------------------------------------------------------------------
+ // Issue #2715: In-flight buffer must be reachable when journal is non-empty
+ // -----------------------------------------------------------------------
+
+ #[compio::test]
+ async fn in_flight_reachable_when_journal_non_empty() {
+ let (store, ns) = setup_state_c(10, 5).await;
+ let batches = ops::get_messages_by_offset(&store, &ns, 0, 5)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 5);
+ assert_eq!(batches.first_offset(), Some(0));
+ }
+
+ #[compio::test]
+ async fn spanning_in_flight_and_journal_returns_all_in_order() {
+ let (store, ns) = setup_state_c(10, 5).await;
+ let batches = ops::get_messages_by_offset(&store, &ns, 0, 15)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 15);
+ assert_eq!(batches.first_offset(), Some(0));
+ }
+
+ #[compio::test]
+ async fn polling_next_starts_from_in_flight_not_journal() {
+ let (store, ns) = setup_state_c(10, 5).await;
+ let consumer = PollingConsumer::Consumer(1, 0);
+ let args =
+ crate::shard::system::messages::PollingArgs::new(PollingStrategy::next(), 15, false);
+ let (metadata, batches) = ops::poll_messages(&store, &ns, consumer, args)
+ .await
+ .unwrap();
+ assert_eq!(batches.first_offset(), Some(0));
+ assert!(metadata.current_offset >= 14);
+ }
+
+ #[compio::test]
+ async fn single_message_at_in_flight_journal_boundary() {
+ let (store, ns) = setup_state_c(10, 5).await;
+ let batches = ops::get_messages_by_offset(&store, &ns, 9, 1)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 1);
+ assert_eq!(batches.first_offset(), Some(9));
+ }
+
+ #[compio::test]
+ async fn single_message_from_in_flight_at_offset_zero() {
+ let (store, ns) = setup_state_c(10, 5).await;
+ let batches = ops::get_messages_by_offset(&store, &ns, 0, 1)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 1);
+ assert_eq!(batches.first_offset(), Some(0));
+ }
+
+ // -----------------------------------------------------------------------
+ // Existing correct behavior must still work
+ // -----------------------------------------------------------------------
+
+ #[compio::test]
+ async fn in_flight_reachable_when_journal_empty() {
+ init_memory_pool();
+ let ns = IggyNamespace::new(1, 1, 0);
+ let mut partition = create_test_partition(9);
+
+ let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+ partition
+ .log
+ .add_persisted_segment(segment, iggy_common::SegmentStorage::default());
+ let seg = &mut partition.log.segments_mut()[0];
+ seg.end_offset = 9;
+ seg.start_timestamp = 1;
+ seg.end_timestamp = 2;
+
+ let mut batch = create_batch(10);
+ batch.prepare_for_persistence(0, 0, 0, None).await;
+ partition.log.set_in_flight(vec![batch.freeze()]);
+
+ let mut store = LocalPartitions::new();
+ store.insert(ns, partition);
+ let store = RefCell::new(store);
+
+ let batches = ops::get_messages_by_offset(&store, &ns, 0, 10)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 10);
+ }
+
+ #[compio::test]
+ async fn journal_reachable_when_in_flight_empty() {
+ init_memory_pool();
+ let ns = IggyNamespace::new(1, 1, 0);
+ let mut partition = create_test_partition(9);
+
+ let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+ partition
+ .log
+ .add_persisted_segment(segment, iggy_common::SegmentStorage::default());
+ let seg = &mut partition.log.segments_mut()[0];
+ seg.end_offset = 9;
+ seg.start_timestamp = 1;
+ seg.end_timestamp = 2;
+
+ partition.log.journal_mut().init(Inner {
+ base_offset: 0,
+ current_offset: 0,
+ first_timestamp: 0,
+ end_timestamp: 0,
+ messages_count: 0,
+ size: IggyByteSize::default(),
+ });
+
+ let mut batch = create_batch(10);
+ batch.prepare_for_persistence(0, 0, 0, None).await;
+ partition.log.journal_mut().append(batch).unwrap();
+
+ let mut store = LocalPartitions::new();
+ store.insert(ns, partition);
+ let store = RefCell::new(store);
+
+ let batches = ops::get_messages_by_offset(&store, &ns, 0, 10)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 10);
+ }
+
+ #[compio::test]
+ async fn journal_single_message_at_specific_offset() {
+ let (store, ns) = setup_state_c(10, 5).await;
+ let batches = ops::get_messages_by_offset(&store, &ns, 12, 1)
+ .await
+ .unwrap();
+ assert_eq!(batches.count(), 1);
+ assert_eq!(batches.first_offset(), Some(12));
+ }
+
+ // -----------------------------------------------------------------------
+ // Bug reproduction: journal base_offset=0 after restart causes offset skip
+ // -----------------------------------------------------------------------
+
+ /// Verifies that journal self-heals base_offset on first append.
+ /// Without self-healing, a journal created via Default would have
+ /// base_offset=0, causing incorrect offset calculations.
+ #[compio::test]
+ async fn journal_self_heals_base_offset_on_first_append() {
+ init_memory_pool();
+
+ let mut journal = crate::streaming::partitions::journal::MemoryMessageJournal::empty();
+ assert_eq!(journal.inner().base_offset, 0);
+
+ let mut batch = create_batch(5);
+ batch.prepare_for_persistence(0, 100, 0, None).await;
+ journal.append(batch).unwrap();
+
+ assert_eq!(
+ journal.inner().base_offset,
+ 100,
+ "Journal should self-heal base_offset from batch's first offset"
+ );
+ assert_eq!(
+ journal.inner().current_offset,
+ 104,
+ "current_offset should be base_offset + messages_count - 1"
+ );
+ }
+
+ /// Verifies that slice_by_offset returns None when start_offset is below
+ /// the batch's range, instead of clamping to index 0 (the old bug).
+ #[compio::test]
+ async fn slice_by_offset_rejects_offset_below_range() {
+ init_memory_pool();
+
+ let mut batch = create_batch(10);
+ batch.prepare_for_persistence(0, 100, 0, None).await;
+
+ let result = batch.slice_by_offset(95, 10);
+
+ assert!(
+ result.is_none(),
+ "slice_by_offset should return None when start_offset(95) <
first_offset(100), \
+ got {} messages at offset {:?}",
+ result.as_ref().map(|r| r.count()).unwrap_or(0),
+ result.as_ref().and_then(|r| r.first_offset())
+ );
+ }
+
+ /// After proper journal initialization, polling across the in-flight/journal
+ /// boundary returns contiguous messages with no gaps.
+ #[compio::test]
+ async fn post_restart_poll_with_correct_journal_init_no_skip() {
+ let (store, ns) = setup_state_c(100, 10).await;
+
+ let batches = ops::get_messages_by_offset(&store, &ns, 95, 10)
+ .await
+ .unwrap();
+
+ assert_eq!(batches.count(), 10, "should return exactly 10 messages");
+ assert_eq!(
+ batches.first_offset(),
+ Some(95),
+ "first message should be at requested offset 95"
+ );
+ assert_eq!(
+ batches.last_offset(),
+ Some(104),
+ "last message should be at offset 104 (contiguous)"
+ );
+ }
+}