This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 298cd3282 fix(server): prevent consumer offset skip during concurrent produce+consume (#2958)
298cd3282 is described below

commit 298cd3282b617bef8cfb266fd5463d7b186868a3
Author: Hubert Gruszecki <[email protected]>
AuthorDate: Mon Mar 23 14:09:07 2026 +0100

    fix(server): prevent consumer offset skip during concurrent produce+consume (#2958)
---
 .../common/src/types/message/messages_batch_mut.rs |   2 +-
 .../common/src/types/message/messages_batch_set.rs |  18 +-
 core/common/src/types/segment.rs                   |  29 +-
 core/integration/tests/server/message_cleanup.rs   |   7 +
 .../concurrent_produce_consume_scenario.rs         | 196 +++++++++++
 .../server/scenarios/message_cleanup_scenario.rs   | 140 ++++++++
 core/integration/tests/server/scenarios/mod.rs     |   2 +
 .../scenarios/reconnect_after_restart_scenario.rs  | 300 ++++++++++++++++-
 .../scenarios/restart_offset_skip_scenario.rs      | 272 ++++++++++++++++
 core/integration/tests/server/specific.rs          |  39 ++-
 core/server/src/bootstrap.rs                       |   4 +-
 core/server/src/shard/mod.rs                       |  82 ++++-
 core/server/src/shard/system/messages.rs           |   2 +-
 core/server/src/shard/system/partitions.rs         |  74 ++++-
 core/server/src/shard/system/segments.rs           | 124 +++++--
 core/server/src/streaming/partitions/journal.rs    |  93 +++++-
 .../src/streaming/partitions/local_partition.rs    |   4 +-
 core/server/src/streaming/partitions/log.rs        |  12 +
 core/server/src/streaming/partitions/mod.rs        |   2 +
 core/server/src/streaming/partitions/ops.rs        | 297 +++++++++--------
 core/server/src/streaming/partitions/ops_tests.rs  | 359 +++++++++++++++++++++
 21 files changed, 1856 insertions(+), 202 deletions(-)

diff --git a/core/common/src/types/message/messages_batch_mut.rs b/core/common/src/types/message/messages_batch_mut.rs
index 231ba00a2..320f2f1ea 100644
--- a/core/common/src/types/message/messages_batch_mut.rs
+++ b/core/common/src/types/message/messages_batch_mut.rs
@@ -324,7 +324,7 @@ impl IggyMessagesBatchMut {
         let first_offset = self.first_offset()?;
 
         if start_offset < first_offset {
-            return self.slice_by_index(0, count);
+            return None;
         }
 
         let last_offset = self.last_offset()?;
diff --git a/core/common/src/types/message/messages_batch_set.rs b/core/common/src/types/message/messages_batch_set.rs
index 8dd47043e..41183ae83 100644
--- a/core/common/src/types/message/messages_batch_set.rs
+++ b/core/common/src/types/message/messages_batch_set.rs
@@ -237,22 +237,30 @@ impl IggyMessagesBatchSet {
 
         let mut result = Self::with_capacity(self.containers_count());
         let mut remaining_count = count;
+        let mut current_offset = start_offset;
 
         for container in self.iter() {
             if remaining_count == 0 {
                 break;
             }
 
-            let first_offset = container.first_offset();
-            if first_offset.is_none()
-                || first_offset.unwrap() + container.count() as u64 <= start_offset
-            {
+            let Some(batch_first) = container.first_offset() else {
+                continue;
+            };
+            if batch_first + container.count() as u64 <= current_offset {
                 continue;
             }
 
-            if let Some(sliced) = container.slice_by_offset(start_offset, remaining_count)
+            // When current_offset is below this batch's range (cross-batch
+            // reads), start from the batch's first offset instead.
+            let effective_start = current_offset.max(batch_first);
+
+            if let Some(sliced) = container.slice_by_offset(effective_start, remaining_count)
                 && sliced.count() > 0
             {
+                if let Some(last) = sliced.last_offset() {
+                    current_offset = last + 1;
+                }
                 remaining_count -= sliced.count();
                 result.add_batch(sliced);
             }
diff --git a/core/common/src/types/segment.rs b/core/common/src/types/segment.rs
index 13f650f5e..23abe4500 100644
--- a/core/common/src/types/segment.rs
+++ b/core/common/src/types/segment.rs
@@ -66,7 +66,7 @@ impl Segment {
     }
 
     pub fn is_expired(&self, now: IggyTimestamp, expiry: IggyExpiry) -> bool {
-        if !self.sealed {
+        if !self.sealed || self.end_timestamp == 0 {
             return false;
         }
 
@@ -79,3 +79,30 @@ impl Segment {
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use crate::{IggyDuration, IggyTimestamp};
+    use std::time::Duration;
+
+    #[test]
+    fn zero_timestamp_segment_should_not_appear_expired() {
+        // Reproduce Bug 3 from #2924: during bootstrap, segments with empty
+        // indexes get end_timestamp = 0. is_expired() then evaluates
+        // 0 + expiry <= now, which is always true, causing immediate deletion.
+        let mut seg = Segment::new(5000, IggyByteSize::from(128 * 1024 * 1024u64));
+        seg.sealed = true;
+        seg.end_timestamp = 0; // simulates bootstrap with empty indexes
+
+        let now = IggyTimestamp::now();
+        let expiry = IggyExpiry::ExpireDuration(IggyDuration::from(Duration::from_secs(600)));
+
+        // A segment with unknown timestamp (0) must NOT be considered expired.
+        // Currently this FAILS - is_expired returns true because 0 + 600s <= now.
+        assert!(
+            !seg.is_expired(now, expiry),
+            "BUG: segment with end_timestamp=0 appears instantly expired"
+        );
+    }
+}
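For reference, a self-contained sketch of the guard the hunk above adds, with a simplified Segment struct and raw microsecond timestamps standing in for the iggy_common types (all names here are assumptions, not the crate's API):

// A sealed segment whose end_timestamp was never set (bootstrap with empty
// indexes) must not be treated as expired, because 0 + expiry <= now would
// otherwise always hold and the segment would be deleted immediately.
struct Segment {
    sealed: bool,
    end_timestamp: u64, // microseconds since epoch; 0 = unknown
}

fn is_expired(seg: &Segment, now_micros: u64, expiry_micros: Option<u64>) -> bool {
    if !seg.sealed || seg.end_timestamp == 0 {
        return false;
    }
    match expiry_micros {
        Some(expiry) => seg.end_timestamp + expiry <= now_micros,
        None => false, // never expire
    }
}

fn main() {
    let now = 1_700_000_000_000_000u64;
    let ten_minutes = 600 * 1_000_000u64;
    let unknown = Segment { sealed: true, end_timestamp: 0 };
    let old = Segment { sealed: true, end_timestamp: now - 2 * ten_minutes };
    assert!(!is_expired(&unknown, now, Some(ten_minutes)));
    assert!(is_expired(&old, now, Some(ten_minutes)));
    println!("ok");
}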
diff --git a/core/integration/tests/server/message_cleanup.rs b/core/integration/tests/server/message_cleanup.rs
index c61855d83..3af365fd7 100644
--- a/core/integration/tests/server/message_cleanup.rs
+++ b/core/integration/tests/server/message_cleanup.rs
@@ -73,6 +73,12 @@ fn fair_size_cleanup_multipartition() -> CleanupScenarioFn {
     }
 }
 
+fn expiry_respects_consumer_offset() -> CleanupScenarioFn {
+    |client, path| {
+        Box::pin(message_cleanup_scenario::run_expiry_respects_consumer_offset(client, path))
+    }
+}
+
 async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
     let mut harness = TestHarness::builder()
         .server(
@@ -116,6 +122,7 @@ async fn run_cleanup_scenario(scenario: CleanupScenarioFn) {
     combined_retention(),
     expiry_multipartition(),
     fair_size_cleanup_multipartition(),
+    expiry_respects_consumer_offset(),
 ])]
 #[tokio::test]
 #[parallel]
diff --git a/core/integration/tests/server/scenarios/concurrent_produce_consume_scenario.rs b/core/integration/tests/server/scenarios/concurrent_produce_consume_scenario.rs
new file mode 100644
index 000000000..3bffb8749
--- /dev/null
+++ b/core/integration/tests/server/scenarios/concurrent_produce_consume_scenario.rs
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Regression test for issue #2715: consumer offset skip during concurrent produce+consume.
+//!
+//! Root cause ("State C"):
+//!   1. Journal commit() moves data to in-flight buffer (async persist begins)
+//!   2. New send arrives before persist completes - journal is non-empty again
+//!   3. Consumer polls with Next + auto_commit:
+//!      Old code only checked in-flight when journal was empty (Case 0).
+//!      With journal non-empty (Case 1-3), in-flight was invisible.
+//!      auto_commit stored the journal offset, permanently skipping the in-flight range.
+//!
+//! Trigger: `messages_required_to_save = "1"` forces an inline journal commit on every
+//! batch. With concurrent sends, the next batch arrives while the previous is persisting,
+//! reliably producing State C.
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::iggy_harness;
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "issue-2715-stream";
+const TOPIC_NAME: &str = "issue-2715-topic";
+const TOTAL_MESSAGES: u64 = 500;
+const PRODUCER_BATCH_SIZE: u32 = 5;
+const CONSUMER_BATCH_SIZE: u32 = 10;
+const MAX_TEST_DURATION: Duration = Duration::from_secs(60);
+
+/// Regression test for issue #2715: consumer must not skip offsets during
+/// concurrent produce+consume when the journal commits while in-flight data exists.
+///
+/// `messages_required_to_save = "1"` forces every batch to trigger an inline
+/// journal commit. The next send arrives before async persist completes, creating
+/// State C (journal non-empty + in-flight non-empty simultaneously).
+#[iggy_harness(server(
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = false,
+    message_saver.enabled = true,
+    message_saver.interval = "100ms"
+))]
+async fn concurrent_produce_consume_no_offset_skip(harness: &TestHarness) {
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+
+    let setup = harness.tcp_root_client().await.unwrap();
+    setup.create_stream(STREAM_NAME).await.unwrap();
+    setup
+        .create_topic(
+            &stream_id,
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    drop(setup);
+
+    let producer_client = harness.tcp_root_client().await.unwrap();
+    let consumer_client = harness.tcp_root_client().await.unwrap();
+
+    let producer_done = Arc::new(AtomicBool::new(false));
+    let producer_done_clone = producer_done.clone();
+
+    // Consumer: polls with Next + auto_commit concurrently with the producer.
+    // Checks that received offsets are strictly contiguous (no gaps).
+    let consumer_task = tokio::spawn(async move {
+        let stream = Identifier::named(STREAM_NAME).unwrap();
+        let topic = Identifier::named(TOPIC_NAME).unwrap();
+        let consumer = Consumer::default();
+        let mut last_offset: Option<u64> = None;
+        let mut total_received = 0u64;
+        let mut consecutive_empty = 0u32;
+        let deadline = Instant::now() + MAX_TEST_DURATION;
+
+        loop {
+            if Instant::now() >= deadline {
+                panic!(
+                    "Consumer timed out after {MAX_TEST_DURATION:?}. \
+                     Received {total_received}/{TOTAL_MESSAGES}, \
+                     last_offset: {last_offset:?}"
+                );
+            }
+
+            let result = consumer_client
+                .poll_messages(
+                    &stream,
+                    &topic,
+                    Some(0),
+                    &consumer,
+                    &PollingStrategy::next(),
+                    CONSUMER_BATCH_SIZE,
+                    true, // auto_commit - this is what makes the skip permanent
+                )
+                .await;
+
+            match result {
+                Ok(polled) if polled.messages.is_empty() => {
+                    if producer_done_clone.load(Ordering::Relaxed) {
+                        consecutive_empty += 1;
+                        if consecutive_empty >= 20 {
+                            break;
+                        }
+                        tokio::time::sleep(Duration::from_millis(50)).await;
+                    } else {
+                        tokio::time::sleep(Duration::from_millis(5)).await;
+                    }
+                }
+                Ok(polled) => {
+                    consecutive_empty = 0;
+                    for msg in &polled.messages {
+                        let offset = msg.header.offset;
+                        if let Some(last) = last_offset {
+                            assert_eq!(
+                                offset,
+                                last + 1,
+                                "Offset gap! Expected {}, got {} (skipped {} messages). \
+                                 Issue #2715: in-flight buffer invisible when journal non-empty.",
+                                last + 1,
+                                offset,
+                                offset.saturating_sub(last + 1),
+                            );
+                        }
+                        last_offset = Some(offset);
+                        total_received += 1;
+                    }
+                }
+                Err(e) => {
+                    eprintln!("Consumer poll error (transient under load): {e:?}");
+                    tokio::time::sleep(Duration::from_millis(10)).await;
+                }
+            }
+        }
+
+        total_received
+    });
+
+    // Producer: sends TOTAL_MESSAGES in a tight loop to maximize State C window.
+    let mut sent = 0u64;
+    while sent < TOTAL_MESSAGES {
+        let batch_count = PRODUCER_BATCH_SIZE.min((TOTAL_MESSAGES - sent) as u32);
+        let mut messages: Vec<IggyMessage> = (0..batch_count)
+            .map(|i| {
+                IggyMessage::builder()
+                    .payload(Bytes::from(format!("msg-{}", sent + i as u64)))
+                    .build()
+                    .unwrap()
+            })
+            .collect();
+
+        producer_client
+            .send_messages(
+                &stream_id,
+                &topic_id,
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await
+            .unwrap_or_else(|e| panic!("Producer send failed at sent={sent}: {e}"));
+
+        sent += batch_count as u64;
+    }
+
+    producer_done.store(true, Ordering::Relaxed);
+
+    let total_received = consumer_task.await.unwrap();
+    assert_eq!(
+        total_received,
+        TOTAL_MESSAGES,
+        "Consumer received {total_received}/{TOTAL_MESSAGES} - \
+         missing {} messages (issue #2715 offset skip regression).",
+        TOTAL_MESSAGES - total_received,
+    );
+
+    producer_client.delete_stream(&stream_id).await.unwrap();
+}
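A toy model of the "State C" described in the module docs above may help: the partition's data lives in three tiers, and a poll that consults only the journal whenever it is non-empty loses the in-flight range. Everything below is illustrative, not the server's actual types:

// Sketch: three tiers (persisted, in-flight being persisted, fresh journal
// writes). The fix is conceptually that a read always walks the tiers in
// order instead of skipping the in-flight range when the journal has data.
#[derive(Default)]
struct Partition {
    disk: Vec<u64>,      // offsets already persisted
    in_flight: Vec<u64>, // committed from the journal, persist still pending
    journal: Vec<u64>,   // freshly appended offsets
}

impl Partition {
    fn poll_from(&self, start: u64, count: usize) -> Vec<u64> {
        self.disk
            .iter()
            .chain(self.in_flight.iter())
            .chain(self.journal.iter())
            .copied()
            .filter(|&o| o >= start)
            .take(count)
            .collect()
    }
}

fn main() {
    // State C: the journal committed offsets 0..=4 into in-flight, then new
    // sends landed in the journal before that persist finished.
    let p = Partition {
        disk: vec![],
        in_flight: (0..5).collect(),
        journal: (5..10).collect(),
    };
    // Reading from offset 0 must surface the in-flight range, not jump to 5.
    assert_eq!(p.poll_from(0, 10), (0..10).collect::<Vec<u64>>());
    println!("{:?}", p.poll_from(0, 10));
}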
diff --git a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
index 066953153..2292f353a 100644
--- a/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
+++ b/core/integration/tests/server/scenarios/message_cleanup_scenario.rs
@@ -563,6 +563,146 @@ pub async fn run_fair_size_based_cleanup_multipartition(client: &IggyClient, dat
         .unwrap();
 }
 
+/// Reproduces Bug 2 from #2924: the time-based cleaner deletes expired segments
+/// without checking whether consumers have read them. The size-based
+/// `delete_oldest_segments` checks `min_committed_offset` but the time-based
+/// `delete_expired_segments_for_partition` does not.
+///
+/// Scenario:
+/// 1. Send 300 messages (3 segments at 100KB each)
+/// 2. Consumer reads only 50 messages (stored offset ~49, within segment 0)
+/// 3. Wait for all segments to expire (2s expiry)
+/// 4. Verify consumer can still poll Next() and get contiguous offsets
+///
+/// On unfixed code: the cleaner deletes segments 0+1 (expired, consumer offset
+/// not checked), consumer's Next() jumps to segment 2, skipping ~250 messages.
+pub async fn run_expiry_respects_consumer_offset(client: &IggyClient, data_path: &Path) {
+    const TEST_STREAM: &str = "test_cleaner_barrier_stream";
+    const TEST_TOPIC: &str = "test_cleaner_barrier_topic";
+
+    let stream = client.create_stream(TEST_STREAM).await.unwrap();
+    let stream_id = stream.id;
+
+    let expiry = Duration::from_secs(2);
+    let topic = client
+        .create_topic(
+            &Identifier::named(TEST_STREAM).unwrap(),
+            TEST_TOPIC,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::ExpireDuration(IggyDuration::from(expiry)),
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    let partition_path = data_path
+        .join(format!(
+            "streams/{stream_id}/topics/{topic_id}/partitions/{PARTITION_ID}"
+        ))
+        .display()
+        .to_string();
+
+    // Send 300 messages (1KB each) -> 3 sealed segments + active
+    let payload = make_payload('B');
+    let total_messages = 300u32;
+    for i in 0..total_messages {
+        let message = IggyMessage::builder()
+            .id(i as u128)
+            .payload(payload.clone())
+            .build()
+            .unwrap();
+        client
+            .send_messages(
+                &Identifier::named(TEST_STREAM).unwrap(),
+                &Identifier::named(TEST_TOPIC).unwrap(),
+                &Partitioning::partition_id(PARTITION_ID),
+                &mut [message],
+            )
+            .await
+            .unwrap();
+    }
+
+    let initial_segments = get_segment_paths_for_partition(&partition_path);
+    assert!(
+        initial_segments.len() >= 3,
+        "Need at least 3 segments, got {}",
+        initial_segments.len()
+    );
+
+    // Consumer reads only 50 messages with auto_commit, storing offset ~49.
+    // This means the consumer has NOT read segments 1, 2, etc.
+    let consumer = Consumer::new(Identifier::numeric(42).unwrap());
+    let mut consumed_offsets = Vec::new();
+    let mut remaining = 50u32;
+    while remaining > 0 {
+        let batch_size = remaining.min(10);
+        let polled = client
+            .poll_messages(
+                &Identifier::named(TEST_STREAM).unwrap(),
+                &Identifier::named(TEST_TOPIC).unwrap(),
+                Some(PARTITION_ID),
+                &consumer,
+                &PollingStrategy::next(),
+                batch_size,
+                true, // auto_commit
+            )
+            .await
+            .unwrap();
+        for msg in &polled.messages {
+            consumed_offsets.push(msg.header.offset);
+        }
+        remaining -= polled.messages.len() as u32;
+    }
+    let last_committed = *consumed_offsets.last().unwrap();
+    assert_eq!(
+        last_committed, 49,
+        "Consumer should have read through offset 49"
+    );
+
+    // Wait for expiry + cleaner buffer
+    tokio::time::sleep(expiry + CLEANER_BUFFER + CLEANER_BUFFER).await;
+
+    // Now poll Next() - consumer should continue from offset 50 without gaps.
+    // BUG: on unfixed code, the cleaner deleted the segment containing offsets
+    // 50-99 (expired, no consumer barrier check), so Next() jumps to offset 100+.
+    let polled = client
+        .poll_messages(
+            &Identifier::named(TEST_STREAM).unwrap(),
+            &Identifier::named(TEST_TOPIC).unwrap(),
+            Some(PARTITION_ID),
+            &consumer,
+            &PollingStrategy::next(),
+            10,
+            false,
+        )
+        .await
+        .unwrap();
+
+    assert!(
+        !polled.messages.is_empty(),
+        "Consumer should still be able to poll messages after expiry"
+    );
+
+    let first_offset = polled.messages[0].header.offset;
+    assert_eq!(
+        first_offset,
+        last_committed + 1,
+        "BUG #2924: cleaner deleted unconsumed segments! \
+         Expected next offset {}, got {} (skipped {} messages)",
+        last_committed + 1,
+        first_offset,
+        first_offset - last_committed - 1,
+    );
+
+    client
+        .delete_stream(&Identifier::named(TEST_STREAM).unwrap())
+        .await
+        .unwrap();
+}
+
 fn get_segment_paths_for_partition(partition_path: &str) -> Vec<DirEntry> {
     read_dir(partition_path)
         .map(|read_dir| {
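A compact sketch of the consumer barrier this scenario exercises: an expired, non-active segment is only deletable once every committed consumer offset has passed its end_offset. The types and the min-offset computation below are simplified stand-ins, not the server's implementation:

struct Segment {
    start_offset: u64,
    end_offset: u64,
    expired: bool,
}

// Returns the start offsets of segments that are safe to delete.
fn deletable_segments(segments: &[Segment], consumer_offsets: &[u64]) -> Vec<u64> {
    // No consumers means no barrier; otherwise the slowest consumer wins.
    let barrier = consumer_offsets.iter().copied().min();
    segments
        .iter()
        .enumerate()
        // Never delete the active (last) segment.
        .filter(|(idx, _)| *idx + 1 != segments.len())
        .filter(|(_, seg)| seg.expired)
        // Block deletion when an unconsumed offset would be lost.
        .filter(|(_, seg)| barrier.map_or(true, |b| seg.end_offset <= b))
        .map(|(_, seg)| seg.start_offset)
        .collect()
}

fn main() {
    let segments = vec![
        Segment { start_offset: 0, end_offset: 99, expired: true },
        Segment { start_offset: 100, end_offset: 199, expired: true },
        Segment { start_offset: 200, end_offset: 299, expired: false },
    ];
    // A consumer committed through offset 49: nothing may be deleted, since
    // segment [0..99] still holds unread offsets 50..=99.
    assert!(deletable_segments(&segments, &[49]).is_empty());
    // Once it reaches offset 150, segment [0..99] becomes deletable.
    assert_eq!(deletable_segments(&segments, &[150]), vec![0]);
    println!("ok");
}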
diff --git a/core/integration/tests/server/scenarios/mod.rs b/core/integration/tests/server/scenarios/mod.rs
index fb510234f..234572887 100644
--- a/core/integration/tests/server/scenarios/mod.rs
+++ b/core/integration/tests/server/scenarios/mod.rs
@@ -18,6 +18,7 @@
 
 pub mod authentication_scenario;
 pub mod bench_scenario;
+pub mod concurrent_produce_consume_scenario;
 pub mod concurrent_scenario;
 pub mod consumer_group_auto_commit_reconnection_scenario;
 pub mod consumer_group_join_scenario;
@@ -39,6 +40,7 @@ pub mod permissions_scenario;
 pub mod purge_delete_scenario;
 pub mod read_during_persistence_scenario;
 pub mod reconnect_after_restart_scenario;
+pub mod restart_offset_skip_scenario;
 pub mod segment_rotation_race_scenario;
 pub mod single_message_per_batch_scenario;
 pub mod snapshot_scenario;
diff --git a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
index 2d2a967f7..b770c23f6 100644
--- a/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
+++ b/core/integration/tests/server/scenarios/reconnect_after_restart_scenario.rs
@@ -143,11 +143,19 @@ pub async fn run_consumer(harness: &mut TestHarness) {
         .await
         .expect("Failed to initialize consumer");
 
+    let pre_payloads = consume_messages_validated(&mut consumer, 3, Duration::from_secs(10)).await;
     assert_eq!(
-        consume_messages(&mut consumer, 3, Duration::from_secs(10)).await,
+        pre_payloads.len(),
         3,
         "Should consume all pre-restart messages"
     );
+    for (i, payload) in pre_payloads.iter().enumerate() {
+        assert_eq!(
+            payload,
+            &format!("pre-restart-{i}"),
+            "Pre-restart message {i} has wrong payload"
+        );
+    }
 
     harness.server_mut().stop().expect("Failed to stop server");
     sleep(Duration::from_secs(2)).await;
@@ -162,11 +170,19 @@ pub async fn run_consumer(harness: &mut TestHarness) {
         .expect("Failed to create post-restart client");
     send_messages(&post_client, "post-restart", 3).await;
 
+    let post_payloads = consume_messages_validated(&mut consumer, 3, Duration::from_secs(30)).await;
     assert_eq!(
-        consume_messages(&mut consumer, 3, Duration::from_secs(30)).await,
+        post_payloads.len(),
         3,
         "Should consume all post-restart messages after reconnect"
     );
+    for (i, payload) in post_payloads.iter().enumerate() {
+        assert_eq!(
+            payload,
+            &format!("post-restart-{i}"),
+            "Post-restart message {i} has wrong payload"
+        );
+    }
 }
 
 fn create_client(harness: &TestHarness) -> IggyClient {
@@ -209,18 +225,286 @@ async fn send_messages(client: &IggyClient, prefix: &str, count: u32) {
     }
 }
 
-async fn consume_messages(consumer: &mut IggyConsumer, expected: u32, max_wait: Duration) -> u32 {
-    let mut consumed = 0u32;
+/// Consumes up to `expected` messages, returning their payloads in order.
+async fn consume_messages_validated(
+    consumer: &mut IggyConsumer,
+    expected: u32,
+    max_wait: Duration,
+) -> Vec<String> {
+    let mut payloads = Vec::with_capacity(expected as usize);
     let deadline = tokio::time::Instant::now() + max_wait;
-    while consumed < expected && tokio::time::Instant::now() < deadline {
+    while (payloads.len() as u32) < expected && tokio::time::Instant::now() < deadline {
         let poll_result: Option<Result<ReceivedMessage, IggyError>> =
             tokio::time::timeout(Duration::from_millis(500), consumer.next())
                 .await
                 .ok()
                 .flatten();
-        if let Some(Ok(_)) = poll_result {
-            consumed += 1;
+        if let Some(Ok(msg)) = poll_result {
+            let payload = String::from_utf8_lossy(&msg.message.payload).to_string();
+            payloads.push(payload);
         }
     }
-    consumed
+    payloads
+}
+
+/// Regression test: a partition with exactly one message at offset 0 must not
+/// reassign offset 0 to the next message after server restart.
+///
+/// Root cause: `should_increment_offset = current_offset > 0` is false when
+/// current_offset == 0, but a segment with 1 message at offset 0 has
+/// end_offset = 0. The next append after restart reuses offset 0.
+pub async fn run_single_message_offset_zero_restart(harness: &mut TestHarness) {
+    const STREAM: &str = "offset-zero-restart-stream";
+    const TOPIC: &str = "offset-zero-restart-topic";
+    const CONSUMER_ID: u32 = 77;
+
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to create client");
+
+    client.create_stream(STREAM).await.unwrap();
+    client
+        .create_topic(
+            &Identifier::named(STREAM).unwrap(),
+            TOPIC,
+            1,
+            Default::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    // Send exactly 1 message (gets offset 0)
+    let mut msg = [IggyMessage::from_str("single-msg-0").unwrap()];
+    client
+        .send_messages(
+            &Identifier::named(STREAM).unwrap(),
+            &Identifier::named(TOPIC).unwrap(),
+            &Partitioning::partition_id(0),
+            &mut msg,
+        )
+        .await
+        .unwrap();
+
+    // Consumer polls to commit offset 0
+    let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+    let polled = client
+        .poll_messages(
+            &Identifier::named(STREAM).unwrap(),
+            &Identifier::named(TOPIC).unwrap(),
+            Some(0),
+            &consumer,
+            &PollingStrategy::next(),
+            10,
+            true,
+        )
+        .await
+        .unwrap();
+    assert_eq!(polled.messages.len(), 1, "Should get the single message");
+    assert_eq!(polled.messages[0].header.offset, 0);
+
+    // Wait for data to flush
+    sleep(Duration::from_secs(1)).await;
+    drop(client);
+
+    // Restart
+    harness.server_mut().stop().expect("Failed to stop server");
+    sleep(Duration::from_secs(2)).await;
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to create post-restart client");
+
+    // Send a second message - it MUST get offset 1, not 0
+    let mut msg = [IggyMessage::from_str("single-msg-1").unwrap()];
+    client
+        .send_messages(
+            &Identifier::named(STREAM).unwrap(),
+            &Identifier::named(TOPIC).unwrap(),
+            &Partitioning::partition_id(0),
+            &mut msg,
+        )
+        .await
+        .unwrap();
+
+    // Consumer polls with Next - should get offset 1
+    let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+    let polled = client
+        .poll_messages(
+            &Identifier::named(STREAM).unwrap(),
+            &Identifier::named(TOPIC).unwrap(),
+            Some(0),
+            &consumer,
+            &PollingStrategy::next(),
+            10,
+            true,
+        )
+        .await
+        .unwrap();
+
+    assert!(
+        !polled.messages.is_empty(),
+        "BUG: consumer got empty result after restart - offset stuck"
+    );
+    assert_eq!(
+        polled.messages[0].header.offset, 1,
+        "BUG: second message got offset {} instead of 1. \
+         should_increment_offset was incorrectly false after restart \
+         with single message at offset 0",
+        polled.messages[0].header.offset,
+    );
+
+    let payload = String::from_utf8_lossy(&polled.messages[0].payload);
+    assert_eq!(payload, "single-msg-1");
+}
+
+/// Regression test: consumer offset persisted ahead of partition data (crash
+/// simulation). After restart the consumer must not be permanently stuck.
+///
+/// Simulates: OOM/kill-9 where consumer auto_commit wrote offset to disk but
+/// journal data was not flushed. On restart, consumer offset file says 999 but
+/// partition only has 10 messages (offsets 0-9).
+pub async fn run_consumer_offset_ahead_after_crash(harness: &mut TestHarness) {
+    const STREAM: &str = "offset-ahead-crash-stream";
+    const TOPIC: &str = "offset-ahead-crash-topic";
+    const CONSUMER_ID: u32 = 88;
+
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to create client");
+
+    let stream = client.create_stream(STREAM).await.unwrap();
+    let stream_id = stream.id;
+
+    let topic = client
+        .create_topic(
+            &Identifier::named(STREAM).unwrap(),
+            TOPIC,
+            1,
+            Default::default(),
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+    let topic_id = topic.id;
+
+    // Send 10 messages (offsets 0-9)
+    for i in 0..10u32 {
+        let mut msg = [IggyMessage::from_str(&format!("crash-msg-{i}")).unwrap()];
+        client
+            .send_messages(
+                &Identifier::named(STREAM).unwrap(),
+                &Identifier::named(TOPIC).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut msg,
+            )
+            .await
+            .unwrap();
+    }
+
+    // Consumer reads all 10 with auto_commit (stored offset = 9)
+    let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+    let polled = client
+        .poll_messages(
+            &Identifier::named(STREAM).unwrap(),
+            &Identifier::named(TOPIC).unwrap(),
+            Some(0),
+            &consumer,
+            &PollingStrategy::next(),
+            100,
+            true,
+        )
+        .await
+        .unwrap();
+    assert_eq!(polled.messages.len(), 10);
+
+    // Wait for flush
+    sleep(Duration::from_secs(1)).await;
+    drop(client);
+
+    // Stop server
+    harness.server_mut().stop().expect("Failed to stop server");
+    sleep(Duration::from_secs(1)).await;
+
+    // Tamper with consumer offset file to simulate crash scenario:
+    // consumer offset persisted ahead of actual data
+    let data_path = harness.server().data_path();
+    let offset_file = data_path.join(format!(
+        "streams/{stream_id}/topics/{topic_id}/partitions/0/offsets/consumers/{CONSUMER_ID}"
+    ));
+    assert!(
+        offset_file.exists(),
+        "Consumer offset file should exist at {}",
+        offset_file.display()
+    );
+    std::fs::write(&offset_file, 999_u64.to_le_bytes()).expect("Failed to write offset file");
+
+    // Verify the tampered value
+    let bytes = std::fs::read(&offset_file).unwrap();
+    let stored = u64::from_le_bytes(bytes.try_into().unwrap());
+    assert_eq!(stored, 999, "Offset file should contain 999");
+
+    // Restart server
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+
+    let client = harness
+        .root_client()
+        .await
+        .expect("Failed to create post-restart client");
+
+    // Send 5 more messages (should get offsets 10-14)
+    for i in 0..5u32 {
+        let mut msg = [IggyMessage::from_str(&format!("post-crash-{i}")).unwrap()];
+        client
+            .send_messages(
+                &Identifier::named(STREAM).unwrap(),
+                &Identifier::named(TOPIC).unwrap(),
+                &Partitioning::partition_id(0),
+                &mut msg,
+            )
+            .await
+            .unwrap();
+    }
+
+    // Consumer polls with Next - must NOT be stuck at offset 1000 (999+1)
+    let consumer = Consumer::new(Identifier::numeric(CONSUMER_ID).unwrap());
+    let polled = client
+        .poll_messages(
+            &Identifier::named(STREAM).unwrap(),
+            &Identifier::named(TOPIC).unwrap(),
+            Some(0),
+            &consumer,
+            &PollingStrategy::next(),
+            100,
+            true,
+        )
+        .await
+        .unwrap();
+
+    assert!(
+        !polled.messages.is_empty(),
+        "BUG: consumer is permanently stuck - offset 999 is ahead of partition data. \
+         After crash recovery, consumer offsets must be clamped to partition offset."
+    );
+
+    // Verify we got the new messages
+    let first_offset = polled.messages[0].header.offset;
+    assert!(
+        first_offset <= 14,
+        "BUG: consumer skipped to offset {first_offset}, expected messages in range 10-14"
+    );
 }
diff --git a/core/integration/tests/server/scenarios/restart_offset_skip_scenario.rs b/core/integration/tests/server/scenarios/restart_offset_skip_scenario.rs
new file mode 100644
index 000000000..b451a68bc
--- /dev/null
+++ b/core/integration/tests/server/scenarios/restart_offset_skip_scenario.rs
@@ -0,0 +1,272 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+//! Regression test: consumer offset skip after server restart during concurrent
+//! produce+consume.
+//!
+//! Root cause: after restart, `MemoryMessageJournal` defaults to `base_offset=0`
+//! instead of `partition.current_offset + 1`. This causes the three-tier lookup
+//! (`disk -> in-flight -> journal`) to skip disk reads, and `slice_by_offset`
+//! silently returns messages from wrong offsets. Combined with `auto_commit`,
+//! the skip is permanent.
+//!
+//! Reproduction (matches colleague's scenario):
+//!   1. Start server, create stream/topic
+//!   2. Send 100 messages
+//!   3. Restart server
+//!   4. Send more messages concurrently (every 10ms)
+//!   5. Consumer group polls with batch_size=10, Next strategy, auto_commit
+//!   6. Assert: no offset gaps (previously skipped 3-6 messages)
+
+use bytes::Bytes;
+use iggy::prelude::*;
+use integration::harness::{TestBinary, TestHarness};
+use std::sync::Arc;
+use std::sync::atomic::{AtomicBool, Ordering};
+use std::time::{Duration, Instant};
+
+const STREAM_NAME: &str = "restart-skip-stream";
+const TOPIC_NAME: &str = "restart-skip-topic";
+const CONSUMER_GROUP_NAME: &str = "restart-skip-cg";
+// Enough pre-restart messages so the consumer is still reading disk data when
+// journal entries start appearing. With batch_size=10, the consumer needs many
+// poll cycles to drain these, creating the window for the bug.
+const PRE_RESTART_MESSAGES: u32 = 1000;
+const POST_RESTART_MESSAGES: u64 = 200;
+const PRODUCER_BATCH_SIZE: u32 = 5;
+const CONSUMER_BATCH_SIZE: u32 = 10;
+const MAX_TEST_DURATION: Duration = Duration::from_secs(120);
+
+pub async fn run(harness: &mut TestHarness) {
+    let stream_id = Identifier::named(STREAM_NAME).unwrap();
+    let topic_id = Identifier::named(TOPIC_NAME).unwrap();
+
+    // Step 1-2: Create stream/topic, send pre-restart messages
+    let setup_client = harness.tcp_root_client().await.unwrap();
+    setup_client.create_stream(STREAM_NAME).await.unwrap();
+    setup_client
+        .create_topic(
+            &stream_id,
+            TOPIC_NAME,
+            1,
+            CompressionAlgorithm::None,
+            None,
+            IggyExpiry::NeverExpire,
+            MaxTopicSize::ServerDefault,
+        )
+        .await
+        .unwrap();
+
+    setup_client
+        .create_consumer_group(&stream_id, &topic_id, CONSUMER_GROUP_NAME)
+        .await
+        .unwrap();
+
+    let mut pre_messages: Vec<IggyMessage> = (0..PRE_RESTART_MESSAGES)
+        .map(|i| {
+            IggyMessage::builder()
+                .payload(Bytes::from(format!("pre-{i}")))
+                .build()
+                .unwrap()
+        })
+        .collect();
+
+    setup_client
+        .send_messages(
+            &stream_id,
+            &topic_id,
+            &Partitioning::partition_id(0),
+            &mut pre_messages,
+        )
+        .await
+        .unwrap();
+
+    // Explicitly flush to disk, then wait for message_saver to persist
+    setup_client
+        .flush_unsaved_buffer(
+            &stream_id, &topic_id, 0, true, // fsync
+        )
+        .await
+        .unwrap();
+    tokio::time::sleep(Duration::from_secs(2)).await;
+    drop(setup_client);
+
+    // Step 3: Restart server
+    harness.server_mut().stop().expect("Failed to stop server");
+    tokio::time::sleep(Duration::from_secs(1)).await;
+    harness
+        .server_mut()
+        .start()
+        .expect("Failed to start server");
+    tokio::time::sleep(Duration::from_secs(1)).await;
+
+    // Step 4-5: Concurrent produce + consume after restart.
+    // Key: producer sends a burst FIRST so the journal has data (with wrong
+    // base_offset=0) BEFORE the consumer starts reading disk offsets. This is
+    // the window that triggers the bug.
+    let producer_client = harness.tcp_root_client().await.unwrap();
+
+    // Send an initial burst so the journal is populated before consumer starts
+    for i in 0..20u32 {
+        let mut messages = vec![
+            IggyMessage::builder()
+                .payload(Bytes::from(format!("burst-{i}")))
+                .build()
+                .unwrap(),
+        ];
+        producer_client
+            .send_messages(
+                &stream_id,
+                &topic_id,
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await
+            .unwrap();
+    }
+
+    let consumer_client = harness.tcp_root_client().await.unwrap();
+
+    // Consumer joins the consumer group
+    consumer_client
+        .join_consumer_group(
+            &stream_id,
+            &topic_id,
+            &Identifier::named(CONSUMER_GROUP_NAME).unwrap(),
+        )
+        .await
+        .unwrap();
+
+    let producer_done = Arc::new(AtomicBool::new(false));
+    let producer_done_clone = producer_done.clone();
+
+    // Consumer task: polls with Next + auto_commit, checks contiguous offsets
+    let consumer_task = tokio::spawn(async move {
+        let stream = Identifier::named(STREAM_NAME).unwrap();
+        let topic = Identifier::named(TOPIC_NAME).unwrap();
+        let consumer = Consumer::group(Identifier::named(CONSUMER_GROUP_NAME).unwrap());
+        let mut last_offset: Option<u64> = None;
+        let mut total_received = 0u64;
+        let mut consecutive_empty = 0u32;
+        let deadline = Instant::now() + MAX_TEST_DURATION;
+
+        loop {
+            if Instant::now() >= deadline {
+                panic!(
+                    "Consumer timed out after {MAX_TEST_DURATION:?}. \
+                     Received {total_received}, last_offset: {last_offset:?}"
+                );
+            }
+
+            let result = consumer_client
+                .poll_messages(
+                    &stream,
+                    &topic,
+                    Some(0),
+                    &consumer,
+                    &PollingStrategy::next(),
+                    CONSUMER_BATCH_SIZE,
+                    true, // auto_commit
+                )
+                .await;
+
+            match result {
+                Ok(polled) if polled.messages.is_empty() => {
+                    if producer_done_clone.load(Ordering::Relaxed) {
+                        consecutive_empty += 1;
+                        if consecutive_empty >= 20 {
+                            break;
+                        }
+                        tokio::time::sleep(Duration::from_millis(50)).await;
+                    } else {
+                        tokio::time::sleep(Duration::from_millis(5)).await;
+                    }
+                }
+                Ok(polled) => {
+                    consecutive_empty = 0;
+                    for msg in &polled.messages {
+                        let offset = msg.header.offset;
+                        if let Some(last) = last_offset {
+                            assert_eq!(
+                                offset,
+                                last + 1,
+                                "OFFSET SKIP after restart! Expected {}, got {} \
+                                 (skipped {} messages). Journal base_offset was \
+                                 not initialized after bootstrap.",
+                                last + 1,
+                                offset,
+                                offset.saturating_sub(last + 1),
+                            );
+                        }
+                        last_offset = Some(offset);
+                        total_received += 1;
+                    }
+                }
+                Err(e) => {
+                    eprintln!("Consumer poll error: {e:?}");
+                    tokio::time::sleep(Duration::from_millis(10)).await;
+                }
+            }
+        }
+
+        (total_received, last_offset)
+    });
+
+    // Producer: sends messages every 10ms after restart (matches colleague's scenario)
+    let mut sent = 0u64;
+    while sent < POST_RESTART_MESSAGES {
+        let batch_count = PRODUCER_BATCH_SIZE.min((POST_RESTART_MESSAGES - sent) as u32);
+        let mut messages: Vec<IggyMessage> = (0..batch_count)
+            .map(|i| {
+                IggyMessage::builder()
+                    .payload(Bytes::from(format!("post-{}", sent + i as u64)))
+                    .build()
+                    .unwrap()
+            })
+            .collect();
+
+        producer_client
+            .send_messages(
+                &stream_id,
+                &topic_id,
+                &Partitioning::partition_id(0),
+                &mut messages,
+            )
+            .await
+            .unwrap_or_else(|e| panic!("Producer send failed at sent={sent}: {e}"));
+
+        sent += batch_count as u64;
+        tokio::time::sleep(Duration::from_millis(10)).await;
+    }
+
+    producer_done.store(true, Ordering::Relaxed);
+
+    let (total_received, last_offset) = consumer_task.await.unwrap();
+
+    let initial_burst = 20u64;
+    let expected_total = PRE_RESTART_MESSAGES as u64 + initial_burst + POST_RESTART_MESSAGES;
+    assert_eq!(
+        total_received,
+        expected_total,
+        "Consumer received {total_received}/{expected_total} messages. \
+         Last offset: {last_offset:?}. Missing {} messages.",
+        expected_total - total_received,
+    );
+
+    producer_client.delete_stream(&stream_id).await.unwrap();
+}
diff --git a/core/integration/tests/server/specific.rs b/core/integration/tests/server/specific.rs
index 2a105939d..49c224150 100644
--- a/core/integration/tests/server/specific.rs
+++ b/core/integration/tests/server/specific.rs
@@ -18,8 +18,9 @@
  */
 
 use crate::server::scenarios::{
-    message_size_scenario, reconnect_after_restart_scenario, segment_rotation_race_scenario,
-    single_message_per_batch_scenario, tcp_tls_scenario, websocket_tls_scenario,
+    message_size_scenario, reconnect_after_restart_scenario, restart_offset_skip_scenario,
+    segment_rotation_race_scenario, single_message_per_batch_scenario, tcp_tls_scenario,
+    websocket_tls_scenario,
 };
 use integration::iggy_harness;
 
@@ -86,6 +87,40 @@ async fn consumer_reconnect_after_server_restart(harness: &mut TestHarness) {
     reconnect_after_restart_scenario::run_consumer(harness).await;
 }
 
+#[iggy_harness(server(
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = true
+))]
+async fn single_message_restart_offset_zero(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_single_message_offset_zero_restart(harness).await;
+}
+
+#[iggy_harness(server(
+    partition.messages_required_to_save = "1",
+    partition.enforce_fsync = true
+))]
+async fn consumer_offset_ahead_after_crash(harness: &mut TestHarness) {
+    reconnect_after_restart_scenario::run_consumer_offset_ahead_after_crash(harness).await;
+}
+
+/// Regression test: consumer offset skip after server restart during concurrent
+/// produce+consume. Reproduces the exact scenario from issue #2924/#2715:
+/// send messages, restart server, produce+consume concurrently, verify no offset
+/// gaps.
+///
+/// Config: high messages_required_to_save so post-restart messages accumulate in
+/// the journal (exposing the base_offset=0 bug). message_saver flushes pre-restart
+/// data before the restart.
+#[iggy_harness(server(
+    partition.messages_required_to_save = "10000",
+    partition.enforce_fsync = false,
+    message_saver.enabled = true,
+    message_saver.interval = "1s"
+))]
+async fn restart_offset_skip(harness: &mut TestHarness) {
+    restart_offset_skip_scenario::run(harness).await;
+}
+
 /// This test configures the server to trigger frequent segment rotations and runs
 /// multiple concurrent producers across all protocols (TCP, HTTP, QUIC, WebSocket)
 /// to maximize the chance of hitting the race condition between persist_messages_to_disk
diff --git a/core/server/src/bootstrap.rs b/core/server/src/bootstrap.rs
index 02bd742f2..a874fd068 100644
--- a/core/server/src/bootstrap.rs
+++ b/core/server/src/bootstrap.rs
@@ -237,7 +237,7 @@ pub async fn load_segments(
 ) -> Result<SegmentedLog<MemoryMessageJournal>, IggyError> {
     let mut log_files = collect_log_files(&partition_path).await?;
     log_files.sort_by(|a, b| a.path.file_name().cmp(&b.path.file_name()));
-    let mut log = SegmentedLog::<MemoryMessageJournal>::default();
+    let mut log = SegmentedLog::new(MemoryMessageJournal::empty());
     for entry in log_files {
         let log_file_name = entry
             .path
@@ -324,7 +324,7 @@ pub async fn load_segments(
         };
 
         let end_offset = if loaded_indexes.count() == 0 {
-            0
+            start_offset
         } else {
             let last_index_offset = loaded_indexes.last().unwrap().offset() as u64;
             start_offset + last_index_offset
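A tiny illustration of the end_offset fix above (the helper below is hypothetical, not from the patch): a segment loaded with no indexes must report its own start_offset, otherwise a segment starting at, say, 5000 would claim to end at 0:

// Sketch: end_offset derived from the segment's start and the last relative
// index offset, falling back to start_offset when there are no indexes.
fn end_offset(start_offset: u64, last_relative_index: Option<u64>) -> u64 {
    match last_relative_index {
        None => start_offset,
        Some(rel) => start_offset + rel,
    }
}

fn main() {
    assert_eq!(end_offset(5000, None), 5000);
    assert_eq!(end_offset(5000, Some(42)), 5042);
    println!("ok");
}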
diff --git a/core/server/src/shard/mod.rs b/core/server/src/shard/mod.rs
index e76a8cb5c..958a6fd8c 100644
--- a/core/server/src/shard/mod.rs
+++ b/core/server/src/shard/mod.rs
@@ -37,7 +37,7 @@ use builder::IggyShardBuilder;
 use dashmap::DashMap;
 use iggy_common::SemanticVersion;
 use iggy_common::sharding::{IggyNamespace, PartitionLocation};
-use iggy_common::{EncryptorKind, IggyError};
+use iggy_common::{EncryptorKind, IggyByteSize, IggyError};
 use std::{
     cell::{Cell, RefCell},
     net::SocketAddr,
@@ -48,7 +48,7 @@ use std::{
     },
     time::{Duration, Instant},
 };
-use tracing::{debug, error, info, instrument};
+use tracing::{debug, error, info, instrument, warn};
 use transmission::connector::{Receiver, ShardConnector, StopReceiver};
 
 pub mod builder;
@@ -302,13 +302,81 @@ impl IggyShard {
                             stats.increment_segments_count(1);
                         }
 
-                        let current_offset = loaded_log.active_segment().end_offset;
+                        // Use the max end_offset across segments that have data,
+                        // not just the active segment. Handles the edge case where
+                        // the active segment is empty (rotated right before shutdown).
+                        let current_offset = loaded_log
+                            .segments()
+                            .iter()
+                            .filter(|s| s.size > IggyByteSize::default())
+                            .map(|s| s.end_offset)
+                            .max()
+                            .unwrap_or(0);
                         stats.set_current_offset(current_offset);
 
-                        // Only increment offset if we have messages (current_offset > 0).
-                        // When current_offset is 0 and we have no messages, first message
-                        // should get offset 0.
-                        let should_increment_offset = current_offset > 0;
+                        // Check if ANY segment has data. Cannot use current_offset > 0
+                        // because a single message at offset 0 yields current_offset = 0
+                        // yet must still increment on the next append.
+                        let should_increment_offset = loaded_log
+                            .segments()
+                            .iter()
+                            .any(|s| s.size > IggyByteSize::default());
+
+                        // After a crash (OOM, SIGKILL), auto_commit may have persisted
+                        // a consumer offset beyond what was flushed to disk. Clamp to
+                        // the partition's actual offset to prevent permanent empty polls.
+                        {
+                            let guard = consumer_offsets.pin();
+                            for entry in guard.iter() {
+                                let stored = entry.1.offset.load(Ordering::Relaxed);
+                                if stored > current_offset {
+                                    warn!(
+                                        "Consumer {} offset {} ahead of partition offset {} \
+                                         for stream {}, topic {}, partition {} - clamping \
+                                         (crash recovery)",
+                                        entry.0,
+                                        stored,
+                                        current_offset,
+                                        stream_id,
+                                        topic_id,
+                                        partition_id
+                                    );
+                                    entry.1.offset.store(current_offset, Ordering::Relaxed);
+                                }
+                            }
+                        }
+                        {
+                            let guard = consumer_group_offsets.pin();
+                            for entry in guard.iter() {
+                                let stored = entry.1.offset.load(Ordering::Relaxed);
+                                if stored > current_offset {
+                                    warn!(
+                                        "Consumer group {:?} offset {} ahead of partition \
+                                         offset {} for stream {}, topic {}, partition {} - \
+                                         clamping (crash recovery)",
+                                        entry.0,
+                                        stored,
+                                        current_offset,
+                                        stream_id,
+                                        topic_id,
+                                        partition_id
+                                    );
+                                    entry.1.offset.store(current_offset, Ordering::Relaxed);
+                                }
+                            }
+                        }
+
+                        // Initialize journal base_offset so the three-tier
+                        // routing in ops.rs computes correct in_memory_floor.
+                        // Without this, journal defaults to base_offset=0 which
+                        // causes disk reads to be skipped after restart.
+                        if should_increment_offset {
+                            use crate::streaming::partitions::journal::{Inner, Journal};
+                            loaded_log.journal_mut().init(Inner {
+                                base_offset: current_offset + 1,
+                                ..Default::default()
+                            });
+                        }
 
                         let revision_id = init_info.revision_id;
 
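The bootstrap changes above make three related decisions; below is a condensed, self-contained sketch with hypothetical minimal types standing in for SegmentedLog, the journal, and the consumer-offset maps (none of these names are the server's actual API):

// Sketch of the recovery decisions: take the max end_offset over segments
// that actually hold data, derive should_increment_offset from "any segment
// has data" (so a single message at offset 0 still increments), clamp consumer
// offsets that ran ahead of the partition after a crash, and seed the journal
// base offset right after the recovered data.
struct Segment {
    end_offset: u64,
    size_bytes: u64,
}

fn recover(segments: &[Segment], consumer_offsets: &mut [u64]) -> (u64, bool, u64) {
    let current_offset = segments
        .iter()
        .filter(|s| s.size_bytes > 0)
        .map(|s| s.end_offset)
        .max()
        .unwrap_or(0);
    let should_increment_offset = segments.iter().any(|s| s.size_bytes > 0);

    // Crash recovery: a consumer offset persisted ahead of flushed data is
    // clamped so the consumer is not stuck polling past the end forever.
    for offset in consumer_offsets.iter_mut() {
        if *offset > current_offset {
            *offset = current_offset;
        }
    }

    // New writes start right after the recovered data.
    let journal_base_offset = if should_increment_offset { current_offset + 1 } else { 0 };
    (current_offset, should_increment_offset, journal_base_offset)
}

fn main() {
    // One sealed segment with a single message at offset 0, an empty active
    // segment, and a consumer offset (999) persisted ahead of the data.
    let segments = [
        Segment { end_offset: 0, size_bytes: 64 },
        Segment { end_offset: 0, size_bytes: 0 },
    ];
    let mut consumers = [999u64];
    let (current, increment, base) = recover(&segments, &mut consumers);
    assert_eq!((current, increment, base), (0, true, 1));
    assert_eq!(consumers[0], 0);
    println!("ok");
}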
diff --git a/core/server/src/shard/system/messages.rs b/core/server/src/shard/system/messages.rs
index d8ce2d891..a59f68d6d 100644
--- a/core/server/src/shard/system/messages.rs
+++ b/core/server/src/shard/system/messages.rs
@@ -364,7 +364,7 @@ impl IggyShard {
 
             let segment = &mut partition.log.segments_mut()[segment_index];
 
-            if segment.end_offset == 0 {
+            if segment.start_timestamp == 0 {
                 segment.start_timestamp = batch.first_timestamp().unwrap();
             }
 
diff --git a/core/server/src/shard/system/partitions.rs b/core/server/src/shard/system/partitions.rs
index e281544c1..f0831828a 100644
--- a/core/server/src/shard/system/partitions.rs
+++ b/core/server/src/shard/system/partitions.rs
@@ -260,7 +260,29 @@ impl IggyShard {
             stats.increment_segments_count(1);
         }
 
-        let current_offset = loaded_log.active_segment().end_offset;
+        // Use the max end_offset across segments that have data, not just
+        // the active segment. Mirrors the fix in shard/mod.rs bootstrap.
+        let current_offset = loaded_log
+            .segments()
+            .iter()
+            .filter(|s| s.size > iggy_common::IggyByteSize::default())
+            .map(|s| s.end_offset)
+            .max()
+            .unwrap_or(0);
+
+        let should_increment_offset = loaded_log
+            .segments()
+            .iter()
+            .any(|s| s.size > iggy_common::IggyByteSize::default());
+
+        // Initialize journal base_offset so three-tier routing works correctly.
+        if should_increment_offset {
+            use crate::streaming::partitions::journal::{Inner, Journal};
+            loaded_log.journal_mut().init(Inner {
+                base_offset: current_offset + 1,
+                ..Default::default()
+            });
+        }
 
         let (revision_id, consumer_offsets, consumer_group_offsets) = self
             .metadata
@@ -280,6 +302,54 @@ impl IggyShard {
                 )
             });
 
+        // Clamp consumer offsets that are ahead of partition offset (crash recovery).
+        {
+            let guard = consumer_offsets.pin();
+            for entry in guard.iter() {
+                let stored = entry.1.offset.load(std::sync::atomic::Ordering::Relaxed);
+                if stored > current_offset {
+                    tracing::warn!(
+                        "Consumer {} offset {} ahead of partition offset {} \
+                         for stream {}, topic {}, partition {} - clamping \
+                         (lazy init recovery)",
+                        entry.0,
+                        stored,
+                        current_offset,
+                        stream_id,
+                        topic_id,
+                        partition_id
+                    );
+                    entry
+                        .1
+                        .offset
+                        .store(current_offset, std::sync::atomic::Ordering::Relaxed);
+                }
+            }
+        }
+        {
+            let guard = consumer_group_offsets.pin();
+            for entry in guard.iter() {
+                let stored = entry.1.offset.load(std::sync::atomic::Ordering::Relaxed);
+                if stored > current_offset {
+                    tracing::warn!(
+                        "Consumer group {:?} offset {} ahead of partition \
+                         offset {} for stream {}, topic {}, partition {} - \
+                         clamping (lazy init recovery)",
+                        entry.0,
+                        stored,
+                        current_offset,
+                        stream_id,
+                        topic_id,
+                        partition_id
+                    );
+                    entry
+                        .1
+                        .offset
+                        .store(current_offset, std::sync::atomic::Ordering::Relaxed);
+                }
+            }
+        }
+
         let partition = LocalPartition::with_log(
             loaded_log,
             stats,
@@ -289,7 +359,7 @@ impl IggyShard {
             None,
             created_at,
             revision_id,
-            current_offset > 0,
+            should_increment_offset,
         );
 
         self.local_partitions.borrow_mut().insert(*ns, partition);
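For reference, a minimal standalone sketch of the recovery rule this hunk implements. The types and functions below (`SegmentMeta`, `recovered_offset`, `clamp_consumer_offset`) are simplified stand-ins for illustration only, not the server's real structs: the partition offset is recovered as the maximum end_offset over segments that actually hold data, and any persisted consumer offset beyond it is clamped back.

// Illustrative sketch only - simplified stand-in types, not part of the patch.
#[derive(Debug)]
struct SegmentMeta {
    end_offset: u64,
    size: u64,
}

// Max end_offset across segments with data; also report whether any data exists,
// which mirrors the `should_increment_offset` flag above.
fn recovered_offset(segments: &[SegmentMeta]) -> (u64, bool) {
    let has_data = segments.iter().any(|s| s.size > 0);
    let current = segments
        .iter()
        .filter(|s| s.size > 0)
        .map(|s| s.end_offset)
        .max()
        .unwrap_or(0);
    (current, has_data)
}

// Consumer offsets persisted ahead of the recovered partition offset are clamped.
fn clamp_consumer_offset(stored: u64, current: u64) -> u64 {
    stored.min(current)
}

fn main() {
    let segments = vec![
        SegmentMeta { end_offset: 499, size: 4096 },
        SegmentMeta { end_offset: 999, size: 2048 },
        SegmentMeta { end_offset: 0, size: 0 }, // empty active segment, ignored
    ];
    let (current, should_increment) = recovered_offset(&segments);
    assert_eq!((current, should_increment), (999, true));
    assert_eq!(clamp_consumer_offset(1_200, current), 999);
}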
diff --git a/core/server/src/shard/system/segments.rs b/core/server/src/shard/system/segments.rs
index 43b634c73..b119bb5e3 100644
--- a/core/server/src/shard/system/segments.rs
+++ b/core/server/src/shard/system/segments.rs
@@ -19,7 +19,7 @@ use crate::configs::cache_indexes::CacheIndexesConfig;
 use crate::shard::IggyShard;
 use crate::streaming::segments::Segment;
 use iggy_common::sharding::IggyNamespace;
-use iggy_common::{IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
+use iggy_common::{ConsumerKind, IggyError, IggyExpiry, IggyTimestamp, MaxTopicSize};
 
 impl IggyShard {
     /// Performs all cleanup for a topic's partitions: time-based expiry then size-based trimming.
@@ -122,15 +122,36 @@ impl IggyShard {
             let Some(partition) = partitions.get(&ns) else {
                 return Ok((0, 0));
             };
+
+            let min_committed = Self::min_committed_offset(
+                &partition.consumer_offsets,
+                &partition.consumer_group_offsets,
+            );
+
             let segments = partition.log.segments();
             let last_idx = segments.len().saturating_sub(1);
+            let mut offsets = Vec::new();
 
-            segments
-                .iter()
-                .enumerate()
-                .filter(|(idx, seg)| *idx != last_idx && seg.is_expired(now, expiry))
-                .map(|(_, seg)| seg.start_offset)
-                .collect()
+            for (idx, seg) in segments.iter().enumerate() {
+                if idx == last_idx || !seg.is_expired(now, expiry) {
+                    continue;
+                }
+                if let Some((barrier, kind, id)) = &min_committed
+                    && seg.end_offset > *barrier
+                {
+                    tracing::warn!(
+                        "Segment [{}..{}] blocked from expiry-based deletion \
+                         by {kind} (ID: {id}) at offset {barrier} \
+                         in partition {partition_id} (stream: {stream_id}, topic: {topic_id})",
+                        seg.start_offset,
+                        seg.end_offset,
+                    );
+                    continue;
+                }
+                offsets.push(seg.start_offset);
+            }
+
+            offsets
         };
 
         let mut total_segments = 0u64;
@@ -172,6 +193,23 @@ impl IggyShard {
                 continue;
             }
 
+            let min_committed = Self::min_committed_offset(
+                &partition.consumer_offsets,
+                &partition.consumer_group_offsets,
+            );
+            if let Some((barrier, kind, id)) = &min_committed
+                && first.end_offset > *barrier
+            {
+                tracing::warn!(
+                    "Segment [{}..{}] blocked from size-based deletion \
+                     by {kind} (ID: {id}) at offset {barrier} \
+                     in partition {partition_id} (stream: {stream_id}, topic: {topic_id})",
+                    first.start_offset,
+                    first.end_offset,
+                );
+                continue;
+            }
+
             match &oldest {
                 None => oldest = Some((partition_id, first.start_offset, first.start_timestamp)),
                 Some((_, _, ts)) if first.start_timestamp < *ts => {
@@ -271,11 +309,11 @@ impl IggyShard {
     }
 
     /// Deletes the N oldest **sealed** segments from a partition, preserving the active segment
-    /// and partition offset. Reuses `remove_segment_by_offset` — same logic as the message cleaner.
+    /// and partition offset. Reuses `remove_segment_by_offset` - same logic as the message cleaner.
     ///
-    /// Segments containing unconsumed messages are protected: a segment is only eligible for
-    /// deletion if `end_offset <= min_committed_offset` across all consumers and consumer groups.
-    /// If no consumers exist, there is no barrier.
+    /// Segments containing unconsumed messages are protected by a barrier: deletion is skipped
+    /// when `end_offset > min_committed_offset` and a warning is logged identifying the
+    /// blocking consumer. If no consumers exist, there is no barrier.
     pub(crate) async fn delete_oldest_segments(
         &self,
         stream_id: usize,
@@ -298,18 +336,33 @@ impl IggyShard {
 
             let segments = partition.log.segments();
             let last_idx = segments.len().saturating_sub(1);
+            let mut offsets = Vec::new();
+            let mut collected = 0u32;
 
-            segments
-                .iter()
-                .enumerate()
-                .filter(|(idx, seg)| {
-                    *idx != last_idx
-                        && seg.sealed
-                        && min_committed.is_none_or(|barrier| seg.end_offset <= barrier)
-                })
-                .map(|(_, seg)| seg.start_offset)
-                .take(segments_count as usize)
-                .collect()
+            for (idx, seg) in segments.iter().enumerate() {
+                if collected >= segments_count {
+                    break;
+                }
+                if idx == last_idx || !seg.sealed {
+                    continue;
+                }
+                if let Some((barrier, kind, id)) = &min_committed
+                    && seg.end_offset > *barrier
+                {
+                    tracing::warn!(
+                        "Segment [{}..{}] blocked from size-based deletion \
+                         by {kind} (ID: {id}) at offset {barrier} \
+                         in partition {partition_id} (stream: {stream_id}, topic: {topic_id})",
+                        seg.start_offset,
+                        seg.end_offset,
+                    );
+                    continue;
+                }
+                offsets.push(seg.start_offset);
+                collected += 1;
+            }
+
+            offsets
         };
 
         let mut total_segments = 0u64;
@@ -325,20 +378,29 @@ impl IggyShard {
     }
 
     /// Returns the minimum committed offset across all consumers and consumer groups,
-    /// or `None` if no consumers exist (no barrier).
+    /// along with the identity of the consumer holding it. Returns `None` if no
+    /// consumers exist (no barrier).
     fn min_committed_offset(
         consumer_offsets: &crate::streaming::partitions::consumer_offsets::ConsumerOffsets,
         consumer_group_offsets: &crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets,
-    ) -> Option<u64> {
+    ) -> Option<(u64, ConsumerKind, u32)> {
         let co_guard = consumer_offsets.pin();
         let cg_guard = consumer_group_offsets.pin();
-        let consumers = co_guard
-            .iter()
-            .map(|(_, co)| co.offset.load(std::sync::atomic::Ordering::Relaxed));
-        let groups = cg_guard
-            .iter()
-            .map(|(_, co)| co.offset.load(std::sync::atomic::Ordering::Relaxed));
-        consumers.chain(groups).min()
+        let consumers = co_guard.iter().map(|(_, co)| {
+            (
+                co.offset.load(std::sync::atomic::Ordering::Relaxed),
+                co.kind,
+                co.consumer_id,
+            )
+        });
+        let groups = cg_guard.iter().map(|(_, co)| {
+            (
+                co.offset.load(std::sync::atomic::Ordering::Relaxed),
+                co.kind,
+                co.consumer_id,
+            )
+        });
+        consumers.chain(groups).min_by_key(|(offset, _, _)| *offset)
     }
 
     /// Drains all segments, deletes their files, and re-initializes the partition log at offset 0.
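A minimal sketch of the deletion barrier described above. The names here (`Kind`, `min_committed`, `deletable`) are illustrative stand-ins, not the server's `ConsumerOffsets` maps: the smallest committed offset across consumers and groups is kept together with its holder, and a sealed segment is deletable only when its end_offset does not exceed that barrier.

// Illustrative sketch only - simplified stand-ins, not part of the patch.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Kind {
    Consumer,
    ConsumerGroup,
}

// Smallest committed offset plus the identity of whoever holds it.
fn min_committed(offsets: &[(u64, Kind, u32)]) -> Option<(u64, Kind, u32)> {
    offsets.iter().copied().min_by_key(|(offset, _, _)| *offset)
}

// A segment may be dropped only if every consumer has moved past it;
// with no consumers there is no barrier.
fn deletable(segment_end: u64, barrier: Option<(u64, Kind, u32)>) -> bool {
    barrier.is_none_or(|(b, _, _)| segment_end <= b)
}

fn main() {
    let offsets = [(150, Kind::Consumer, 1), (90, Kind::ConsumerGroup, 7)];
    let barrier = min_committed(&offsets);
    assert_eq!(barrier, Some((90, Kind::ConsumerGroup, 7)));
    assert!(deletable(80, barrier));   // fully consumed, safe to drop
    assert!(!deletable(120, barrier)); // group 7 still needs it, keep and warn
}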
diff --git a/core/server/src/streaming/partitions/journal.rs b/core/server/src/streaming/partitions/journal.rs
index d16db4f6d..87bd9cfcb 100644
--- a/core/server/src/streaming/partitions/journal.rs
+++ b/core/server/src/streaming/partitions/journal.rs
@@ -19,11 +19,11 @@ use crate::streaming::segments::{IggyMessagesBatchMut, IggyMessagesBatchSet};
 use iggy_common::{IggyByteSize, IggyError};
 use std::fmt::Debug;
 
-// TODO: Will have to revisit this Journal abstraction....
-// I don't like that it has to leak impl detail via the `Inner` struct in order to be functional.
-
 #[derive(Default, Debug)]
 pub struct Inner {
+    /// Base offset for the next journal epoch. After commit(), set to
+    /// current_offset + 1. Used in `append()`: `current_offset = base_offset +
+    /// messages_count - 1`.
     pub base_offset: u64,
     pub current_offset: u64,
     pub first_timestamp: u64,
@@ -32,17 +32,30 @@ pub struct Inner {
     pub size: IggyByteSize,
 }
 
-#[derive(Default, Debug)]
+#[derive(Debug)]
 pub struct MemoryMessageJournal {
     batches: IggyMessagesBatchSet,
     inner: Inner,
 }
 
-impl Clone for MemoryMessageJournal {
-    fn clone(&self) -> Self {
+impl MemoryMessageJournal {
+    /// Create an empty journal for a fresh partition (no existing data).
+    pub fn empty() -> Self {
+        Self {
+            batches: IggyMessagesBatchSet::default(),
+            inner: Inner::default(),
+        }
+    }
+
+    /// Create an empty journal positioned at the given offset. Used after
+    /// bootstrap when the partition already has data on disk up to some offset.
+    pub fn at_offset(base_offset: u64) -> Self {
         Self {
-            batches: Default::default(),
-            inner: Default::default(),
+            batches: IggyMessagesBatchSet::default(),
+            inner: Inner {
+                base_offset,
+                ..Default::default()
+            },
         }
     }
 }
@@ -63,6 +76,26 @@ impl Journal for MemoryMessageJournal {
             batch_messages_count
         );
 
+        // Defense-in-depth: on first append after empty/default state, correct
+        // base_offset from the batch's actual first offset. Mirrors the existing
+        // first_timestamp initialization pattern below. Catches code paths that
+        // create a journal without calling init().
+        if self.inner.messages_count == 0
+            && let Some(first_offset) = entry.first_offset()
+        {
+            // Allow disagreement when either side is 0 (fresh partition or
+            // reset after purge). Only flag when both are non-zero and differ.
+            debug_assert!(
+                self.inner.base_offset == 0
+                    || first_offset == 0
+                    || self.inner.base_offset == first_offset,
+                "journal base_offset ({}) disagrees with batch first_offset 
({})",
+                self.inner.base_offset,
+                first_offset
+            );
+            self.inner.base_offset = first_offset;
+        }
+
         let batch_size = entry.size();
         let first_timestamp = entry.first_timestamp().unwrap();
         let last_timestamp = entry.last_timestamp().unwrap();
@@ -107,6 +140,38 @@ impl Journal for MemoryMessageJournal {
     fn inner(&self) -> &Self::Inner {
         &self.inner
     }
+
+    fn first_offset(&self) -> Option<u64> {
+        if self.is_empty() {
+            None
+        } else {
+            Some(self.inner.base_offset)
+        }
+    }
+
+    fn last_offset(&self) -> Option<u64> {
+        if self.is_empty() {
+            None
+        } else {
+            Some(self.inner.current_offset)
+        }
+    }
+
+    fn first_timestamp(&self) -> Option<u64> {
+        if self.is_empty() || self.inner.first_timestamp == 0 {
+            None
+        } else {
+            Some(self.inner.first_timestamp)
+        }
+    }
+
+    fn last_timestamp(&self) -> Option<u64> {
+        if self.is_empty() || self.inner.end_timestamp == 0 {
+            None
+        } else {
+            Some(self.inner.end_timestamp)
+        }
+    }
 }
 
 pub trait Journal {
@@ -127,6 +192,18 @@ pub trait Journal {
 
     fn inner(&self) -> &Self::Inner;
 
+    /// First offset of data in the journal, or None if empty.
+    fn first_offset(&self) -> Option<u64>;
+
+    /// Last offset of data in the journal, or None if empty.
+    fn last_offset(&self) -> Option<u64>;
+
+    /// Timestamp of first message in journal, or None if empty.
+    fn first_timestamp(&self) -> Option<u64>;
+
+    /// Timestamp of last message in journal, or None if empty.
+    fn last_timestamp(&self) -> Option<u64>;
+
     // `flush` is only useful in case of an journal that has disk backed WAL.
     // This could be merged together with `append`, but not doing this for two reasons.
     // 1. In case of the `Journal` being used as part of structure that utilizes interior mutability, async with borrow_mut is not possible.
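A toy journal, not the server's `MemoryMessageJournal`, sketching the epoch arithmetic the doc comments above describe: `append()` derives the current offset from `base_offset + messages_count - 1`, a commit starts the next epoch at `current_offset + 1`, and the first append self-heals `base_offset` from the batch's first offset.

// Illustrative sketch only - a simplified toy type, not part of the patch.
#[derive(Default, Debug)]
struct ToyJournal {
    base_offset: u64,
    current_offset: u64,
    messages_count: u32,
}

impl ToyJournal {
    // Analogous to positioning the journal after bootstrap when data exists on disk.
    fn at_offset(base_offset: u64) -> Self {
        Self { base_offset, ..Default::default() }
    }

    fn append(&mut self, batch_first_offset: u64, batch_len: u32) {
        // Self-heal on first append, mirroring the defense-in-depth check above.
        if self.messages_count == 0 {
            self.base_offset = batch_first_offset;
        }
        self.messages_count += batch_len;
        self.current_offset = self.base_offset + self.messages_count as u64 - 1;
    }

    fn commit(&mut self) {
        // Next epoch starts right after the last committed offset.
        self.base_offset = self.current_offset + 1;
        self.messages_count = 0;
    }
}

fn main() {
    let mut journal = ToyJournal::at_offset(100);
    journal.append(100, 5);
    assert_eq!(journal.current_offset, 104);
    journal.commit();
    assert_eq!(journal.base_offset, 105);
}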
diff --git a/core/server/src/streaming/partitions/local_partition.rs b/core/server/src/streaming/partitions/local_partition.rs
index 14f5e63d9..de49720ef 100644
--- a/core/server/src/streaming/partitions/local_partition.rs
+++ b/core/server/src/streaming/partitions/local_partition.rs
@@ -56,7 +56,9 @@ impl LocalPartition {
         should_increment_offset: bool,
     ) -> Self {
         Self {
-            log: SegmentedLog::default(),
+            log: SegmentedLog::new(
+                
crate::streaming::partitions::journal::MemoryMessageJournal::empty(),
+            ),
             offset,
             consumer_offsets,
             consumer_group_offsets,
diff --git a/core/server/src/streaming/partitions/log.rs b/core/server/src/streaming/partitions/log.rs
index ae7c1efaa..16318553e 100644
--- a/core/server/src/streaming/partitions/log.rs
+++ b/core/server/src/streaming/partitions/log.rs
@@ -64,6 +64,18 @@ impl<J> SegmentedLog<J>
 where
     J: Journal + Debug,
 {
+    pub fn new(journal: J) -> Self {
+        Self {
+            journal,
+            _access_map: AllocRingBuffer::with_capacity_power_of_2(ACCESS_MAP_CAPACITY),
+            _cache: (),
+            segments: Vec::with_capacity(SEGMENTS_CAPACITY),
+            storage: Vec::with_capacity(SEGMENTS_CAPACITY),
+            indexes: Vec::with_capacity(SEGMENTS_CAPACITY),
+            in_flight: IggyMessagesBatchSetInFlight::default(),
+        }
+    }
+
     pub fn has_segments(&self) -> bool {
         !self.segments.is_empty()
     }
diff --git a/core/server/src/streaming/partitions/mod.rs b/core/server/src/streaming/partitions/mod.rs
index f8a3593ed..ee15942f9 100644
--- a/core/server/src/streaming/partitions/mod.rs
+++ b/core/server/src/streaming/partitions/mod.rs
@@ -26,6 +26,8 @@ pub mod local_partition;
 pub mod local_partitions;
 pub mod log;
 pub mod ops;
+#[cfg(test)]
+mod ops_tests;
 pub mod segments;
 pub mod storage;
 
diff --git a/core/server/src/streaming/partitions/ops.rs b/core/server/src/streaming/partitions/ops.rs
index aa481b7b6..489e35b8a 100644
--- a/core/server/src/streaming/partitions/ops.rs
+++ b/core/server/src/streaming/partitions/ops.rs
@@ -20,6 +20,20 @@
 //!
 //! This module provides the core logic for polling and loading messages from partitions,
 //! avoiding code duplication between `IggyShard` and test harnesses.
+//!
+//! # Safety invariants
+//!
+//! The snapshot-then-read pattern in [`get_messages_by_offset`] and
+//! [`poll_messages_by_timestamp`] is safe only under **single-threaded shard
+//! execution** (compio runtime). Between the metadata snapshot and the actual
+//! reads, no other shard request can mutate the partition state because the
+//! message pump processes one request at a time.
+//!
+//! The poll + auto_commit sequence in the handler (`handlers.rs`) is likewise
+//! non-atomic but safe for the same reason.
+//!
+//! If the architecture ever moves to multi-threaded shard processing or adds
+//! compaction/message deletion, these invariants must be re-evaluated.
 
 use super::journal::Journal;
 use super::local_partitions::LocalPartitions;
@@ -131,100 +145,87 @@ pub async fn get_messages_by_offset(
         return Ok(IggyMessagesBatchSet::empty());
     }
 
-    // Get journal and in_flight metadata for routing
-    let (
-        is_journal_empty,
-        journal_first_offset,
-        journal_last_offset,
-        in_flight_empty,
-        in_flight_first,
-        in_flight_last,
-    ) = {
+    // Snapshot journal and in-flight metadata for routing decisions.
+    let (journal_first_offset, in_flight_empty, in_flight_first, in_flight_last) = {
         let store = local_partitions.borrow();
         let partition = store
             .get(namespace)
             .expect("local_partitions: partition must exist for poll");
 
         let journal = partition.log.journal();
-        let journal_inner = journal.inner();
         let in_flight = partition.log.in_flight();
         (
-            journal.is_empty(),
-            journal_inner.base_offset,
-            journal_inner.current_offset,
+            journal.first_offset(),
             in_flight.is_empty(),
             in_flight.first_offset(),
             in_flight.last_offset(),
         )
     };
 
-    let end_offset = start_offset + (count - 1).max(1) as u64;
-
-    // Case 0: Journal is empty - check in_flight buffer or disk
-    if is_journal_empty {
-        if !in_flight_empty && start_offset >= in_flight_first && start_offset <= in_flight_last {
-            let in_flight_batches = {
-                let store = local_partitions.borrow();
-                let partition = store
-                    .get(namespace)
-                    .expect("local_partitions: partition must exist for poll");
-                partition
-                    .log
-                    .in_flight()
-                    .get_by_offset(start_offset, count)
-                    .to_vec()
-            };
-            if !in_flight_batches.is_empty() {
-                let mut result = IggyMessagesBatchSet::empty();
-                result.add_immutable_batches(&in_flight_batches);
-                return Ok(result.get_by_offset(start_offset, count));
-            }
+    // Lookup ordered by ascending offset: disk -> in-flight -> journal.
+    //
+    // Offsets are sequential: disk < in-flight < journal. A request may span
+    // multiple tiers, so we advance `current` through each sequentially.
+    // See issue #2715.
+
+    let mut combined = IggyMessagesBatchSet::empty();
+    let mut remaining = count;
+    let mut current = start_offset;
+
+    // Lowest in-memory tier boundary (if any). Disk handles offsets below this.
+    let in_memory_floor = if !in_flight_empty {
+        in_flight_first
+    } else {
+        journal_first_offset.unwrap_or(u64::MAX)
+    };
+
+    // Disk (pre-tier): offsets below the lowest in-memory tier.
+    if remaining > 0 && current < in_memory_floor {
+        let disk_count =
+            ((in_memory_floor.min(current + remaining as u64) - current) as u32).min(remaining);
+        let disk_messages =
+            load_messages_from_disk(local_partitions, namespace, current, disk_count).await?;
+        let loaded = disk_messages.count();
+        if loaded > 0 {
+            current += loaded as u64;
+            remaining = remaining.saturating_sub(loaded);
+            combined.add_batch_set(disk_messages);
         }
-        return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
     }
 
-    // Case 1: All messages are in journal
-    if start_offset >= journal_first_offset && end_offset <= journal_last_offset {
-        let batches = {
+    // In-flight: committed data being persisted to disk.
+    if remaining > 0 && !in_flight_empty && current >= in_flight_first && current <= in_flight_last
+    {
+        let in_flight_count = ((in_flight_last - current + 1) as u32).min(remaining);
+        let in_flight_batches = {
             let store = local_partitions.borrow();
             let partition = store
                 .get(namespace)
                 .expect("local_partitions: partition must exist for poll");
             partition
                 .log
-                .journal()
-                .get(|batches| batches.get_by_offset(start_offset, count))
+                .in_flight()
+                .get_by_offset(current, in_flight_count)
+                .to_vec()
         };
-        return Ok(batches);
-    }
-
-    // Case 2: All messages on disk (end_offset < journal_first_offset)
-    if end_offset < journal_first_offset {
-        return load_messages_from_disk(local_partitions, namespace, start_offset, count).await;
-    }
-
-    // Case 3: Messages span disk and journal boundary
-    let disk_count = if start_offset < journal_first_offset {
-        ((journal_first_offset - start_offset) as u32).min(count)
-    } else {
-        0
-    };
-
-    let mut combined_batch_set = IggyMessagesBatchSet::empty();
-
-    // Load messages from disk if needed
-    if disk_count > 0 {
-        let disk_messages =
-            load_messages_from_disk(local_partitions, namespace, start_offset, disk_count).await?;
-        if !disk_messages.is_empty() {
-            combined_batch_set.add_batch_set(disk_messages);
+        if !in_flight_batches.is_empty() {
+            let mut result = IggyMessagesBatchSet::empty();
+            result.add_immutable_batches(&in_flight_batches);
+            let sliced = result.get_by_offset(current, in_flight_count);
+            let loaded = sliced.count();
+            if loaded > 0 {
+                current += loaded as u64;
+                remaining = remaining.saturating_sub(loaded);
+                combined.add_batch_set(sliced);
+            }
         }
     }
 
-    // Get remaining messages from journal
-    let remaining_count = count - combined_batch_set.count();
-    if remaining_count > 0 {
-        let journal_start_offset = std::cmp::max(start_offset, journal_first_offset);
+    // Journal: may hold data from recent appends.
+    if remaining > 0
+        && let Some(jfo) = journal_first_offset
+        && current >= jfo
+    {
         let journal_messages = {
             let store = local_partitions.borrow();
             let partition = store
@@ -233,14 +234,14 @@ pub async fn get_messages_by_offset(
             partition
                 .log
                 .journal()
-                .get(|batches| batches.get_by_offset(journal_start_offset, remaining_count))
+                .get(|batches| batches.get_by_offset(current, remaining))
         };
         if !journal_messages.is_empty() {
-            combined_batch_set.add_batch_set(journal_messages);
+            combined.add_batch_set(journal_messages);
         }
     }
 
-    Ok(combined_batch_set)
+    Ok(combined)
 }
 
 /// Poll messages by timestamp.
@@ -252,8 +253,15 @@ async fn poll_messages_by_timestamp(
 ) -> Result<(IggyPollMetadata, IggyMessagesBatchSet), IggyError> {
     let partition_id = namespace.partition_id();
 
-    // Get metadata and journal info
-    let (metadata, is_journal_empty, journal_first_timestamp, journal_last_timestamp) = {
+    // Snapshot metadata from journal and in-flight for routing decisions.
+    let (
+        metadata,
+        journal_first_ts,
+        journal_last_ts,
+        in_flight_empty,
+        in_flight_first_ts,
+        in_flight_last_ts,
+    ) = {
         let store = local_partitions.borrow();
         let partition = store
             .get(namespace)
@@ -263,12 +271,31 @@ async fn poll_messages_by_timestamp(
         let metadata = IggyPollMetadata::new(partition_id as u32, current_offset);
 
         let journal = partition.log.journal();
-        let journal_inner = journal.inner();
+
+        let in_flight = partition.log.in_flight();
+        let (ife, ifts, ilts) = if in_flight.is_empty() {
+            (true, 0u64, 0u64)
+        } else {
+            let first_ts = in_flight
+                .batches()
+                .first()
+                .and_then(|b| b.first_timestamp())
+                .unwrap_or(0);
+            let last_ts = in_flight
+                .batches()
+                .last()
+                .and_then(|b| b.last_timestamp())
+                .unwrap_or(0);
+            (false, first_ts, last_ts)
+        };
+
         (
             metadata,
-            journal.is_empty(),
-            journal_inner.first_timestamp,
-            journal_inner.end_timestamp,
+            journal.first_timestamp(),
+            journal.last_timestamp(),
+            ife,
+            ifts,
+            ilts,
         )
     };
 
@@ -276,22 +303,57 @@ async fn poll_messages_by_timestamp(
         return Ok((metadata, IggyMessagesBatchSet::empty()));
     }
 
-    // Case 0: Journal is empty, all messages on disk
-    if is_journal_empty {
-        let batches =
-            load_messages_from_disk_by_timestamp(local_partitions, namespace, timestamp, count)
+    // Three-tier timestamp lookup: disk -> in-flight -> journal.
+    // Same structure as offset-based polling (see issue #2715).
+
+    let mut combined = IggyMessagesBatchSet::empty();
+    let mut remaining = count;
+
+    // Phase 1: Disk - timestamps before in-flight range.
+    let disk_upper_ts = if !in_flight_empty {
+        in_flight_first_ts
+    } else {
+        journal_first_ts.unwrap_or(u64::MAX)
+    };
+
+    if timestamp < disk_upper_ts && remaining > 0 {
+        let disk_messages =
+            load_messages_from_disk_by_timestamp(local_partitions, namespace, timestamp, remaining)
                 .await?;
-        return Ok((metadata, batches));
+        let loaded = disk_messages.count();
+        if loaded > 0 {
+            remaining = remaining.saturating_sub(loaded);
+            combined.add_batch_set(disk_messages);
+        }
     }
 
-    // Case 1: Timestamp is after journal's last timestamp - no messages
-    if timestamp > journal_last_timestamp {
-        return Ok((metadata, IggyMessagesBatchSet::empty()));
+    // Phase 2: In-flight - committed data being persisted.
+    if remaining > 0 && !in_flight_empty && timestamp <= in_flight_last_ts {
+        let in_flight_batches = {
+            let store = local_partitions.borrow();
+            let partition = store
+                .get(namespace)
+                .expect("local_partitions: partition must exist for poll");
+            partition.log.in_flight().batches().to_vec()
+        };
+        if !in_flight_batches.is_empty() {
+            let mut batch_set = IggyMessagesBatchSet::empty();
+            batch_set.add_immutable_batches(&in_flight_batches);
+            let filtered = batch_set.get_by_timestamp(timestamp, remaining);
+            let loaded = filtered.count();
+            if loaded > 0 {
+                remaining = remaining.saturating_sub(loaded);
+                combined.add_batch_set(filtered);
+            }
+        }
     }
 
-    // Case 2: Timestamp is within journal range - get from journal
-    if timestamp >= journal_first_timestamp {
-        let batches = {
+    // Phase 3: Journal - newest appends (post-commit).
+    if remaining > 0
+        && let Some(jlts) = journal_last_ts
+        && timestamp <= jlts
+    {
+        let journal_messages = {
             let store = local_partitions.borrow();
             let partition = store
                 .get(namespace)
@@ -299,37 +361,14 @@ async fn poll_messages_by_timestamp(
             partition
                 .log
                 .journal()
-                .get(|batches| batches.get_by_timestamp(timestamp, count))
+                .get(|batches| batches.get_by_timestamp(timestamp, remaining))
         };
-        return Ok((metadata, batches));
-    }
-
-    // Case 3: Timestamp is before journal - need disk + possibly journal
-    let disk_messages =
-        load_messages_from_disk_by_timestamp(local_partitions, namespace, timestamp, count).await?;
-
-    if disk_messages.count() >= count {
-        return Ok((metadata, disk_messages));
+        if !journal_messages.is_empty() {
+            combined.add_batch_set(journal_messages);
+        }
     }
 
-    // Case 4: Messages span disk and journal
-    let remaining_count = count - disk_messages.count();
-    let journal_messages = {
-        let store = local_partitions.borrow();
-        let partition = store
-            .get(namespace)
-            .expect("local_partitions: partition must exist for poll");
-        partition
-            .log
-            .journal()
-            .get(|batches| batches.get_by_timestamp(timestamp, remaining_count))
-    };
-
-    let mut combined_batch_set = disk_messages;
-    if !journal_messages.is_empty() {
-        combined_batch_set.add_batch_set(journal_messages);
-    }
-    Ok((metadata, combined_batch_set))
+    Ok((metadata, combined))
 }
 
 /// Load messages from disk by offset.
@@ -388,7 +427,7 @@ pub async fn load_messages_from_disk(
             current_offset
         };
 
-        let mut end_offset = offset + (remaining_count - 1).max(1) as u64;
+        let mut end_offset = offset + (remaining_count - 1) as u64;
         if end_offset > segment_end_offset {
             end_offset = segment_end_offset;
         }
@@ -429,7 +468,7 @@ async fn load_segment_messages(
 ) -> Result<IggyMessagesBatchSet, IggyError> {
     let relative_start_offset = (start_offset - segment_start_offset) as u32;
 
-    // Check journal first for this segment's data
+    // Check journal for this segment's data (handles callers outside get_messages_by_offset).
     let journal_data = {
         let store = local_partitions.borrow();
         let partition = store
@@ -437,14 +476,10 @@ async fn load_segment_messages(
             .expect("local_partitions: partition must exist");
 
         let journal = partition.log.journal();
-        let is_journal_empty = journal.is_empty();
-        let journal_inner = journal.inner();
-        let journal_first_offset = journal_inner.base_offset;
-        let journal_last_offset = journal_inner.current_offset;
-
-        if !is_journal_empty
-            && start_offset >= journal_first_offset
-            && end_offset <= journal_last_offset
+
+        if let (Some(jfo), Some(jlo)) = (journal.first_offset(), journal.last_offset())
+            && start_offset >= jfo
+            && end_offset <= jlo
         {
             Some(journal.get(|batches| batches.get_by_offset(start_offset, count)))
         } else {
@@ -617,14 +652,10 @@ async fn load_segment_messages_by_timestamp(
             .expect("local_partitions: partition must exist");
 
         let journal = partition.log.journal();
-        let is_journal_empty = journal.is_empty();
-        let journal_inner = journal.inner();
-        let journal_first_timestamp = journal_inner.first_timestamp;
-        let journal_last_timestamp = journal_inner.end_timestamp;
-
-        if !is_journal_empty
-            && timestamp >= journal_first_timestamp
-            && timestamp <= journal_last_timestamp
+
+        if let (Some(jfts), Some(jlts)) = (journal.first_timestamp(), journal.last_timestamp())
+            && timestamp >= jfts
+            && timestamp <= jlts
         {
             Some(journal.get(|batches| batches.get_by_timestamp(timestamp, count)))
         } else {
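A minimal sketch of the three-tier routing order the rewritten offset lookup above uses. The `route` helper and plain offset ranges are illustrative stand-ins for the real batch sets: a request is served from disk, then the in-flight buffer, then the journal, with the cursor and remaining count advanced so one poll can span all three tiers without skipping committed data.

// Illustrative sketch only - simplified tiers, not part of the patch.
fn route(start: u64, count: u32, tiers: &[(u64, u64)]) -> Vec<(usize, u64, u64)> {
    // Each tier is an inclusive (first, last) offset range; tiers are sorted and
    // contiguous (disk < in-flight < journal), mirroring the invariant above.
    let mut current = start;
    let mut remaining = count as u64;
    let mut plan = Vec::new();
    for (tier, &(first, last)) in tiers.iter().enumerate() {
        if remaining == 0 || current > last {
            continue;
        }
        let from = current.max(first);
        let to = last.min(from + remaining - 1);
        plan.push((tier, from, to));
        remaining -= to - from + 1;
        current = to + 1;
    }
    plan
}

fn main() {
    // disk [0..94], in-flight [95..99], journal [100..104]
    let tiers = [(0, 94), (95, 99), (100, 104)];
    assert_eq!(
        route(90, 12, &tiers),
        vec![(0, 90, 94), (1, 95, 99), (2, 100, 101)]
    );
}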
diff --git a/core/server/src/streaming/partitions/ops_tests.rs b/core/server/src/streaming/partitions/ops_tests.rs
new file mode 100644
index 000000000..fbed5c2ed
--- /dev/null
+++ b/core/server/src/streaming/partitions/ops_tests.rs
@@ -0,0 +1,359 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Tests for the in-flight buffer visibility gap fix (issue #2715).
+//!
+//! These tests set up "State C" directly in memory:
+//!   - in-flight holds offsets [0..N-1]  (committed journal, not yet on disk)
+//!   - journal holds offsets  [N..N+M-1] (new appends after commit)
+//!   - no actual disk data
+//!
+//! Before the fix, Cases 1-3 in get_messages_by_offset never checked
+//! in-flight, causing the consumer to miss committed data and either
+//! get empty results or skip directly to journal offsets.
+
+#[cfg(test)]
+mod tests {
+    use crate::streaming::partitions::consumer_group_offsets::ConsumerGroupOffsets;
+    use crate::streaming::partitions::consumer_offsets::ConsumerOffsets;
+    use crate::streaming::partitions::journal::{Inner, Journal};
+    use crate::streaming::partitions::local_partition::LocalPartition;
+    use crate::streaming::partitions::local_partitions::LocalPartitions;
+    use crate::streaming::partitions::ops;
+    use crate::streaming::polling_consumer::PollingConsumer;
+    use crate::streaming::stats::{PartitionStats, StreamStats, TopicStats};
+    use iggy_common::sharding::IggyNamespace;
+    use iggy_common::{
+        IggyByteSize, IggyMessage, MemoryPool, MemoryPoolConfigOther, PollingStrategy, Sizeable,
+    };
+    use std::cell::RefCell;
+    use std::sync::Arc;
+    use std::sync::atomic::AtomicU64;
+
+    fn init_memory_pool() {
+        static INIT: std::sync::Once = std::sync::Once::new();
+        INIT.call_once(|| {
+            let config = MemoryPoolConfigOther {
+                enabled: false,
+                size: IggyByteSize::from(64 * 1024 * 1024u64),
+                bucket_capacity: 256,
+            };
+            MemoryPool::init_pool(&config);
+        });
+    }
+
+    fn create_test_partition(current_offset: u64) -> LocalPartition {
+        let stream_stats = Arc::new(StreamStats::default());
+        let topic_stats = Arc::new(TopicStats::new(stream_stats));
+        let partition_stats = Arc::new(PartitionStats::new(topic_stats));
+
+        LocalPartition::new(
+            partition_stats,
+            Arc::new(AtomicU64::new(current_offset)),
+            Arc::new(ConsumerOffsets::with_capacity(10)),
+            Arc::new(ConsumerGroupOffsets::with_capacity(10)),
+            None,
+            iggy_common::IggyTimestamp::now(),
+            1,
+            true,
+        )
+    }
+
+    fn create_batch(count: u32) -> iggy_common::IggyMessagesBatchMut {
+        let messages: Vec<IggyMessage> = (0..count)
+            .map(|_| {
+                IggyMessage::builder()
+                    .payload(bytes::Bytes::from("test-payload"))
+                    .build()
+                    .unwrap()
+            })
+            .collect();
+
+        let messages_size: u32 = messages
+            .iter()
+            .map(|m| m.get_size_bytes().as_bytes_u32())
+            .sum();
+        iggy_common::IggyMessagesBatchMut::from_messages(&messages, messages_size)
+    }
+
+    /// Sets up "State C": in-flight holds committed data, journal holds new
+    /// appends that arrived after commit but before persist completes.
+    ///
+    /// Layout:
+    ///   segment metadata:  [0..journal_end]   (no actual disk data)
+    ///   in-flight:         [0..in_flight_count-1]
+    ///   journal:           [in_flight_count..in_flight_count+journal_count-1]
+    ///   partition.offset:  in_flight_count + journal_count - 1
+    async fn setup_state_c(
+        in_flight_count: u32,
+        journal_count: u32,
+    ) -> (RefCell<LocalPartitions>, IggyNamespace) {
+        init_memory_pool();
+        let ns = IggyNamespace::new(1, 1, 0);
+
+        let in_flight_end = in_flight_count as u64 - 1;
+        let journal_base = in_flight_end + 1;
+        let journal_end = journal_base + journal_count as u64 - 1;
+
+        let mut partition = create_test_partition(journal_end);
+
+        let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+        let storage = iggy_common::SegmentStorage::default();
+        partition.log.add_persisted_segment(segment, storage);
+
+        let seg = &mut partition.log.segments_mut()[0];
+        seg.end_offset = journal_end;
+        seg.start_timestamp = 1;
+        seg.end_timestamp = 2;
+
+        let mut in_flight_batch = create_batch(in_flight_count);
+        in_flight_batch.prepare_for_persistence(0, 0, 0, None).await;
+        let in_flight_size = in_flight_batch.size();
+        partition.log.set_in_flight(vec![in_flight_batch.freeze()]);
+
+        let journal_inner = Inner {
+            base_offset: journal_base,
+            current_offset: 0,
+            first_timestamp: 0,
+            end_timestamp: 0,
+            messages_count: 0,
+            size: IggyByteSize::default(),
+        };
+        partition.log.journal_mut().init(journal_inner);
+
+        let mut journal_batch = create_batch(journal_count);
+        journal_batch
+            .prepare_for_persistence(0, journal_base, in_flight_size as u32, None)
+            .await;
+        partition.log.journal_mut().append(journal_batch).unwrap();
+
+        let mut store = LocalPartitions::new();
+        store.insert(ns, partition);
+        (RefCell::new(store), ns)
+    }
+
+    // -----------------------------------------------------------------------
+    // Issue #2715: In-flight buffer must be reachable when journal is non-empty
+    // -----------------------------------------------------------------------
+
+    #[compio::test]
+    async fn in_flight_reachable_when_journal_non_empty() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 5)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 5);
+        assert_eq!(batches.first_offset(), Some(0));
+    }
+
+    #[compio::test]
+    async fn spanning_in_flight_and_journal_returns_all_in_order() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 15)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 15);
+        assert_eq!(batches.first_offset(), Some(0));
+    }
+
+    #[compio::test]
+    async fn polling_next_starts_from_in_flight_not_journal() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let consumer = PollingConsumer::Consumer(1, 0);
+        let args =
+            crate::shard::system::messages::PollingArgs::new(PollingStrategy::next(), 15, false);
+        let (metadata, batches) = ops::poll_messages(&store, &ns, consumer, args)
+            .await
+            .unwrap();
+        assert_eq!(batches.first_offset(), Some(0));
+        assert!(metadata.current_offset >= 14);
+    }
+
+    #[compio::test]
+    async fn single_message_at_in_flight_journal_boundary() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 9, 1)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 1);
+        assert_eq!(batches.first_offset(), Some(9));
+    }
+
+    #[compio::test]
+    async fn single_message_from_in_flight_at_offset_zero() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 1)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 1);
+        assert_eq!(batches.first_offset(), Some(0));
+    }
+
+    // -----------------------------------------------------------------------
+    // Existing correct behavior must still work
+    // -----------------------------------------------------------------------
+
+    #[compio::test]
+    async fn in_flight_reachable_when_journal_empty() {
+        init_memory_pool();
+        let ns = IggyNamespace::new(1, 1, 0);
+        let mut partition = create_test_partition(9);
+
+        let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+        partition
+            .log
+            .add_persisted_segment(segment, iggy_common::SegmentStorage::default());
+        let seg = &mut partition.log.segments_mut()[0];
+        seg.end_offset = 9;
+        seg.start_timestamp = 1;
+        seg.end_timestamp = 2;
+
+        let mut batch = create_batch(10);
+        batch.prepare_for_persistence(0, 0, 0, None).await;
+        partition.log.set_in_flight(vec![batch.freeze()]);
+
+        let mut store = LocalPartitions::new();
+        store.insert(ns, partition);
+        let store = RefCell::new(store);
+
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 10)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 10);
+    }
+
+    #[compio::test]
+    async fn journal_reachable_when_in_flight_empty() {
+        init_memory_pool();
+        let ns = IggyNamespace::new(1, 1, 0);
+        let mut partition = create_test_partition(9);
+
+        let segment = iggy_common::Segment::new(0, IggyByteSize::from(1_073_741_824u64));
+        partition
+            .log
+            .add_persisted_segment(segment, iggy_common::SegmentStorage::default());
+        let seg = &mut partition.log.segments_mut()[0];
+        seg.end_offset = 9;
+        seg.start_timestamp = 1;
+        seg.end_timestamp = 2;
+
+        partition.log.journal_mut().init(Inner {
+            base_offset: 0,
+            current_offset: 0,
+            first_timestamp: 0,
+            end_timestamp: 0,
+            messages_count: 0,
+            size: IggyByteSize::default(),
+        });
+
+        let mut batch = create_batch(10);
+        batch.prepare_for_persistence(0, 0, 0, None).await;
+        partition.log.journal_mut().append(batch).unwrap();
+
+        let mut store = LocalPartitions::new();
+        store.insert(ns, partition);
+        let store = RefCell::new(store);
+
+        let batches = ops::get_messages_by_offset(&store, &ns, 0, 10)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 10);
+    }
+
+    #[compio::test]
+    async fn journal_single_message_at_specific_offset() {
+        let (store, ns) = setup_state_c(10, 5).await;
+        let batches = ops::get_messages_by_offset(&store, &ns, 12, 1)
+            .await
+            .unwrap();
+        assert_eq!(batches.count(), 1);
+        assert_eq!(batches.first_offset(), Some(12));
+    }
+
+    // -----------------------------------------------------------------------
+    // Bug reproduction: journal base_offset=0 after restart causes offset skip
+    // -----------------------------------------------------------------------
+
+    /// Verifies that journal self-heals base_offset on first append.
+    /// Without self-healing, a journal created via Default would have
+    /// base_offset=0, causing incorrect offset calculations.
+    #[compio::test]
+    async fn journal_self_heals_base_offset_on_first_append() {
+        init_memory_pool();
+
+        let mut journal = crate::streaming::partitions::journal::MemoryMessageJournal::empty();
+        assert_eq!(journal.inner().base_offset, 0);
+
+        let mut batch = create_batch(5);
+        batch.prepare_for_persistence(0, 100, 0, None).await;
+        journal.append(batch).unwrap();
+
+        assert_eq!(
+            journal.inner().base_offset,
+            100,
+            "Journal should self-heal base_offset from batch's first offset"
+        );
+        assert_eq!(
+            journal.inner().current_offset,
+            104,
+            "current_offset should be base_offset + messages_count - 1"
+        );
+    }
+
+    /// Verifies that slice_by_offset returns None when start_offset is below
+    /// the batch's range, instead of clamping to index 0 (the old bug).
+    #[compio::test]
+    async fn slice_by_offset_rejects_offset_below_range() {
+        init_memory_pool();
+
+        let mut batch = create_batch(10);
+        batch.prepare_for_persistence(0, 100, 0, None).await;
+
+        let result = batch.slice_by_offset(95, 10);
+
+        assert!(
+            result.is_none(),
+            "slice_by_offset should return None when start_offset(95) < 
first_offset(100), \
+             got {} messages at offset {:?}",
+            result.as_ref().map(|r| r.count()).unwrap_or(0),
+            result.as_ref().and_then(|r| r.first_offset())
+        );
+    }
+
+    /// After proper journal initialization, polling across the in-flight/journal
+    /// boundary returns contiguous messages with no gaps.
+    #[compio::test]
+    async fn post_restart_poll_with_correct_journal_init_no_skip() {
+        let (store, ns) = setup_state_c(100, 10).await;
+
+        let batches = ops::get_messages_by_offset(&store, &ns, 95, 10)
+            .await
+            .unwrap();
+
+        assert_eq!(batches.count(), 10, "should return exactly 10 messages");
+        assert_eq!(
+            batches.first_offset(),
+            Some(95),
+            "first message should be at requested offset 95"
+        );
+        assert_eq!(
+            batches.last_offset(),
+            Some(104),
+            "last message should be at offset 104 (contiguous)"
+        );
+    }
+}
