This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new b55d065d8 feat(metadata): Persistent WAL journal with recovery and 
compaction (#2916)
b55d065d8 is described below

commit b55d065d80c90a27d9102c3b781a80662de9250e
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Mar 26 23:41:39 2026 +0530

    feat(metadata): Persistent WAL journal with recovery and compaction (#2916)
---
 Cargo.lock                            |  10 +
 core/journal/Cargo.toml               |   7 +
 core/journal/src/file_storage.rs      | 149 +++++++
 core/journal/src/lib.rs               |  43 +-
 core/journal/src/metadata_journal.rs  | 717 ++++++++++++++++++++++++++++++++++
 core/metadata/Cargo.toml              |   6 +
 core/metadata/src/impls/metadata.rs   | 211 +++++++++-
 core/metadata/src/impls/mod.rs        |   4 +
 core/metadata/src/impls/recovery.rs   | 269 +++++++++++++
 core/metadata/src/stm/snapshot.rs     |  54 +++
 core/partitions/src/iggy_partition.rs |   6 +-
 core/partitions/src/journal.rs        |  46 ++-
 core/partitions/src/log.rs            |   2 +-
 core/simulator/src/deps.rs            |  33 +-
 core/simulator/src/replica.rs         |  13 +-
 15 files changed, 1529 insertions(+), 41 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 827bef7a4..d43b9a2db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6046,6 +6046,13 @@ dependencies = [
 [[package]]
 name = "journal"
 version = "0.1.0"
+dependencies = [
+ "bytemuck",
+ "bytes",
+ "compio",
+ "iggy_binary_protocol",
+ "tempfile",
+]
 
 [[package]]
 name = "js-sys"
@@ -6657,7 +6664,9 @@ name = "metadata"
 version = "0.1.0"
 dependencies = [
  "ahash 0.8.12",
+ "bytemuck",
  "bytes",
+ "compio",
  "consensus",
  "iggy_binary_protocol",
  "iggy_common",
@@ -6669,6 +6678,7 @@ dependencies = [
  "secrecy",
  "serde",
  "slab",
+ "tempfile",
  "tracing",
 ]
 
diff --git a/core/journal/Cargo.toml b/core/journal/Cargo.toml
index 32b46aff4..bd3a0e520 100644
--- a/core/journal/Cargo.toml
+++ b/core/journal/Cargo.toml
@@ -28,6 +28,13 @@ repository = "https://github.com/apache/iggy";
 readme = "../../../README.md"
 
 [dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+compio = { workspace = true }
+iggy_binary_protocol = { workspace = true }
+
+[dev-dependencies]
+tempfile = { workspace = true }
 
 [lints.clippy]
 enum_glob_use = "deny"
diff --git a/core/journal/src/file_storage.rs b/core/journal/src/file_storage.rs
new file mode 100644
index 000000000..6517f5b90
--- /dev/null
+++ b/core/journal/src/file_storage.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Storage;
+use compio::buf::IoBuf;
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
+use std::cell::{Cell, UnsafeCell};
+use std::io;
+use std::path::{Path, PathBuf};
+
+/// File-backed storage implementing the `Storage` trait.
+pub struct FileStorage {
+    file: UnsafeCell<compio::fs::File>,
+    write_offset: Cell<u64>,
+    path: PathBuf,
+}
+
+#[allow(clippy::future_not_send)]
+impl FileStorage {
+    /// Open or create the file at `path`, setting `write_offset` to current 
file length.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the file cannot be opened or created.
+    pub async fn open(path: &Path) -> io::Result<Self> {
+        let file = compio::fs::OpenOptions::new()
+            .read(true)
+            .write(true)
+            .create(true)
+            .truncate(false)
+            .open(path)
+            .await?;
+        let len = file.metadata().await?.len();
+        Ok(Self {
+            file: UnsafeCell::new(file),
+            write_offset: Cell::new(len),
+            path: path.to_path_buf(),
+        })
+    }
+
+    /// Current file size (tracks append position).
+    pub const fn file_len(&self) -> u64 {
+        self.write_offset.get()
+    }
+
+    /// Truncate the file to `len` bytes.
+    ///
+    /// # Errors
+    /// Returns an I/O error if truncation fails.
+    pub async fn truncate(&self, len: u64) -> io::Result<()> {
+        let file = unsafe { &*self.file.get() };
+        file.set_len(len).await?;
+        self.write_offset.set(len);
+        Ok(())
+    }
+
+    /// Fsync the file to disk.
+    ///
+    /// # Errors
+    /// Returns an I/O error if sync fails.
+    pub async fn fsync(&self) -> io::Result<()> {
+        // SAFETY: single-threaded compio runtime, no concurrent access to the 
file.
+        unsafe { &*self.file.get() }.sync_data().await
+    }
+
+    /// Positional read into `buf`. Returns the buffer with data filled in.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the read fails.
+    pub async fn read_at(&self, offset: u64, buf: Vec<u8>) -> 
io::Result<Vec<u8>> {
+        // SAFETY: single-threaded compio runtime, no concurrent access to the 
file.
+        let file = unsafe { &*self.file.get() };
+        let (result, buf) = file.read_exact_at(buf, offset).await.into();
+        result?;
+        Ok(buf)
+    }
+
+    /// Append write, returns bytes written.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the write fails.
+    #[allow(clippy::cast_possible_truncation)]
+    pub async fn write_append<B: IoBuf>(&self, buf: B) -> io::Result<usize> {
+        let len = buf.buf_len();
+        // SAFETY: single-threaded compio runtime, no concurrent access to the 
file.
+        let file = unsafe { &mut *self.file.get() };
+        let (result, _buf) = file.write_all_at(buf, 
self.write_offset.get()).await.into();
+        result?;
+        self.write_offset.set(self.write_offset.get() + len as u64);
+        Ok(len)
+    }
+
+    /// The file path this storage was opened with.
+    pub fn path(&self) -> &Path {
+        &self.path
+    }
+
+    /// Reopen the underlying file descriptor at the stored path.
+    ///
+    /// Used after an atomic rename replaces the file on disk.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the file cannot be reopened.
+    pub async fn reopen(&self) -> io::Result<()> {
+        let file = compio::fs::OpenOptions::new()
+            .read(true)
+            .write(true)
+            .open(&self.path)
+            .await?;
+        let len = file.metadata().await?.len();
+        // SAFETY: single-threaded compio runtime, no concurrent access to the 
file.
+        unsafe { *self.file.get() = file };
+        self.write_offset.set(len);
+        Ok(())
+    }
+}
+
+#[allow(clippy::future_not_send)]
+impl Storage for FileStorage {
+    type Buffer = Vec<u8>;
+
+    async fn write_at(&self, offset: usize, buf: Self::Buffer) -> 
io::Result<usize> {
+        let len = buf.buf_len();
+        let file = unsafe { &mut *self.file.get() };
+        let (result, _buf) = file.write_all_at(buf, offset as 
u64).await.into();
+        result?;
+        Ok(len)
+    }
+
+    async fn read_at(&self, offset: usize, buffer: Self::Buffer) -> 
io::Result<Self::Buffer> {
+        let file = unsafe { &*self.file.get() };
+        let (result, buffer) = file.read_exact_at(buffer, offset as 
u64).await.into();
+        result?;
+        Ok(buffer)
+    }
+}
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 55a6b14fb..9c610f187 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -15,7 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use std::ops::Deref;
+use std::io;
+use std::ops::{Deref, RangeInclusive};
+
+pub mod file_storage;
+pub mod metadata_journal;
 
 pub trait Journal<S>
 where
@@ -30,19 +34,44 @@ where
     fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>>;
     fn previous_header(&self, header: &Self::Header) -> 
Option<Self::HeaderRef<'_>>;
 
-    fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+    fn append(&self, entry: Self::Entry) -> impl Future<Output = 
io::Result<()>>;
     fn entry(&self, header: &Self::Header) -> impl Future<Output = 
Option<Self::Entry>>;
+
+    /// Number of entries that can be appended before the journal would need
+    /// to evict un-snapshotted slots. Returns `None` for journals that don't 
persist to disk.
+    fn remaining_capacity(&self) -> Option<usize> {
+        None
+    }
+
+    /// Remove entries with ops in `ops` from the journal,
+    /// returning the removed entries sorted by op.
+    ///
+    /// Implementations that persist to disk should rewrite the underlying
+    /// storage to reclaim space. The default is a no-op for journals that
+    /// do not persist to disk.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the drain fails.
+    fn drain(
+        &self,
+        _ops: RangeInclusive<u64>,
+    ) -> impl Future<Output = io::Result<Vec<Self::Entry>>> {
+        async { Ok(Vec::new()) }
+    }
 }
 
 // TODO: Move to other crate.
 pub trait Storage {
     type Buffer;
 
-    fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
-    // TODO: Get rid of the `len` usize, we need to do changes in `Simulator` 
in order to support that.
-    // Maybe we should go back to passing in the `Buffer` again, but I am not 
sure how to handle it in the `Partitions Journal`, since we use in-memory impl
-    // which extracts the buffer out of the `Vec<Message>` and we don't need 
to allocate a new buffer.
-    fn read(&self, offset: usize, len: usize) -> impl Future<Output = 
Self::Buffer>;
+    fn write_at(&self, offset: usize, buf: Self::Buffer)
+    -> impl Future<Output = io::Result<usize>>;
+
+    fn read_at(
+        &self,
+        offset: usize,
+        buffer: Self::Buffer,
+    ) -> impl Future<Output = io::Result<Self::Buffer>>;
 }
 
 pub trait JournalHandle {
diff --git a/core/journal/src/metadata_journal.rs 
b/core/journal/src/metadata_journal.rs
new file mode 100644
index 000000000..fe45e9f7b
--- /dev/null
+++ b/core/journal/src/metadata_journal.rs
@@ -0,0 +1,717 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file_storage::FileStorage;
+use crate::{Journal, JournalHandle};
+use bytes::Bytes;
+use compio::io::AsyncWriteAtExt;
+use iggy_binary_protocol::consensus::message::Message;
+use iggy_binary_protocol::consensus::{Command2, PrepareHeader};
+use std::cell::{Cell, Ref, RefCell};
+use std::fmt;
+use std::io;
+use std::ops::RangeInclusive;
+use std::path::Path;
+
+const HEADER_SIZE: usize = size_of::<PrepareHeader>();
+
+/// Maximum allowed size for a single WAL entry (64 MiB).
+///
+/// A header with `size` exceeding this limit is treated as corrupt. This
+/// prevents a bit-flipped size field (e.g. `0xFFFF_FFFF`) from causing a
+/// multi-GiB allocation during the WAL scan.
+const MAX_ENTRY_SIZE: u64 = 64 * 1024 * 1024;
+
+/// Number of slots in the journal ring buffer.
+///
+/// Must be larger than the maximum number of entries between consecutive
+/// snapshots. If the journal wraps past this window, older un-snapshotted
+/// entries are silently evicted from the in-memory index (the WAL file
+/// still contains them, but they become unreachable for recovery).
+///
+/// **NOTE:** This number needs to be chosen in balance between number of
+/// entries in [`core::consensus::pipeline_prepare_queue_max`]. Because this 
number controls
+/// how many committed but not yet snapshotted entries that the buffer can
+/// hold. This may need to be tuned properly.
+pub(crate) const SLOT_COUNT: usize = 1024;
+
+/// Error type for journal operations.
+#[derive(Debug)]
+#[allow(clippy::module_name_repetitions)]
+pub enum JournalError {
+    Io(io::Error),
+}
+
+impl fmt::Display for JournalError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Io(e) => write!(f, "journal I/O error: {e}"),
+        }
+    }
+}
+
+impl std::error::Error for JournalError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::Io(e) => Some(e),
+        }
+    }
+}
+
+impl From<io::Error> for JournalError {
+    fn from(e: io::Error) -> Self {
+        Self::Io(e)
+    }
+}
+
+/// Persistent metadata journal backed by an append-only WAL file.
+///
+/// Each WAL entry is a raw `Message<PrepareHeader>`:
+/// `[PrepareHeader: 256 bytes][body: header.size - 256 bytes]`
+///
+/// The in-memory index is a fixed-size slot array indexed by
+/// `op % SLOT_COUNT`.
+pub struct MetadataJournal {
+    /// File-backed append-only WAL.
+    storage: FileStorage,
+    /// In-memory slot array of entry headers, indexed by `op % SLOT_COUNT`.
+    /// A slot is `None` if no entry occupies it (or it has been drained).
+    headers: RefCell<Vec<Option<PrepareHeader>>>,
+    /// Byte offset within the WAL file for each slot's entry, mirrors 
`headers`.
+    offsets: RefCell<Vec<Option<u64>>>,
+    /// Highest op number appended to the journal, or `None` if empty.
+    /// Used to detect forward progress and validate append ordering.
+    last_op: Cell<Option<u64>>,
+    /// Highest op that has been durably snapshotted. Entries with `op <= 
snapshot_op`
+    /// are safe to evict from the slot array. Appending an entry that would 
evict
+    /// an un-snapshotted entry (op > `snapshot_op`) panics and the upper 
layer must
+    /// take a snapshot before the journal wraps.
+    snapshot_op: Cell<u64>,
+}
+
+impl fmt::Debug for MetadataJournal {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("MetadataJournal")
+            .field("write_offset", &self.storage.file_len())
+            .field("last_op", &self.last_op.get())
+            .finish_non_exhaustive()
+    }
+}
+
+#[allow(clippy::cast_possible_truncation)]
+const fn slot_for_op(op: u64) -> usize {
+    op as usize % SLOT_COUNT
+}
+
+#[allow(clippy::cast_possible_truncation)]
+impl MetadataJournal {
+    /// Open the WAL file, scanning forward to rebuild the in-memory index.
+    ///
+    /// `snapshot_op` is the highest op that has been durably snapshotted.
+    /// It must be provided so that `append()` can detect slot collisions
+    /// that would evict un-snapshotted entries.
+    ///
+    /// If a truncated entry is found at the tail (crash during write),
+    /// the file is truncated to the last complete entry.
+    ///
+    /// # Errors
+    /// Returns `JournalError::Io` if the WAL file cannot be opened or read.
+    #[allow(clippy::future_not_send)]
+    pub async fn open(path: &Path, snapshot_op: u64) -> Result<Self, 
JournalError> {
+        let storage = FileStorage::open(path).await?;
+        let file_len = storage.file_len();
+        let mut headers: Vec<Option<PrepareHeader>> = vec![None; SLOT_COUNT];
+        let mut offsets: Vec<Option<u64>> = vec![None; SLOT_COUNT];
+        let mut last_op: Option<u64> = None;
+        let mut pos: u64 = 0;
+        let mut header_buf = vec![0u8; HEADER_SIZE];
+
+        while pos + HEADER_SIZE as u64 <= file_len {
+            // Read the 256-byte header
+            header_buf = storage.read_at(pos, header_buf).await?;
+            let header: PrepareHeader =
+                *bytemuck::checked::from_bytes::<PrepareHeader>(&header_buf);
+
+            // Validate: must be a Prepare command with sane size
+            if header.command != Command2::Prepare
+                || (header.size as usize) < HEADER_SIZE
+                || u64::from(header.size) > MAX_ENTRY_SIZE
+            {
+                // Corrupt or non-prepare entry, truncate here
+                storage.truncate(pos).await?;
+                break;
+            }
+
+            let entry_size = u64::from(header.size);
+
+            // Check if the full entry fits
+            if pos + entry_size > file_len {
+                // Truncated entry at tail
+                // This handles the case where crash happened during write and
+                // only header was written and body was not. so we truncate 
the file to the start of the entry.
+                storage.truncate(pos).await?;
+                break;
+            }
+
+            let slot = slot_for_op(header.op);
+
+            // Note: Regarding duplicate op in WAL. We rewrite it with 
whichever
+            // is the latest entry.
+            headers[slot] = Some(header);
+            offsets[slot] = Some(pos);
+
+            match last_op {
+                Some(current) if header.op > current => last_op = 
Some(header.op),
+                None => last_op = Some(header.op),
+                _ => {}
+            }
+
+            pos += entry_size;
+        }
+
+        // If there are leftover bytes less than a header, truncate them
+        if pos < storage.file_len() {
+            storage.truncate(pos).await?;
+        }
+
+        Ok(Self {
+            storage,
+            headers: RefCell::new(headers),
+            offsets: RefCell::new(offsets),
+            last_op: Cell::new(last_op),
+            snapshot_op: Cell::new(snapshot_op),
+        })
+    }
+
+    /// Return headers with `op >= from_op`, sorted by op.
+    pub fn iter_headers_from(&self, from_op: u64) -> Vec<PrepareHeader> {
+        let headers = self.headers.borrow();
+        let mut result: Vec<PrepareHeader> = headers
+            .iter()
+            .filter_map(|slot| slot.filter(|h| h.op >= from_op))
+            .collect();
+        result.sort_unstable_by_key(|h| h.op);
+        result
+    }
+
+    /// Highest op number in the index, or `None` if empty.
+    pub const fn last_op(&self) -> Option<u64> {
+        self.last_op.get()
+    }
+
+    /// Advance the snapshot watermark. The caller must ensure `op` is
+    /// monotonically increasing and corresponds to a durable snapshot.
+    ///
+    /// # Panics
+    /// Panics if `op` is less than the current snapshot watermark.
+    pub fn set_snapshot_op(&self, op: u64) {
+        assert!(
+            op >= self.snapshot_op.get(),
+            "snapshot_op must be monotonically increasing: {} -> {}",
+            self.snapshot_op.get(),
+            op
+        );
+        self.snapshot_op.set(op);
+    }
+
+    /// Access the underlying storage (for fsync in tests, etc.).
+    pub const fn storage_ref(&self) -> &FileStorage {
+        &self.storage
+    }
+
+    /// Async entry read for recovery.
+    ///
+    /// Returns `Ok(None)` if the op is not in the index.
+    ///
+    /// # Errors
+    /// Returns an I/O error if the read fails or the entry is malformed.
+    #[allow(clippy::future_not_send)]
+    pub async fn entry_at(
+        &self,
+        header: &PrepareHeader,
+    ) -> io::Result<Option<Message<PrepareHeader>>> {
+        let (offset, size) = {
+            let headers = self.headers.borrow();
+            let offsets = self.offsets.borrow();
+            let slot = slot_for_op(header.op);
+            let stored = match headers[slot].as_ref() {
+                Some(h) if h.op == header.op => h,
+                _ => return Ok(None),
+            };
+            let Some(offset) = offsets[slot] else {
+                return Ok(None);
+            };
+            (offset, stored.size as usize)
+        };
+        let buf = vec![0u8; size];
+        let buf = self.storage.read_at(offset, buf).await?;
+        let msg = Message::from_bytes(Bytes::from(buf))
+            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, 
e.to_string()))?;
+        Ok(Some(msg))
+    }
+}
+
+#[allow(
+    clippy::cast_possible_truncation,
+    clippy::cast_sign_loss,
+    clippy::future_not_send
+)]
+impl Journal<FileStorage> for MetadataJournal {
+    type Header = PrepareHeader;
+    type Entry = Message<PrepareHeader>;
+    type HeaderRef<'a> = Ref<'a, PrepareHeader>;
+
+    fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+        let headers = self.headers.borrow();
+        Ref::filter_map(headers, |h| {
+            let slot = slot_for_op(idx as u64);
+            let header = h[slot].as_ref()?;
+            if header.op == idx as u64 {
+                Some(header)
+            } else {
+                None
+            }
+        })
+        .ok()
+    }
+
+    fn previous_header(&self, header: &Self::Header) -> 
Option<Self::HeaderRef<'_>> {
+        if header.op == 0 {
+            return None;
+        }
+        self.header((header.op - 1) as usize)
+    }
+
+    fn remaining_capacity(&self) -> Option<usize> {
+        let Some(last) = self.last_op.get() else {
+            return Some(SLOT_COUNT);
+        };
+        let snapshot = self.snapshot_op.get();
+        if last <= snapshot {
+            return Some(SLOT_COUNT);
+        }
+        let used = (last - snapshot) as usize;
+        Some(SLOT_COUNT.saturating_sub(used))
+    }
+
+    /// Remove entries with ops in `ops` from the journal,
+    /// returning the removed entries sorted by op.
+    ///
+    /// Internally advances the snapshot watermark to `end_op` so that
+    /// future appends treat drained slots as safe to overwrite. Rewrites
+    /// the WAL file keeping only entries outside the drained range.
+    async fn drain(&self, ops: RangeInclusive<u64>) -> 
io::Result<Vec<Self::Entry>> {
+        let end_op = *ops.end();
+
+        // Advance the snapshot watermark so future appends treat
+        // drained ops as safe to overwrite.
+        if end_op > self.snapshot_op.get() {
+            self.snapshot_op.set(end_op);
+        }
+
+        // Partition slots into drained and live entries.
+        let mut to_drain: Vec<(PrepareHeader, u64)> = Vec::new();
+        let mut live: Vec<(PrepareHeader, u64)> = Vec::new();
+        {
+            let headers = self.headers.borrow();
+            let offsets = self.offsets.borrow();
+            for slot in 0..SLOT_COUNT {
+                if let (Some(h), Some(off)) = (&headers[slot], offsets[slot]) {
+                    if ops.contains(&h.op) {
+                        to_drain.push((*h, off));
+                    } else {
+                        live.push((*h, off));
+                    }
+                }
+            }
+        }
+        to_drain.sort_unstable_by_key(|(h, _)| h.op);
+        live.sort_unstable_by_key(|(h, _)| h.op);
+
+        // Read drained entries from disk before rewriting the WAL.
+        let mut drained = Vec::with_capacity(to_drain.len());
+        for (header, offset) in &to_drain {
+            let buf = vec![0u8; header.size as usize];
+            let buf = self.storage.read_at(*offset, buf).await?;
+            let msg = Message::from_bytes(Bytes::from(buf))
+                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, 
e.to_string()))?;
+            drained.push(msg);
+        }
+
+        // Write live entries to a temp file.
+        let wal_path = self.storage.path();
+        let tmp_path = wal_path.with_extension("wal.tmp");
+        {
+            let mut tmp = compio::fs::File::create(&tmp_path).await?;
+            let mut write_pos: u64 = 0;
+            for (header, old_offset) in &live {
+                let size = header.size as usize;
+                let buf = vec![0u8; size];
+                let buf = self.storage.read_at(*old_offset, buf).await?;
+                let (result, _buf) = tmp.write_all_at(buf, 
write_pos).await.into();
+                result?;
+                write_pos += size as u64;
+            }
+            tmp.sync_all().await?;
+        }
+
+        // Atomic replace.
+        compio::fs::rename(&tmp_path, wal_path).await?;
+
+        // Fsync parent directory to make the rename durable.
+        if let Some(parent) = wal_path.parent() {
+            let dir = compio::fs::File::open(parent).await?;
+            dir.sync_all().await?;
+        }
+
+        // Reopen the file descriptor at the same path.
+        self.storage.reopen().await?;
+
+        // Rebuild offsets for the compacted layout and clear drained slots.
+        let mut headers = self.headers.borrow_mut();
+        let mut offsets = self.offsets.borrow_mut();
+        let mut pos: u64 = 0;
+        for (header, _) in &live {
+            let slot = slot_for_op(header.op);
+            offsets[slot] = Some(pos);
+            pos += u64::from(header.size);
+        }
+        for slot in 0..SLOT_COUNT {
+            if let Some(h) = &headers[slot]
+                && ops.contains(&h.op)
+            {
+                headers[slot] = None;
+                offsets[slot] = None;
+            }
+        }
+
+        Ok(drained)
+    }
+
+    async fn append(&self, entry: Self::Entry) -> io::Result<()> {
+        let header = *entry.header();
+        let offset = self.storage.file_len();
+
+        self.storage.write_append(entry.into_inner()).await?;
+        self.storage.fsync().await?;
+
+        let slot = slot_for_op(header.op);
+        let mut headers = self.headers.borrow_mut();
+        let mut offsets = self.offsets.borrow_mut();
+
+        if let Some(existing) = &headers[slot] {
+            assert!(
+                existing.op <= self.snapshot_op.get(),
+                "journal slot collision: appending op {} would evict op {} \
+                 which has not been snapshotted (snapshot_op={})",
+                header.op,
+                existing.op,
+                self.snapshot_op.get(),
+            );
+        }
+
+        headers[slot] = Some(header);
+        offsets[slot] = Some(offset);
+
+        match self.last_op.get() {
+            Some(current) if header.op > current => 
self.last_op.set(Some(header.op)),
+            None => self.last_op.set(Some(header.op)),
+            _ => {}
+        }
+
+        Ok(())
+    }
+
+    async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+        let (size, offset) = {
+            let headers = self.headers.borrow();
+            let offsets = self.offsets.borrow();
+            let slot = slot_for_op(header.op);
+            let stored = headers[slot].as_ref()?;
+            if stored.op != header.op {
+                return None;
+            }
+            (stored.size as usize, offsets[slot]?)
+        };
+
+        let buffer = vec![0u8; size];
+        let buffer = self.storage.read_at(offset, buffer).await.ok()?;
+        Message::from_bytes(Bytes::from(buffer)).ok()
+    }
+}
+
+impl JournalHandle for MetadataJournal {
+    type Storage = FileStorage;
+    type Target = Self;
+
+    fn handle(&self) -> &Self::Target {
+        self
+    }
+}
+
+#[cfg(test)]
+#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
+mod tests {
+    use super::*;
+    use bytes::BytesMut;
+    use iggy_binary_protocol::consensus::Operation;
+    use tempfile::tempdir;
+
+    fn make_prepare(op: u64, body_size: usize) -> Message<PrepareHeader> {
+        let total_size = HEADER_SIZE + body_size;
+        let mut buffer = BytesMut::zeroed(total_size);
+
+        let header = bytemuck::checked::from_bytes_mut::<PrepareHeader>(&mut 
buffer[..HEADER_SIZE]);
+        header.size = total_size as u32;
+        header.command = Command2::Prepare;
+        header.op = op;
+        header.operation = Operation::CreateStream;
+
+        // Fill body with recognizable pattern
+        for (i, byte) in buffer[HEADER_SIZE..].iter_mut().enumerate() {
+            *byte = (op as u8).wrapping_add(i as u8);
+        }
+
+        Message::from_bytes(buffer.freeze()).unwrap()
+    }
+
+    #[compio::test]
+    async fn open_empty_wal() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        assert!(journal.last_op().is_none());
+        assert!(journal.header(0).is_none());
+    }
+
+    #[compio::test]
+    async fn append_and_read() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        let msg1 = make_prepare(1, 64);
+        let msg2 = make_prepare(2, 32);
+
+        journal.append(msg1.clone()).await.unwrap();
+        journal.append(msg2.clone()).await.unwrap();
+
+        assert_eq!(journal.last_op(), Some(2));
+        assert!(journal.header(1).is_some());
+        assert!(journal.header(2).is_some());
+        assert!(journal.header(3).is_none());
+
+        let entry1 = journal.entry(msg1.header()).await.unwrap();
+        assert_eq!(entry1.header().op, 1);
+        assert_eq!(entry1.body().len(), 64);
+
+        let entry2 = journal.entry(msg2.header()).await.unwrap();
+        assert_eq!(entry2.header().op, 2);
+        assert_eq!(entry2.body().len(), 32);
+    }
+
+    #[compio::test]
+    async fn reopen_rebuilds_index() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+
+        {
+            let journal = MetadataJournal::open(&path, 0).await.unwrap();
+            journal.append(make_prepare(1, 64)).await.unwrap();
+            journal.append(make_prepare(2, 128)).await.unwrap();
+            journal.append(make_prepare(3, 32)).await.unwrap();
+            journal.storage.fsync().await.unwrap();
+        }
+
+        // Reopen and verify index is rebuilt
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        assert_eq!(journal.last_op(), Some(3));
+
+        for op in 1..=3 {
+            let header = *journal.header(op).unwrap();
+            assert_eq!(header.op, op as u64);
+            let entry = journal.entry_at(&header).await.unwrap().unwrap();
+            assert_eq!(entry.header().op, op as u64);
+        }
+    }
+
+    #[compio::test]
+    async fn truncated_entry_on_reopen() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+
+        {
+            let journal = MetadataJournal::open(&path, 0).await.unwrap();
+            journal.append(make_prepare(1, 64)).await.unwrap();
+            journal.append(make_prepare(2, 128)).await.unwrap();
+            journal.storage.fsync().await.unwrap();
+        }
+
+        // Simulate crash: truncate the file to cut the second entry short
+        {
+            let storage = FileStorage::open(&path).await.unwrap();
+            let full_len = storage.file_len();
+            // Remove the last 10 bytes (partial second entry)
+            storage.truncate(full_len - 10).await.unwrap();
+            storage.fsync().await.unwrap();
+        }
+
+        // Reopen, should recover only the first entry
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+        assert_eq!(journal.last_op(), Some(1));
+        assert!(journal.header(2).is_none());
+
+        let h1 = *journal.header(1).unwrap();
+        let entry = journal.entry_at(&h1).await.unwrap().unwrap();
+        assert_eq!(entry.header().op, 1);
+    }
+
+    #[compio::test]
+    async fn iter_headers_from() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        journal.append(make_prepare(1, 32)).await.unwrap();
+        journal.append(make_prepare(2, 32)).await.unwrap();
+        journal.append(make_prepare(3, 32)).await.unwrap();
+        journal.append(make_prepare(5, 32)).await.unwrap();
+
+        let from_2 = journal.iter_headers_from(2);
+        assert_eq!(from_2.len(), 3);
+        assert_eq!(from_2[0].op, 2);
+        assert_eq!(from_2[1].op, 3);
+        assert_eq!(from_2[2].op, 5);
+
+        let from_4 = journal.iter_headers_from(4);
+        assert_eq!(from_4.len(), 1);
+        assert_eq!(from_4[0].op, 5);
+
+        let from_10 = journal.iter_headers_from(10);
+        assert!(from_10.is_empty());
+    }
+
+    #[compio::test]
+    async fn previous_header_navigation() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        journal.append(make_prepare(0, 32)).await.unwrap();
+        journal.append(make_prepare(1, 32)).await.unwrap();
+
+        let h1 = journal.header(1).unwrap();
+        let h0 = journal.previous_header(&h1).unwrap();
+        assert_eq!(h0.op, 0);
+        assert!(journal.previous_header(&h0).is_none());
+    }
+
+    #[compio::test]
+    async fn slot_wraparound_evicts_snapshotted_entry() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        // Op 3 goes to slot 3
+        journal.append(make_prepare(3, 32)).await.unwrap();
+        // Mark op 3 as snapshotted — safe to evict
+        journal.set_snapshot_op(3);
+        // Op 3 + SLOT_COUNT goes to the same slot, evicting op 3
+        let wraparound_op = 3 + SLOT_COUNT as u64;
+        journal
+            .append(make_prepare(wraparound_op, 32))
+            .await
+            .unwrap();
+
+        // Op 3 is evicted from the index
+        assert!(journal.header(3).is_none());
+        // The new op is present
+        assert!(journal.header(3 + SLOT_COUNT).is_some());
+        assert_eq!(journal.last_op(), Some(3 + SLOT_COUNT as u64));
+    }
+
+    #[compio::test]
+    async fn drain_shrinks_wal_and_preserves_live_entries() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        // Append 5 entries
+        for op in 1..=5 {
+            journal.append(make_prepare(op, 64)).await.unwrap();
+        }
+        let size_before = journal.storage.file_len();
+
+        // Drain entries 1-3
+        let drained = journal.drain(1..=3).await.unwrap();
+        assert_eq!(drained.len(), 3);
+        assert_eq!(drained[0].header().op, 1);
+        assert_eq!(drained[1].header().op, 2);
+        assert_eq!(drained[2].header().op, 3);
+
+        let size_after = journal.storage.file_len();
+        assert!(
+            size_after < size_before,
+            "WAL should shrink after drain: {size_before} -> {size_after}"
+        );
+
+        // Drained entries are gone from the index
+        for op in 1..=3 {
+            assert!(
+                journal.header(op as usize).is_none(),
+                "op {op} should be removed"
+            );
+        }
+
+        // Live entries are still readable
+        for op in 4..=5 {
+            let h = *journal.header(op as usize).unwrap();
+            assert_eq!(h.op, op);
+            let entry = journal.entry_at(&h).await.unwrap().unwrap();
+            assert_eq!(entry.header().op, op);
+            assert_eq!(entry.body().len(), 64);
+        }
+
+        // Reopen and verify the drained WAL is valid
+        drop(journal);
+        let journal = MetadataJournal::open(&path, 3).await.unwrap();
+        assert_eq!(journal.last_op(), Some(5));
+        for op in 4..=5 {
+            let h = *journal.header(op as usize).unwrap();
+            let entry = journal.entry_at(&h).await.unwrap().unwrap();
+            assert_eq!(entry.header().op, op);
+            assert_eq!(entry.body().len(), 64);
+        }
+    }
+
+    #[compio::test]
+    #[should_panic(expected = "journal slot collision")]
+    async fn append_panics_on_evicting_unsnapshotted_entry() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("journal.wal");
+        let journal = MetadataJournal::open(&path, 0).await.unwrap();
+
+        journal.append(make_prepare(3, 32)).await.unwrap();
+        // No snapshot taken, evicting op 3 must panic
+        let wraparound_op = 3 + SLOT_COUNT as u64;
+        journal
+            .append(make_prepare(wraparound_op, 32))
+            .await
+            .unwrap();
+    }
+}
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 3b12f52e1..81f88e7f2 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -43,6 +43,12 @@ serde = { workspace = true, features = ["derive"] }
 slab = { workspace = true }
 tracing = { workspace = true }
 
+[dev-dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+compio = { workspace = true }
+tempfile = { workspace = true }
+
 [lints.clippy]
 enum_glob_use = "deny"
 pedantic = "deny"
diff --git a/core/metadata/src/impls/metadata.rs 
b/core/metadata/src/impls/metadata.rs
index 668f49766..820ae6649 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -29,7 +29,8 @@ use iggy_binary_protocol::{
 };
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
-use tracing::{debug, warn};
+use std::path::Path;
+use tracing::{debug, error, warn};
 
 #[derive(Debug, Clone)]
 #[allow(unused)]
@@ -50,6 +51,67 @@ impl IggySnapshot {
     pub const fn snapshot(&self) -> &MetadataSnapshot {
         &self.snapshot
     }
+
+    /// Persist the snapshot to disk.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if serialization or I/O fails.
+    pub fn persist(&self, path: &Path) -> Result<(), SnapshotError> {
+        use crate::stm::snapshot::PersistStage;
+        use std::fs;
+        use std::io::Write;
+
+        let encoded = self.encode()?;
+
+        let tmp_path = path.with_extension("bin.tmp");
+
+        let mut file = fs::File::create(&tmp_path).map_err(|e| 
SnapshotError::Persist {
+            stage: PersistStage::Write,
+            source: e,
+        })?;
+        file.write_all(&encoded)
+            .map_err(|e| SnapshotError::Persist {
+                stage: PersistStage::Write,
+                source: e,
+            })?;
+        file.sync_all().map_err(|e| SnapshotError::Persist {
+            stage: PersistStage::Sync,
+            source: e,
+        })?;
+        drop(file);
+
+        fs::rename(&tmp_path, path).map_err(|e| SnapshotError::Persist {
+            stage: PersistStage::Rename,
+            source: e,
+        })?;
+
+        // Fsync the parent directory to ensure the rename is durable.
+        if let Some(parent) = path.parent() {
+            let dir = fs::File::open(parent).map_err(|e| 
SnapshotError::Persist {
+                stage: PersistStage::DirSync,
+                source: e,
+            })?;
+            dir.sync_all().map_err(|e| SnapshotError::Persist {
+                stage: PersistStage::DirSync,
+                source: e,
+            })?;
+        }
+
+        Ok(())
+    }
+
+    /// Load a snapshot from disk.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if the file cannot be read or deserialization 
fails.
+    pub fn load(path: &Path) -> Result<Self, SnapshotError> {
+        let data = std::fs::read(path)?;
+
+        // TODO: when a checksum is added, we need to check
+        // that data.len() is at least the size of the checksum
+
+        Self::decode(data.as_slice())
+    }
 }
 
 impl Snapshot for IggySnapshot {
@@ -87,7 +149,88 @@ impl Snapshot for IggySnapshot {
     }
 }
 
-#[derive(Debug)]
+/// Coordinates snapshot creation, persistence, and WAL compaction.
+///
+/// Owns the data directory path and the snapshot creation function.
+pub struct SnapshotCoordinator<M> {
+    data_dir: std::path::PathBuf,
+    create_snapshot: fn(&M, u64) -> Result<IggySnapshot, SnapshotError>,
+}
+
+impl<M> SnapshotCoordinator<M> {
+    /// Number of remaining journal slots at which a checkpoint is forced.
+    // TODO: tune this margin size
+    const CHECKPOINT_MARGIN: usize = 64;
+
+    #[must_use]
+    pub fn new(
+        data_dir: std::path::PathBuf,
+        create_snapshot: fn(&M, u64) -> Result<IggySnapshot, SnapshotError>,
+    ) -> Self {
+        Self {
+            data_dir,
+            create_snapshot,
+        }
+    }
+
+    /// Create a snapshot, persist it, and drain snapshotted entries from the
+    /// journal to reclaim WAL space.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if snapshotting, persistence, or drain fails.
+    #[allow(clippy::future_not_send)]
+    pub async fn checkpoint<J>(
+        &self,
+        stm: &M,
+        journal: &J,
+        last_op: u64,
+    ) -> Result<(), SnapshotError>
+    where
+        J: JournalHandle,
+    {
+        let snapshot = (self.create_snapshot)(stm, last_op)?;
+        let path = 
self.data_dir.join(super::METADATA_DIR).join("snapshot.bin");
+        snapshot.persist(&path)?;
+
+        let _ = journal
+            .handle()
+            .drain(0..=last_op)
+            .await
+            .map_err(SnapshotError::Io)?;
+
+        Ok(())
+    }
+
+    /// Force a checkpoint if the journal is running low on capacity.
+    ///
+    /// Returns `Ok(true)` if a checkpoint was taken, `Ok(false)` if not 
needed.
+    ///
+    /// # Errors
+    /// Returns `SnapshotError` if the checkpoint fails.
+    #[allow(clippy::future_not_send)]
+    pub async fn checkpoint_if_needed<J>(
+        &self,
+        stm: &M,
+        journal: &J,
+        commit_op: u64,
+    ) -> Result<bool, SnapshotError>
+    where
+        J: JournalHandle,
+    {
+        let needs_checkpoint = journal
+            .handle()
+            .remaining_capacity()
+            .is_some_and(|c| c <= Self::CHECKPOINT_MARGIN);
+
+        if needs_checkpoint {
+            self.checkpoint(stm, journal, commit_op).await?;
+            Ok(true)
+        } else {
+            Ok(false)
+        }
+    }
+}
+
 pub struct IggyMetadata<C, J, S, M> {
     /// Some on shard0, None on other shards
     pub consensus: Option<C>,
@@ -97,6 +240,36 @@ pub struct IggyMetadata<C, J, S, M> {
     pub snapshot: Option<S>,
     /// State machine - lives on all shards
     pub mux_stm: M,
+    /// Snapshot coordinator - present when persistent checkpointing is 
configured.
+    pub coordinator: Option<SnapshotCoordinator<M>>,
+}
+
+impl<C, J, S, M> IggyMetadata<C, J, S, M>
+where
+    M: FillSnapshot<MetadataSnapshot>,
+{
+    /// Create a new `IggyMetadata` instance.
+    ///
+    /// The `FillSnapshot<MetadataSnapshot>` bound is captured here via a
+    /// function pointer so that no downstream caller needs the bound.
+    #[must_use]
+    pub fn new(
+        consensus: Option<C>,
+        journal: Option<J>,
+        snapshot: Option<S>,
+        mux_stm: M,
+        data_dir: Option<std::path::PathBuf>,
+    ) -> Self {
+        let coordinator = data_dir
+            .map(|dir| SnapshotCoordinator::new(dir, |stm, seq| 
IggySnapshot::create(stm, seq)));
+        Self {
+            consensus,
+            journal,
+            snapshot,
+            mux_stm,
+            coordinator,
+        }
+    }
 }
 
 #[allow(clippy::future_not_send)]
@@ -149,10 +322,32 @@ where
 
         // TODO add assertions for valid state here.
 
-        // TODO verify that the current prepare fits in the WAL.
-
         // TODO handle gap in ops.
 
+        // Force a checkpoint if the journal is running low on capacity.
+        if let Some(coordinator) = &self.coordinator {
+            let snap_op = consensus.commit();
+            match coordinator
+                .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
+                .await
+            {
+                Ok(true) => {
+                    debug!(
+                        replica = consensus.replica(),
+                        "on_replicate: forced checkpoint at op={snap_op}"
+                    );
+                }
+                Ok(false) => {}
+                Err(e) => {
+                    error!(
+                        replica = consensus.replica(),
+                        "on_replicate: forced checkpoint failed: {e}"
+                    );
+                    return;
+                }
+            }
+        }
+
         // Verify hash chain integrity.
         if let Some(previous) = journal.handle().previous_header(header) {
             panic_if_hash_chain_would_break_in_same_view(&previous, header);
@@ -164,7 +359,13 @@ where
         consensus.set_last_prepare_checksum(header.checksum);
 
         // Append to journal.
-        journal.handle().append(message.clone()).await;
+        if let Err(e) = journal.handle().append(message.clone()).await {
+            error!(
+                replica = consensus.replica(),
+                "on_replicate: journal append failed: {e}"
+            );
+            return;
+        }
 
         // After successful journal write, send prepare_ok to primary.
         self.send_prepare_ok(header).await;
diff --git a/core/metadata/src/impls/mod.rs b/core/metadata/src/impls/mod.rs
index 1fb71a3cf..e076346e9 100644
--- a/core/metadata/src/impls/mod.rs
+++ b/core/metadata/src/impls/mod.rs
@@ -15,3 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 pub mod metadata;
+pub mod recovery;
+
+/// Subdirectory under the data root where metadata state is stored.
+pub const METADATA_DIR: &str = "metadata";
diff --git a/core/metadata/src/impls/recovery.rs 
b/core/metadata/src/impls/recovery.rs
new file mode 100644
index 000000000..ef461f18b
--- /dev/null
+++ b/core/metadata/src/impls/recovery.rs
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::impls::metadata::IggySnapshot;
+use crate::stm::StateMachine;
+use crate::stm::snapshot::{MetadataSnapshot, RestoreSnapshot, Snapshot, 
SnapshotError};
+use iggy_binary_protocol::consensus::PrepareHeader;
+use iggy_binary_protocol::consensus::message::Message;
+use iggy_common::IggyError;
+use journal::metadata_journal::{JournalError, MetadataJournal};
+use std::fmt;
+use std::path::Path;
+
+/// Error type for metadata recovery.
+#[derive(Debug)]
+pub enum RecoveryError {
+    Snapshot(SnapshotError),
+    Journal(JournalError),
+    StateMachine(IggyError),
+    Io(std::io::Error),
+}
+
+impl fmt::Display for RecoveryError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            Self::Snapshot(e) => write!(f, "recovery snapshot error: {e}"),
+            Self::Journal(e) => write!(f, "recovery journal error: {e}"),
+            Self::StateMachine(e) => write!(f, "recovery state machine error: 
{e}"),
+            Self::Io(e) => write!(f, "recovery I/O error: {e}"),
+        }
+    }
+}
+
+impl std::error::Error for RecoveryError {
+    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+        match self {
+            Self::Snapshot(e) => Some(e),
+            Self::Journal(e) => Some(e),
+            Self::StateMachine(e) => Some(e),
+            Self::Io(e) => Some(e),
+        }
+    }
+}
+
+impl From<SnapshotError> for RecoveryError {
+    fn from(e: SnapshotError) -> Self {
+        Self::Snapshot(e)
+    }
+}
+
+impl From<JournalError> for RecoveryError {
+    fn from(e: JournalError) -> Self {
+        Self::Journal(e)
+    }
+}
+
+impl From<IggyError> for RecoveryError {
+    fn from(e: IggyError) -> Self {
+        Self::StateMachine(e)
+    }
+}
+
+impl From<std::io::Error> for RecoveryError {
+    fn from(e: std::io::Error) -> Self {
+        Self::Io(e)
+    }
+}
+
+/// Result of a successful metadata recovery.
+pub struct RecoveredMetadata<M> {
+    pub journal: MetadataJournal,
+    pub snapshot: IggySnapshot,
+    pub mux_stm: M,
+    /// `None` means no journal entries were replayed during recovery.
+    /// `Some(op)` is the highest op applied during journal replay; the
+    /// snapshot's own sequence number is not counted as applied.
+    pub last_applied_op: Option<u64>,
+}
+
+/// Recover metadata state from disk.
+///
+/// 1. Load snapshot from `{data_dir}/metadata/snapshot.bin` (or empty default)
+/// 2. Restore state machine from snapshot
+/// 3. Open WAL at `{data_dir}/metadata/journal.wal`, scan and rebuild index
+/// 4. Replay journal entries from `snapshot.sequence_number + 1` through the 
state machine
+/// 5. Return the assembled `RecoveredMetadata`
+///
+/// # Errors
+/// Returns `RecoveryError` if snapshot loading, journal opening, or replay 
fails.
+#[allow(clippy::future_not_send)]
+pub async fn recover<M>(data_dir: &Path) -> Result<RecoveredMetadata<M>, 
RecoveryError>
+where
+    M: StateMachine<Input = Message<PrepareHeader>, Error = IggyError>
+        + RestoreSnapshot<MetadataSnapshot>,
+{
+    let metadata_dir = data_dir.join(super::METADATA_DIR);
+    std::fs::create_dir_all(&metadata_dir)?;
+
+    // 1. Load snapshot (or empty default if missing)
+    let snapshot_path = metadata_dir.join("snapshot.bin");
+    let (snapshot, replay_from) = if snapshot_path.exists() {
+        let s = IggySnapshot::load(&snapshot_path)?;
+        let from = s.sequence_number() + 1;
+        (s, from)
+    } else {
+        // No snapshot, replay from op 0.
+        (IggySnapshot::new(0), 0)
+    };
+
+    // 2. Restore state machine from snapshot
+    let mux_stm = M::restore_snapshot(snapshot.snapshot())?;
+
+    // 3. Open journal, scan the WAL and build index
+    let journal_path = metadata_dir.join("journal.wal");
+    let journal = MetadataJournal::open(&journal_path, 
snapshot.sequence_number()).await?;
+
+    // 4. Replay journal entries after snapshot
+    let headers_to_replay = journal.iter_headers_from(replay_from);
+
+    let mut last_applied_op: Option<u64> = None;
+    for header in &headers_to_replay {
+        // TODO: Check hash chain integrity against `previous_header`. On a
+        // same-view break, stop replay here and mark the remaining entries for
+        // repair via VSR instead of panicking.
+
+        let entry = journal.entry_at(header).await?.ok_or_else(|| {
+            RecoveryError::Io(std::io::Error::new(
+                std::io::ErrorKind::InvalidData,
+                format!("failed to read journal entry for op={}", header.op),
+            ))
+        })?;
+        mux_stm.update(entry)?;
+        last_applied_op = Some(header.op);
+    }
+
+    Ok(RecoveredMetadata {
+        journal,
+        snapshot,
+        mux_stm,
+        last_applied_op,
+    })
+}
+
+#[cfg(test)]
+#[allow(clippy::cast_possible_truncation)]
+mod tests {
+    use super::*;
+    use bytes::BytesMut;
+    use iggy_binary_protocol::consensus::{Command2, Operation};
+    use journal::Journal;
+    use tempfile::tempdir;
+
+    use crate::MuxStateMachine;
+
+    type TestStm = MuxStateMachine<()>;
+
+    const HEADER_SIZE: usize = size_of::<PrepareHeader>();
+
+    fn make_prepare(op: u64, body_size: usize) -> Message<PrepareHeader> {
+        let total_size = HEADER_SIZE + body_size;
+        let mut buffer = BytesMut::zeroed(total_size);
+        let header = bytemuck::checked::from_bytes_mut::<PrepareHeader>(&mut 
buffer[..HEADER_SIZE]);
+        header.size = total_size as u32;
+        header.command = Command2::Prepare;
+        header.op = op;
+        header.operation = Operation::CreateStream;
+        Message::from_bytes(buffer.freeze()).unwrap()
+    }
+
+    #[compio::test]
+    async fn recover_empty_state() {
+        let dir = tempdir().unwrap();
+        let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+
+        assert_eq!(recovered.last_applied_op, None);
+        assert!(recovered.journal.last_op().is_none());
+    }
+
+    #[compio::test]
+    async fn recover_snapshot_only() {
+        let dir = tempdir().unwrap();
+        let metadata_dir = dir.path().join("metadata");
+        std::fs::create_dir_all(&metadata_dir).unwrap();
+
+        let snapshot = IggySnapshot::new(42);
+        snapshot
+            .persist(&metadata_dir.join("snapshot.bin"))
+            .unwrap();
+
+        let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+        assert_eq!(recovered.snapshot.sequence_number(), 42);
+        assert_eq!(recovered.last_applied_op, None);
+    }
+
+    #[compio::test]
+    async fn recover_journal_only() {
+        let dir = tempdir().unwrap();
+        let metadata_dir = dir.path().join("metadata");
+        std::fs::create_dir_all(&metadata_dir).unwrap();
+
+        {
+            let journal = 
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+                .await
+                .unwrap();
+            journal.append(make_prepare(1, 32)).await.unwrap();
+            journal.append(make_prepare(2, 32)).await.unwrap();
+            journal.append(make_prepare(3, 32)).await.unwrap();
+            journal.storage_ref().fsync().await.unwrap();
+        }
+
+        let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+        assert_eq!(recovered.last_applied_op, Some(3));
+        assert_eq!(recovered.journal.last_op(), Some(3));
+    }
+
+    #[compio::test]
+    async fn recover_snapshot_plus_journal() {
+        let dir = tempdir().unwrap();
+        let metadata_dir = dir.path().join("metadata");
+        std::fs::create_dir_all(&metadata_dir).unwrap();
+
+        // Snapshot at op 5
+        let snapshot = IggySnapshot::new(5);
+        snapshot
+            .persist(&metadata_dir.join("snapshot.bin"))
+            .unwrap();
+
+        // WAL has ops 1-10
+        {
+            let journal = 
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+                .await
+                .unwrap();
+            for op in 1..=10 {
+                journal.append(make_prepare(op, 32)).await.unwrap();
+            }
+            journal.storage_ref().fsync().await.unwrap();
+        }
+
+        let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+        // Should replay ops 6-10 (snapshot was at 5)
+        assert_eq!(recovered.last_applied_op, Some(10));
+        assert_eq!(recovered.snapshot.sequence_number(), 5);
+    }
+
+    #[test]
+    fn snapshot_persist_load_roundtrip() {
+        let dir = tempdir().unwrap();
+        let path = dir.path().join("snapshot.bin");
+
+        let snapshot = IggySnapshot::new(99);
+        snapshot.persist(&path).unwrap();
+
+        let loaded = IggySnapshot::load(&path).unwrap();
+        assert_eq!(loaded.sequence_number(), 99);
+    }
+}
diff --git a/core/metadata/src/stm/snapshot.rs 
b/core/metadata/src/stm/snapshot.rs
index 6daadc03a..b3f2ffe7c 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -28,6 +28,36 @@ pub enum SnapshotError {
     Serialize(rmp_serde::encode::Error),
     /// Deserialization failed.
     Deserialize(rmp_serde::decode::Error),
+    /// I/O error during snapshot persist/load.
+    Io(std::io::Error),
+    /// I/O error during a specific stage of snapshot persistence.
+    /// The caller can inspect the stage to decide whether to retry
+    /// (e.g. `Rename`) or start from scratch (e.g. `Write`, `Sync`).
+    Persist {
+        stage: PersistStage,
+        source: std::io::Error,
+    },
+    /// Checksum mismatch on snapshot load.
+    ChecksumMismatch { expected: u32, actual: u32 },
+    /// Snapshot file is too short to contain a valid checksum.
+    Truncated { size: u64 },
+}
+
+/// Stage at which snapshot persistence failed.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PersistStage {
+    /// Creating or writing the temp file failed. The temp file may contain
+    /// partial data. Safe to delete and retry from scratch.
+    Write,
+    /// Fsync of the temp file failed. The data may not be durable.
+    /// Safe to delete the temp file and retry from scratch.
+    Sync,
+    /// Atomic rename of temp -> final path failed. The temp file contains
+    /// a complete, synced snapshot. Safe to retry just the rename.
+    Rename,
+    /// Fsync of the parent directory after rename failed. The rename
+    /// succeeded but may not be durable. Safe to retry just the dir sync.
+    DirSync,
 }
 
 impl fmt::Display for SnapshotError {
@@ -35,6 +65,22 @@ impl fmt::Display for SnapshotError {
         match self {
             Self::Serialize(e) => write!(f, "snapshot serialization failed: 
{e}"),
             Self::Deserialize(e) => write!(f, "snapshot deserialization 
failed: {e}"),
+            Self::Io(e) => write!(f, "snapshot I/O error: {e}"),
+            Self::Persist { stage, source } => {
+                write!(f, "snapshot persist failed at {stage:?} stage: 
{source}")
+            }
+            Self::ChecksumMismatch { expected, actual } => {
+                write!(
+                    f,
+                    "snapshot checksum mismatch: expected {expected:#010x}, 
actual {actual:#010x}"
+                )
+            }
+            Self::Truncated { size } => {
+                write!(
+                    f,
+                    "snapshot file truncated: {size} bytes (too short for 
checksum)"
+                )
+            }
         }
     }
 }
@@ -44,10 +90,18 @@ impl std::error::Error for SnapshotError {
         match self {
             Self::Serialize(e) => Some(e),
             Self::Deserialize(e) => Some(e),
+            Self::Io(e) | Self::Persist { source: e, .. } => Some(e),
+            Self::ChecksumMismatch { .. } | Self::Truncated { .. } => None,
         }
     }
 }
 
+impl From<std::io::Error> for SnapshotError {
+    fn from(e: std::io::Error) -> Self {
+        Self::Io(e)
+    }
+}
+
 /// The snapshot container for all metadata state machines.
 /// Each field corresponds to one state machine's serialized state.
 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/core/partitions/src/iggy_partition.rs 
b/core/partitions/src/iggy_partition.rs
index 393e59fc1..31bc3ba53 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -163,7 +163,11 @@ impl Partition for IggyPartition {
         }
 
         let message = Self::prepare_message_from_batch(header, &batch);
-        journal.inner.append(message).await;
+        journal
+            .inner
+            .append(message)
+            .await
+            .map_err(|e| IggyError::IoError(e.to_string()))?;
 
         Ok(AppendResult::new(
             dirty_offset,
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 5be236975..c21cc8d60 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -24,7 +24,23 @@ use std::{
     collections::{BTreeMap, HashMap},
 };
 
-const ZERO_LEN: usize = 0;
+// TODO: Fix this; we still need to figure out how to store the
+// `IggyMessagesBatchSet`.
+/// No-op storage backend for the in-memory partition journal.
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct Noop;
+
+impl Storage for Noop {
+    type Buffer = ();
+
+    async fn write_at(&self, _offset: usize, _buf: ()) -> 
std::io::Result<usize> {
+        Ok(0)
+    }
+
+    async fn read_at(&self, _offset: usize, _buffer: ()) -> 
std::io::Result<()> {
+        Ok(())
+    }
+}
 
 /// Lookup key for querying messages from the journal.
 #[derive(Debug, Clone, Copy)]
@@ -65,7 +81,7 @@ pub struct PartitionJournalMemStorage {
 impl Storage for PartitionJournalMemStorage {
     type Buffer = Bytes;
 
-    async fn write(&self, buf: Self::Buffer) -> usize {
+    async fn write_at(&self, _offset: usize, buf: Self::Buffer) -> 
std::io::Result<usize> {
         let len = buf.len();
         let entries = unsafe { &mut *self.entries.get() };
         let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
@@ -78,17 +94,17 @@ impl Storage for PartitionJournalMemStorage {
         let write_offset = *current_offset;
         *current_offset += len;
 
-        write_offset
+        Ok(write_offset)
     }
 
-    async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+    async fn read_at(&self, offset: usize, _buffer: Self::Buffer) -> 
std::io::Result<Self::Buffer> {
         let offset_to_index = unsafe { &*self.offset_to_index.get() };
         let Some(&index) = offset_to_index.get(&offset) else {
-            return Bytes::new();
+            return Ok(Bytes::new());
         };
 
         let entries = unsafe { &*self.entries.get() };
-        entries.get(index).cloned().unwrap_or_default()
+        Ok(entries.get(index).cloned().unwrap_or_default())
     }
 }
 
@@ -244,7 +260,11 @@ where
 
         let bytes = {
             let inner = unsafe { &*self.inner.get() };
-            inner.storage.read(storage_offset, ZERO_LEN).await
+            inner
+                .storage
+                .read_at(storage_offset, Bytes::new())
+                .await
+                .unwrap_or_default()
         };
 
         if bytes.is_empty() {
@@ -287,7 +307,11 @@ where
 
             let bytes = {
                 let inner = unsafe { &*self.inner.get() };
-                inner.storage.read(storage_offset, ZERO_LEN).await
+                inner
+                    .storage
+                    .read_at(storage_offset, Bytes::new())
+                    .await
+                    .unwrap_or_default()
             };
 
             if bytes.is_empty() {
@@ -331,7 +355,7 @@ where
         headers.iter().find(|candidate| candidate.op == prev_op)
     }
 
-    async fn append(&self, entry: Self::Entry) {
+    async fn append(&self, entry: Self::Entry) -> std::io::Result<()> {
         let first_offset_and_timestamp = Self::message_to_batch(&entry)
             .and_then(|batch| Some((batch.first_offset()?, 
batch.first_timestamp()?)));
 
@@ -346,7 +370,7 @@ where
         let bytes = entry.into_inner();
         let storage_offset = {
             let inner = unsafe { &*self.inner.get() };
-            inner.storage.write(bytes).await
+            inner.storage.write_at(0, bytes).await?
         };
 
         {
@@ -361,6 +385,8 @@ where
             let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
             timestamp_to_op.insert(timestamp, op);
         }
+
+        Ok(())
     }
 
     async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
diff --git a/core/partitions/src/log.rs b/core/partitions/src/log.rs
index 7d88f60c2..e3798a10e 100644
--- a/core/partitions/src/log.rs
+++ b/core/partitions/src/log.rs
@@ -69,7 +69,7 @@ where
         self.inner.previous_header(header)
     }
 
-    fn append(&self, entry: Self::Entry) -> impl Future<Output = ()> {
+    fn append(&self, entry: Self::Entry) -> impl Future<Output = std::io::Result<()>> {
         self.inner.append(entry)
     }
 
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 76fdd1ae6..e6ca06b89 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -30,28 +30,34 @@ use std::collections::HashMap;
 #[derive(Debug, Default)]
 pub struct MemStorage {
     data: RefCell<Vec<u8>>,
-    offset: Cell<u64>,
 }
 
 #[allow(clippy::future_not_send)]
 impl Storage for MemStorage {
     type Buffer = Vec<u8>;
 
-    async fn write(&self, buf: Self::Buffer) -> usize {
+    async fn write_at(&self, offset: usize, buf: Self::Buffer) -> std::io::Result<usize> {
         let len = buf.len();
-        self.data.borrow_mut().extend_from_slice(&buf);
-        self.offset.set(self.offset.get() + len as u64);
-        len
+        let mut data = self.data.borrow_mut();
+        let end = offset + len;
+        if end > data.len() {
+            data.resize(end, 0);
+        }
+        data[offset..end].copy_from_slice(&buf);
+        Ok(len)
     }
 
-    async fn read(&self, offset: usize, len: usize) -> Self::Buffer {
-        let mut buffer = vec![0; len];
+    async fn read_at(
+        &self,
+        offset: usize,
+        mut buffer: Self::Buffer,
+    ) -> std::io::Result<Self::Buffer> {
         let data = self.data.borrow();
         let end = offset + buffer.len();
         if offset < data.len() && end <= data.len() {
             buffer[..].copy_from_slice(&data[offset..end]);
         }
-        buffer
+        Ok(buffer)
     }
 }
 
@@ -106,7 +112,8 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
         let header = headers.get(&header.op)?;
         let offset = *offsets.get(&header.op)?;
 
-        let buffer = self.storage.read(offset, header.size as usize).await;
+        let buffer = vec![0; header.size as usize];
+        let buffer = self.storage.read_at(offset, buffer).await.ok()?;
         let message =
             Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes should be valid");
         Some(message)
@@ -119,16 +126,20 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for SimJournal<S> {
         unsafe { &*self.headers.get() }.get(&(header.op - 1))
     }
 
-    async fn append(&self, entry: Self::Entry) {
+    async fn append(&self, entry: Self::Entry) -> std::io::Result<()> {
         let header = *entry.header();
         let message_bytes = entry.as_bytes();
 
-        let bytes_written = self.storage.write(message_bytes.to_vec()).await;
+        let bytes_written = self
+            .storage
+            .write_at(self.write_offset.get(), message_bytes.to_vec())
+            .await?;
 
         let offset = self.write_offset.get();
         unsafe { &mut *self.headers.get() }.insert(header.op, header);
         unsafe { &mut *self.offsets.get() }.insert(header.op, offset);
         self.write_offset.set(offset + bytes_written);
+        Ok(())
     }
 
     fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 5711da0ce..fd5891956 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -53,12 +53,13 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>, replica_count: u8) -
     );
     metadata_consensus.init();
 
-    let metadata = IggyMetadata {
-        consensus: Some(metadata_consensus),
-        journal: Some(SimJournal::<MemStorage>::default()),
-        snapshot: Some(SimSnapshot::default()),
-        mux_stm: mux,
-    };
+    let metadata = IggyMetadata::new(
+        Some(metadata_consensus),
+        Some(SimJournal::<MemStorage>::default()),
+        Some(SimSnapshot::default()),
+        mux,
+        None,
+    );
 
     let partitions_config = PartitionsConfig {
         messages_required_to_save: 1000,

Reply via email to