This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new b55d065d8 feat(metadata): Persistent WAL journal with recovery and
compaction (#2916)
b55d065d8 is described below
commit b55d065d80c90a27d9102c3b781a80662de9250e
Author: Krishna Vishal <[email protected]>
AuthorDate: Thu Mar 26 23:41:39 2026 +0530
feat(metadata): Persistent WAL journal with recovery and compaction (#2916)
---
Cargo.lock | 10 +
core/journal/Cargo.toml | 7 +
core/journal/src/file_storage.rs | 149 +++++++
core/journal/src/lib.rs | 43 +-
core/journal/src/metadata_journal.rs | 717 ++++++++++++++++++++++++++++++++++
core/metadata/Cargo.toml | 6 +
core/metadata/src/impls/metadata.rs | 211 +++++++++-
core/metadata/src/impls/mod.rs | 4 +
core/metadata/src/impls/recovery.rs | 269 +++++++++++++
core/metadata/src/stm/snapshot.rs | 54 +++
core/partitions/src/iggy_partition.rs | 6 +-
core/partitions/src/journal.rs | 46 ++-
core/partitions/src/log.rs | 2 +-
core/simulator/src/deps.rs | 33 +-
core/simulator/src/replica.rs | 13 +-
15 files changed, 1529 insertions(+), 41 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
index 827bef7a4..d43b9a2db 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6046,6 +6046,13 @@ dependencies = [
[[package]]
name = "journal"
version = "0.1.0"
+dependencies = [
+ "bytemuck",
+ "bytes",
+ "compio",
+ "iggy_binary_protocol",
+ "tempfile",
+]
[[package]]
name = "js-sys"
@@ -6657,7 +6664,9 @@ name = "metadata"
version = "0.1.0"
dependencies = [
"ahash 0.8.12",
+ "bytemuck",
"bytes",
+ "compio",
"consensus",
"iggy_binary_protocol",
"iggy_common",
@@ -6669,6 +6678,7 @@ dependencies = [
"secrecy",
"serde",
"slab",
+ "tempfile",
"tracing",
]
diff --git a/core/journal/Cargo.toml b/core/journal/Cargo.toml
index 32b46aff4..bd3a0e520 100644
--- a/core/journal/Cargo.toml
+++ b/core/journal/Cargo.toml
@@ -28,6 +28,13 @@ repository = "https://github.com/apache/iggy"
readme = "../../../README.md"
[dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+compio = { workspace = true }
+iggy_binary_protocol = { workspace = true }
+
+[dev-dependencies]
+tempfile = { workspace = true }
[lints.clippy]
enum_glob_use = "deny"
diff --git a/core/journal/src/file_storage.rs b/core/journal/src/file_storage.rs
new file mode 100644
index 000000000..6517f5b90
--- /dev/null
+++ b/core/journal/src/file_storage.rs
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::Storage;
+use compio::buf::IoBuf;
+use compio::io::{AsyncReadAtExt, AsyncWriteAtExt};
+use std::cell::{Cell, UnsafeCell};
+use std::io;
+use std::path::{Path, PathBuf};
+
+/// File-backed storage implementing the `Storage` trait.
+pub struct FileStorage {
+ file: UnsafeCell<compio::fs::File>,
+ write_offset: Cell<u64>,
+ path: PathBuf,
+}
+
+#[allow(clippy::future_not_send)]
+impl FileStorage {
+ /// Open or create the file at `path`, setting `write_offset` to current
file length.
+ ///
+ /// # Errors
+ /// Returns an I/O error if the file cannot be opened or created.
+ pub async fn open(path: &Path) -> io::Result<Self> {
+ let file = compio::fs::OpenOptions::new()
+ .read(true)
+ .write(true)
+ .create(true)
+ .truncate(false)
+ .open(path)
+ .await?;
+ let len = file.metadata().await?.len();
+ Ok(Self {
+ file: UnsafeCell::new(file),
+ write_offset: Cell::new(len),
+ path: path.to_path_buf(),
+ })
+ }
+
+ /// Current file size (tracks append position).
+ pub const fn file_len(&self) -> u64 {
+ self.write_offset.get()
+ }
+
+ /// Truncate the file to `len` bytes.
+ ///
+ /// # Errors
+ /// Returns an I/O error if truncation fails.
+ pub async fn truncate(&self, len: u64) -> io::Result<()> {
+ let file = unsafe { &*self.file.get() };
+ file.set_len(len).await?;
+ self.write_offset.set(len);
+ Ok(())
+ }
+
+ /// Fsync the file to disk.
+ ///
+ /// # Errors
+ /// Returns an I/O error if sync fails.
+ pub async fn fsync(&self) -> io::Result<()> {
+ // SAFETY: single-threaded compio runtime, no concurrent access to the
file.
+ unsafe { &*self.file.get() }.sync_data().await
+ }
+
+ /// Positional read into `buf`. Returns the buffer with data filled in.
+ ///
+ /// # Errors
+ /// Returns an I/O error if the read fails.
+ pub async fn read_at(&self, offset: u64, buf: Vec<u8>) ->
io::Result<Vec<u8>> {
+ // SAFETY: single-threaded compio runtime, no concurrent access to the
file.
+ let file = unsafe { &*self.file.get() };
+ let (result, buf) = file.read_exact_at(buf, offset).await.into();
+ result?;
+ Ok(buf)
+ }
+
+ /// Append write, returns bytes written.
+ ///
+ /// # Errors
+ /// Returns an I/O error if the write fails.
+ #[allow(clippy::cast_possible_truncation)]
+ pub async fn write_append<B: IoBuf>(&self, buf: B) -> io::Result<usize> {
+ let len = buf.buf_len();
+ // SAFETY: single-threaded compio runtime, no concurrent access to the
file.
+ let file = unsafe { &mut *self.file.get() };
+ let (result, _buf) = file.write_all_at(buf,
self.write_offset.get()).await.into();
+ result?;
+ self.write_offset.set(self.write_offset.get() + len as u64);
+ Ok(len)
+ }
+
+ /// The file path this storage was opened with.
+ pub fn path(&self) -> &Path {
+ &self.path
+ }
+
+ /// Reopen the underlying file descriptor at the stored path.
+ ///
+ /// Used after an atomic rename replaces the file on disk.
+ ///
+ /// # Errors
+ /// Returns an I/O error if the file cannot be reopened.
+ pub async fn reopen(&self) -> io::Result<()> {
+ let file = compio::fs::OpenOptions::new()
+ .read(true)
+ .write(true)
+ .open(&self.path)
+ .await?;
+ let len = file.metadata().await?.len();
+ // SAFETY: single-threaded compio runtime, no concurrent access to the
file.
+ unsafe { *self.file.get() = file };
+ self.write_offset.set(len);
+ Ok(())
+ }
+}
+
+#[allow(clippy::future_not_send)]
+impl Storage for FileStorage {
+ type Buffer = Vec<u8>;
+
+ async fn write_at(&self, offset: usize, buf: Self::Buffer) ->
io::Result<usize> {
+ let len = buf.buf_len();
+ let file = unsafe { &mut *self.file.get() };
+ let (result, _buf) = file.write_all_at(buf, offset as
u64).await.into();
+ result?;
+ Ok(len)
+ }
+
+ async fn read_at(&self, offset: usize, buffer: Self::Buffer) ->
io::Result<Self::Buffer> {
+ let file = unsafe { &*self.file.get() };
+ let (result, buffer) = file.read_exact_at(buffer, offset as
u64).await.into();
+ result?;
+ Ok(buffer)
+ }
+}
diff --git a/core/journal/src/lib.rs b/core/journal/src/lib.rs
index 55a6b14fb..9c610f187 100644
--- a/core/journal/src/lib.rs
+++ b/core/journal/src/lib.rs
@@ -15,7 +15,11 @@
// specific language governing permissions and limitations
// under the License.
-use std::ops::Deref;
+use std::io;
+use std::ops::{Deref, RangeInclusive};
+
+pub mod file_storage;
+pub mod metadata_journal;
pub trait Journal<S>
where
@@ -30,19 +34,44 @@ where
fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>>;
fn previous_header(&self, header: &Self::Header) ->
Option<Self::HeaderRef<'_>>;
- fn append(&self, entry: Self::Entry) -> impl Future<Output = ()>;
+ fn append(&self, entry: Self::Entry) -> impl Future<Output =
io::Result<()>>;
fn entry(&self, header: &Self::Header) -> impl Future<Output =
Option<Self::Entry>>;
+
+ /// Number of entries that can be appended before the journal would need
+ /// to evict un-snapshotted slots. Returns `None` for journals that don't
persist to disk.
+ fn remaining_capacity(&self) -> Option<usize> {
+ None
+ }
+
+ /// Remove entries with ops in `ops` from the journal,
+ /// returning the removed entries sorted by op.
+ ///
+ /// Implementations that persist to disk should rewrite the underlying
+ /// storage to reclaim space. The default is a no-op for journals that
+ /// do not persist to disk.
+ ///
+ /// # Errors
+ /// Returns an I/O error if the drain fails.
+ fn drain(
+ &self,
+ _ops: RangeInclusive<u64>,
+ ) -> impl Future<Output = io::Result<Vec<Self::Entry>>> {
+ async { Ok(Vec::new()) }
+ }
}
// TODO: Move to other crate.
pub trait Storage {
type Buffer;
- fn write(&self, buf: Self::Buffer) -> impl Future<Output = usize>;
- // TODO: Get rid of the `len` usize, we need to do changes in `Simulator`
in order to support that.
- // Maybe we should go back to passing in the `Buffer` again, but I am not
sure how to handle it in the `Partitions Journal`, since we use in-memory impl
- // which extracts the buffer out of the `Vec<Message>` and we don't need
to allocate a new buffer.
- fn read(&self, offset: usize, len: usize) -> impl Future<Output =
Self::Buffer>;
+ fn write_at(&self, offset: usize, buf: Self::Buffer)
+ -> impl Future<Output = io::Result<usize>>;
+
+ fn read_at(
+ &self,
+ offset: usize,
+ buffer: Self::Buffer,
+ ) -> impl Future<Output = io::Result<Self::Buffer>>;
}
pub trait JournalHandle {
diff --git a/core/journal/src/metadata_journal.rs
b/core/journal/src/metadata_journal.rs
new file mode 100644
index 000000000..fe45e9f7b
--- /dev/null
+++ b/core/journal/src/metadata_journal.rs
@@ -0,0 +1,717 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::file_storage::FileStorage;
+use crate::{Journal, JournalHandle};
+use bytes::Bytes;
+use compio::io::AsyncWriteAtExt;
+use iggy_binary_protocol::consensus::message::Message;
+use iggy_binary_protocol::consensus::{Command2, PrepareHeader};
+use std::cell::{Cell, Ref, RefCell};
+use std::fmt;
+use std::io;
+use std::ops::RangeInclusive;
+use std::path::Path;
+
+const HEADER_SIZE: usize = size_of::<PrepareHeader>();
+
+/// Maximum allowed size for a single WAL entry (64 MiB).
+///
+/// A header with `size` exceeding this limit is treated as corrupt. This
+/// prevents a bit-flipped size field (e.g. `0xFFFF_FFFF`) from causing a
+/// multi-GiB allocation during the WAL scan.
+const MAX_ENTRY_SIZE: u64 = 64 * 1024 * 1024;
+
+/// Number of slots in the journal ring buffer.
+///
+/// Must be larger than the maximum number of entries between consecutive
+/// snapshots. If the journal wraps past this window, older un-snapshotted
+/// entries are silently evicted from the in-memory index (the WAL file
+/// still contains them, but they become unreachable for recovery).
+///
+/// **NOTE:** This number needs to be chosen in balance between number of
+/// entries in [`core::consensus::pipeline_prepare_queue_max`]. Because this
number controls
+/// how many committed but not yet snapshotted entries that the buffer can
+/// hold. This may need to be tuned properly.
+pub(crate) const SLOT_COUNT: usize = 1024;
+
/// Error type for journal operations.
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub enum JournalError {
    /// Underlying storage I/O failure.
    Io(io::Error),
}

impl fmt::Display for JournalError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Single-variant enum: the pattern is irrefutable.
        let Self::Io(inner) = self;
        write!(f, "journal I/O error: {inner}")
    }
}

impl std::error::Error for JournalError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let Self::Io(inner) = self;
        Some(inner)
    }
}

impl From<io::Error> for JournalError {
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}
+
+/// Persistent metadata journal backed by an append-only WAL file.
+///
+/// Each WAL entry is a raw `Message<PrepareHeader>`:
+/// `[PrepareHeader: 256 bytes][body: header.size - 256 bytes]`
+///
+/// The in-memory index is a fixed-size slot array indexed by
+/// `op % SLOT_COUNT`.
+pub struct MetadataJournal {
+ /// File-backed append-only WAL.
+ storage: FileStorage,
+ /// In-memory slot array of entry headers, indexed by `op % SLOT_COUNT`.
+ /// A slot is `None` if no entry occupies it (or it has been drained).
+ headers: RefCell<Vec<Option<PrepareHeader>>>,
+ /// Byte offset within the WAL file for each slot's entry, mirrors
`headers`.
+ offsets: RefCell<Vec<Option<u64>>>,
+ /// Highest op number appended to the journal, or `None` if empty.
+ /// Used to detect forward progress and validate append ordering.
+ last_op: Cell<Option<u64>>,
+ /// Highest op that has been durably snapshotted. Entries with `op <=
snapshot_op`
+ /// are safe to evict from the slot array. Appending an entry that would
evict
+ /// an un-snapshotted entry (op > `snapshot_op`) panics and the upper
layer must
+ /// take a snapshot before the journal wraps.
+ snapshot_op: Cell<u64>,
+}
+
+impl fmt::Debug for MetadataJournal {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ f.debug_struct("MetadataJournal")
+ .field("write_offset", &self.storage.file_len())
+ .field("last_op", &self.last_op.get())
+ .finish_non_exhaustive()
+ }
+}
+
+#[allow(clippy::cast_possible_truncation)]
+const fn slot_for_op(op: u64) -> usize {
+ op as usize % SLOT_COUNT
+}
+
+#[allow(clippy::cast_possible_truncation)]
+impl MetadataJournal {
+ /// Open the WAL file, scanning forward to rebuild the in-memory index.
+ ///
+ /// `snapshot_op` is the highest op that has been durably snapshotted.
+ /// It must be provided so that `append()` can detect slot collisions
+ /// that would evict un-snapshotted entries.
+ ///
+ /// If a truncated entry is found at the tail (crash during write),
+ /// the file is truncated to the last complete entry.
+ ///
+ /// # Errors
+ /// Returns `JournalError::Io` if the WAL file cannot be opened or read.
+ #[allow(clippy::future_not_send)]
+ pub async fn open(path: &Path, snapshot_op: u64) -> Result<Self,
JournalError> {
+ let storage = FileStorage::open(path).await?;
+ let file_len = storage.file_len();
+ let mut headers: Vec<Option<PrepareHeader>> = vec![None; SLOT_COUNT];
+ let mut offsets: Vec<Option<u64>> = vec![None; SLOT_COUNT];
+ let mut last_op: Option<u64> = None;
+ let mut pos: u64 = 0;
+ let mut header_buf = vec![0u8; HEADER_SIZE];
+
+ while pos + HEADER_SIZE as u64 <= file_len {
+ // Read the 256-byte header
+ header_buf = storage.read_at(pos, header_buf).await?;
+ let header: PrepareHeader =
+ *bytemuck::checked::from_bytes::<PrepareHeader>(&header_buf);
+
+ // Validate: must be a Prepare command with sane size
+ if header.command != Command2::Prepare
+ || (header.size as usize) < HEADER_SIZE
+ || u64::from(header.size) > MAX_ENTRY_SIZE
+ {
+ // Corrupt or non-prepare entry, truncate here
+ storage.truncate(pos).await?;
+ break;
+ }
+
+ let entry_size = u64::from(header.size);
+
+ // Check if the full entry fits
+ if pos + entry_size > file_len {
+ // Truncated entry at tail
+ // This handles the case where crash happened during write and
+ // only header was written and body was not. so we truncate
the file to the start of the entry.
+ storage.truncate(pos).await?;
+ break;
+ }
+
+ let slot = slot_for_op(header.op);
+
+ // Note: Regarding duplicate op in WAL. We rewrite it with
whichever
+ // is the latest entry.
+ headers[slot] = Some(header);
+ offsets[slot] = Some(pos);
+
+ match last_op {
+ Some(current) if header.op > current => last_op =
Some(header.op),
+ None => last_op = Some(header.op),
+ _ => {}
+ }
+
+ pos += entry_size;
+ }
+
+ // If there are leftover bytes less than a header, truncate them
+ if pos < storage.file_len() {
+ storage.truncate(pos).await?;
+ }
+
+ Ok(Self {
+ storage,
+ headers: RefCell::new(headers),
+ offsets: RefCell::new(offsets),
+ last_op: Cell::new(last_op),
+ snapshot_op: Cell::new(snapshot_op),
+ })
+ }
+
+ /// Return headers with `op >= from_op`, sorted by op.
+ pub fn iter_headers_from(&self, from_op: u64) -> Vec<PrepareHeader> {
+ let headers = self.headers.borrow();
+ let mut result: Vec<PrepareHeader> = headers
+ .iter()
+ .filter_map(|slot| slot.filter(|h| h.op >= from_op))
+ .collect();
+ result.sort_unstable_by_key(|h| h.op);
+ result
+ }
+
+ /// Highest op number in the index, or `None` if empty.
+ pub const fn last_op(&self) -> Option<u64> {
+ self.last_op.get()
+ }
+
+ /// Advance the snapshot watermark. The caller must ensure `op` is
+ /// monotonically increasing and corresponds to a durable snapshot.
+ ///
+ /// # Panics
+ /// Panics if `op` is less than the current snapshot watermark.
+ pub fn set_snapshot_op(&self, op: u64) {
+ assert!(
+ op >= self.snapshot_op.get(),
+ "snapshot_op must be monotonically increasing: {} -> {}",
+ self.snapshot_op.get(),
+ op
+ );
+ self.snapshot_op.set(op);
+ }
+
+ /// Access the underlying storage (for fsync in tests, etc.).
+ pub const fn storage_ref(&self) -> &FileStorage {
+ &self.storage
+ }
+
+ /// Async entry read for recovery.
+ ///
+ /// Returns `Ok(None)` if the op is not in the index.
+ ///
+ /// # Errors
+ /// Returns an I/O error if the read fails or the entry is malformed.
+ #[allow(clippy::future_not_send)]
+ pub async fn entry_at(
+ &self,
+ header: &PrepareHeader,
+ ) -> io::Result<Option<Message<PrepareHeader>>> {
+ let (offset, size) = {
+ let headers = self.headers.borrow();
+ let offsets = self.offsets.borrow();
+ let slot = slot_for_op(header.op);
+ let stored = match headers[slot].as_ref() {
+ Some(h) if h.op == header.op => h,
+ _ => return Ok(None),
+ };
+ let Some(offset) = offsets[slot] else {
+ return Ok(None);
+ };
+ (offset, stored.size as usize)
+ };
+ let buf = vec![0u8; size];
+ let buf = self.storage.read_at(offset, buf).await?;
+ let msg = Message::from_bytes(Bytes::from(buf))
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidData,
e.to_string()))?;
+ Ok(Some(msg))
+ }
+}
+
+#[allow(
+ clippy::cast_possible_truncation,
+ clippy::cast_sign_loss,
+ clippy::future_not_send
+)]
+impl Journal<FileStorage> for MetadataJournal {
+ type Header = PrepareHeader;
+ type Entry = Message<PrepareHeader>;
+ type HeaderRef<'a> = Ref<'a, PrepareHeader>;
+
+ fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
+ let headers = self.headers.borrow();
+ Ref::filter_map(headers, |h| {
+ let slot = slot_for_op(idx as u64);
+ let header = h[slot].as_ref()?;
+ if header.op == idx as u64 {
+ Some(header)
+ } else {
+ None
+ }
+ })
+ .ok()
+ }
+
+ fn previous_header(&self, header: &Self::Header) ->
Option<Self::HeaderRef<'_>> {
+ if header.op == 0 {
+ return None;
+ }
+ self.header((header.op - 1) as usize)
+ }
+
+ fn remaining_capacity(&self) -> Option<usize> {
+ let Some(last) = self.last_op.get() else {
+ return Some(SLOT_COUNT);
+ };
+ let snapshot = self.snapshot_op.get();
+ if last <= snapshot {
+ return Some(SLOT_COUNT);
+ }
+ let used = (last - snapshot) as usize;
+ Some(SLOT_COUNT.saturating_sub(used))
+ }
+
+ /// Remove entries with ops in `ops` from the journal,
+ /// returning the removed entries sorted by op.
+ ///
+ /// Internally advances the snapshot watermark to `end_op` so that
+ /// future appends treat drained slots as safe to overwrite. Rewrites
+ /// the WAL file keeping only entries outside the drained range.
+ async fn drain(&self, ops: RangeInclusive<u64>) ->
io::Result<Vec<Self::Entry>> {
+ let end_op = *ops.end();
+
+ // Advance the snapshot watermark so future appends treat
+ // drained ops as safe to overwrite.
+ if end_op > self.snapshot_op.get() {
+ self.snapshot_op.set(end_op);
+ }
+
+ // Partition slots into drained and live entries.
+ let mut to_drain: Vec<(PrepareHeader, u64)> = Vec::new();
+ let mut live: Vec<(PrepareHeader, u64)> = Vec::new();
+ {
+ let headers = self.headers.borrow();
+ let offsets = self.offsets.borrow();
+ for slot in 0..SLOT_COUNT {
+ if let (Some(h), Some(off)) = (&headers[slot], offsets[slot]) {
+ if ops.contains(&h.op) {
+ to_drain.push((*h, off));
+ } else {
+ live.push((*h, off));
+ }
+ }
+ }
+ }
+ to_drain.sort_unstable_by_key(|(h, _)| h.op);
+ live.sort_unstable_by_key(|(h, _)| h.op);
+
+ // Read drained entries from disk before rewriting the WAL.
+ let mut drained = Vec::with_capacity(to_drain.len());
+ for (header, offset) in &to_drain {
+ let buf = vec![0u8; header.size as usize];
+ let buf = self.storage.read_at(*offset, buf).await?;
+ let msg = Message::from_bytes(Bytes::from(buf))
+ .map_err(|e| io::Error::new(io::ErrorKind::InvalidData,
e.to_string()))?;
+ drained.push(msg);
+ }
+
+ // Write live entries to a temp file.
+ let wal_path = self.storage.path();
+ let tmp_path = wal_path.with_extension("wal.tmp");
+ {
+ let mut tmp = compio::fs::File::create(&tmp_path).await?;
+ let mut write_pos: u64 = 0;
+ for (header, old_offset) in &live {
+ let size = header.size as usize;
+ let buf = vec![0u8; size];
+ let buf = self.storage.read_at(*old_offset, buf).await?;
+ let (result, _buf) = tmp.write_all_at(buf,
write_pos).await.into();
+ result?;
+ write_pos += size as u64;
+ }
+ tmp.sync_all().await?;
+ }
+
+ // Atomic replace.
+ compio::fs::rename(&tmp_path, wal_path).await?;
+
+ // Fsync parent directory to make the rename durable.
+ if let Some(parent) = wal_path.parent() {
+ let dir = compio::fs::File::open(parent).await?;
+ dir.sync_all().await?;
+ }
+
+ // Reopen the file descriptor at the same path.
+ self.storage.reopen().await?;
+
+ // Rebuild offsets for the compacted layout and clear drained slots.
+ let mut headers = self.headers.borrow_mut();
+ let mut offsets = self.offsets.borrow_mut();
+ let mut pos: u64 = 0;
+ for (header, _) in &live {
+ let slot = slot_for_op(header.op);
+ offsets[slot] = Some(pos);
+ pos += u64::from(header.size);
+ }
+ for slot in 0..SLOT_COUNT {
+ if let Some(h) = &headers[slot]
+ && ops.contains(&h.op)
+ {
+ headers[slot] = None;
+ offsets[slot] = None;
+ }
+ }
+
+ Ok(drained)
+ }
+
+ async fn append(&self, entry: Self::Entry) -> io::Result<()> {
+ let header = *entry.header();
+ let offset = self.storage.file_len();
+
+ self.storage.write_append(entry.into_inner()).await?;
+ self.storage.fsync().await?;
+
+ let slot = slot_for_op(header.op);
+ let mut headers = self.headers.borrow_mut();
+ let mut offsets = self.offsets.borrow_mut();
+
+ if let Some(existing) = &headers[slot] {
+ assert!(
+ existing.op <= self.snapshot_op.get(),
+ "journal slot collision: appending op {} would evict op {} \
+ which has not been snapshotted (snapshot_op={})",
+ header.op,
+ existing.op,
+ self.snapshot_op.get(),
+ );
+ }
+
+ headers[slot] = Some(header);
+ offsets[slot] = Some(offset);
+
+ match self.last_op.get() {
+ Some(current) if header.op > current =>
self.last_op.set(Some(header.op)),
+ None => self.last_op.set(Some(header.op)),
+ _ => {}
+ }
+
+ Ok(())
+ }
+
+ async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
+ let (size, offset) = {
+ let headers = self.headers.borrow();
+ let offsets = self.offsets.borrow();
+ let slot = slot_for_op(header.op);
+ let stored = headers[slot].as_ref()?;
+ if stored.op != header.op {
+ return None;
+ }
+ (stored.size as usize, offsets[slot]?)
+ };
+
+ let buffer = vec![0u8; size];
+ let buffer = self.storage.read_at(offset, buffer).await.ok()?;
+ Message::from_bytes(Bytes::from(buffer)).ok()
+ }
+}
+
+impl JournalHandle for MetadataJournal {
+ type Storage = FileStorage;
+ type Target = Self;
+
+ fn handle(&self) -> &Self::Target {
+ self
+ }
+}
+
#[cfg(test)]
#[allow(clippy::cast_possible_truncation, clippy::cast_sign_loss)]
mod tests {
    use super::*;
    use bytes::BytesMut;
    use iggy_binary_protocol::consensus::Operation;
    use tempfile::tempdir;

    /// Build a well-formed `Prepare` message carrying `body_size` payload
    /// bytes filled with a per-op recognizable pattern.
    fn make_prepare(op: u64, body_size: usize) -> Message<PrepareHeader> {
        let total_size = HEADER_SIZE + body_size;
        let mut buffer = BytesMut::zeroed(total_size);

        let header =
            bytemuck::checked::from_bytes_mut::<PrepareHeader>(&mut buffer[..HEADER_SIZE]);
        header.size = total_size as u32;
        header.command = Command2::Prepare;
        header.op = op;
        header.operation = Operation::CreateStream;

        for (i, byte) in buffer[HEADER_SIZE..].iter_mut().enumerate() {
            *byte = (op as u8).wrapping_add(i as u8);
        }

        Message::from_bytes(buffer.freeze()).unwrap()
    }

    #[compio::test]
    async fn open_empty_wal() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        assert!(journal.last_op().is_none());
        assert!(journal.header(0).is_none());
    }

    #[compio::test]
    async fn append_and_read() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        let msg1 = make_prepare(1, 64);
        let msg2 = make_prepare(2, 32);

        journal.append(msg1.clone()).await.unwrap();
        journal.append(msg2.clone()).await.unwrap();

        assert_eq!(journal.last_op(), Some(2));
        assert!(journal.header(1).is_some());
        assert!(journal.header(2).is_some());
        assert!(journal.header(3).is_none());

        let entry1 = journal.entry(msg1.header()).await.unwrap();
        assert_eq!(entry1.header().op, 1);
        assert_eq!(entry1.body().len(), 64);

        let entry2 = journal.entry(msg2.header()).await.unwrap();
        assert_eq!(entry2.header().op, 2);
        assert_eq!(entry2.body().len(), 32);
    }

    #[compio::test]
    async fn reopen_rebuilds_index() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");

        // Write three entries and drop the journal.
        {
            let journal = MetadataJournal::open(&path, 0).await.unwrap();
            journal.append(make_prepare(1, 64)).await.unwrap();
            journal.append(make_prepare(2, 128)).await.unwrap();
            journal.append(make_prepare(3, 32)).await.unwrap();
            journal.storage.fsync().await.unwrap();
        }

        // Reopen: the scan must rebuild the full index.
        let journal = MetadataJournal::open(&path, 0).await.unwrap();
        assert_eq!(journal.last_op(), Some(3));

        for op in 1..=3 {
            let header = *journal.header(op).unwrap();
            assert_eq!(header.op, op as u64);
            let entry = journal.entry_at(&header).await.unwrap().unwrap();
            assert_eq!(entry.header().op, op as u64);
        }
    }

    #[compio::test]
    async fn truncated_entry_on_reopen() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");

        {
            let journal = MetadataJournal::open(&path, 0).await.unwrap();
            journal.append(make_prepare(1, 64)).await.unwrap();
            journal.append(make_prepare(2, 128)).await.unwrap();
            journal.storage.fsync().await.unwrap();
        }

        // Simulate a crash by chopping 10 bytes off the second entry.
        {
            let storage = FileStorage::open(&path).await.unwrap();
            let full_len = storage.file_len();
            storage.truncate(full_len - 10).await.unwrap();
            storage.fsync().await.unwrap();
        }

        // Only the first (complete) entry should survive recovery.
        let journal = MetadataJournal::open(&path, 0).await.unwrap();
        assert_eq!(journal.last_op(), Some(1));
        assert!(journal.header(2).is_none());

        let h1 = *journal.header(1).unwrap();
        let entry = journal.entry_at(&h1).await.unwrap().unwrap();
        assert_eq!(entry.header().op, 1);
    }

    #[compio::test]
    async fn iter_headers_from() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        journal.append(make_prepare(1, 32)).await.unwrap();
        journal.append(make_prepare(2, 32)).await.unwrap();
        journal.append(make_prepare(3, 32)).await.unwrap();
        journal.append(make_prepare(5, 32)).await.unwrap();

        let from_2 = journal.iter_headers_from(2);
        assert_eq!(from_2.len(), 3);
        assert_eq!(from_2[0].op, 2);
        assert_eq!(from_2[1].op, 3);
        assert_eq!(from_2[2].op, 5);

        let from_4 = journal.iter_headers_from(4);
        assert_eq!(from_4.len(), 1);
        assert_eq!(from_4[0].op, 5);

        let from_10 = journal.iter_headers_from(10);
        assert!(from_10.is_empty());
    }

    #[compio::test]
    async fn previous_header_navigation() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        journal.append(make_prepare(0, 32)).await.unwrap();
        journal.append(make_prepare(1, 32)).await.unwrap();

        let h1 = journal.header(1).unwrap();
        let h0 = journal.previous_header(&h1).unwrap();
        assert_eq!(h0.op, 0);
        assert!(journal.previous_header(&h0).is_none());
    }

    #[compio::test]
    async fn slot_wraparound_evicts_snapshotted_entry() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        // Op 3 occupies slot 3; mark it snapshotted so eviction is legal.
        journal.append(make_prepare(3, 32)).await.unwrap();
        journal.set_snapshot_op(3);

        // Op 3 + SLOT_COUNT maps to the same slot and evicts op 3.
        let wraparound_op = 3 + SLOT_COUNT as u64;
        journal
            .append(make_prepare(wraparound_op, 32))
            .await
            .unwrap();

        assert!(journal.header(3).is_none());
        assert!(journal.header(3 + SLOT_COUNT).is_some());
        assert_eq!(journal.last_op(), Some(3 + SLOT_COUNT as u64));
    }

    #[compio::test]
    async fn drain_shrinks_wal_and_preserves_live_entries() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        for op in 1..=5 {
            journal.append(make_prepare(op, 64)).await.unwrap();
        }
        let size_before = journal.storage.file_len();

        // Drain the first three entries.
        let drained = journal.drain(1..=3).await.unwrap();
        assert_eq!(drained.len(), 3);
        assert_eq!(drained[0].header().op, 1);
        assert_eq!(drained[1].header().op, 2);
        assert_eq!(drained[2].header().op, 3);

        let size_after = journal.storage.file_len();
        assert!(
            size_after < size_before,
            "WAL should shrink after drain: {size_before} -> {size_after}"
        );

        // Drained entries are gone from the index.
        for op in 1..=3 {
            assert!(
                journal.header(op as usize).is_none(),
                "op {op} should be removed"
            );
        }

        // Live entries are still readable from the compacted WAL.
        for op in 4..=5 {
            let h = *journal.header(op as usize).unwrap();
            assert_eq!(h.op, op);
            let entry = journal.entry_at(&h).await.unwrap().unwrap();
            assert_eq!(entry.header().op, op);
            assert_eq!(entry.body().len(), 64);
        }

        // Reopen and verify the compacted WAL is valid on disk.
        drop(journal);
        let journal = MetadataJournal::open(&path, 3).await.unwrap();
        assert_eq!(journal.last_op(), Some(5));
        for op in 4..=5 {
            let h = *journal.header(op as usize).unwrap();
            let entry = journal.entry_at(&h).await.unwrap().unwrap();
            assert_eq!(entry.header().op, op);
            assert_eq!(entry.body().len(), 64);
        }
    }

    #[compio::test]
    #[should_panic(expected = "journal slot collision")]
    async fn append_panics_on_evicting_unsnapshotted_entry() {
        let dir = tempdir().unwrap();
        let path = dir.path().join("journal.wal");
        let journal = MetadataJournal::open(&path, 0).await.unwrap();

        journal.append(make_prepare(3, 32)).await.unwrap();
        // No snapshot taken, so evicting op 3 must panic.
        let wraparound_op = 3 + SLOT_COUNT as u64;
        journal
            .append(make_prepare(wraparound_op, 32))
            .await
            .unwrap();
    }
}
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 3b12f52e1..81f88e7f2 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -43,6 +43,12 @@ serde = { workspace = true, features = ["derive"] }
slab = { workspace = true }
tracing = { workspace = true }
+[dev-dependencies]
+bytemuck = { workspace = true }
+bytes = { workspace = true }
+compio = { workspace = true }
+tempfile = { workspace = true }
+
[lints.clippy]
enum_glob_use = "deny"
pedantic = "deny"
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index 668f49766..820ae6649 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -29,7 +29,8 @@ use iggy_binary_protocol::{
};
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
-use tracing::{debug, warn};
+use std::path::Path;
+use tracing::{debug, error, warn};
#[derive(Debug, Clone)]
#[allow(unused)]
@@ -50,6 +51,67 @@ impl IggySnapshot {
pub const fn snapshot(&self) -> &MetadataSnapshot {
&self.snapshot
}
+
+ /// Persist the snapshot to disk.
+ ///
+ /// # Errors
+ /// Returns `SnapshotError` if serialization or I/O fails.
+ pub fn persist(&self, path: &Path) -> Result<(), SnapshotError> {
+ use crate::stm::snapshot::PersistStage;
+ use std::fs;
+ use std::io::Write;
+
+ let encoded = self.encode()?;
+
+ let tmp_path = path.with_extension("bin.tmp");
+
+ let mut file = fs::File::create(&tmp_path).map_err(|e|
SnapshotError::Persist {
+ stage: PersistStage::Write,
+ source: e,
+ })?;
+ file.write_all(&encoded)
+ .map_err(|e| SnapshotError::Persist {
+ stage: PersistStage::Write,
+ source: e,
+ })?;
+ file.sync_all().map_err(|e| SnapshotError::Persist {
+ stage: PersistStage::Sync,
+ source: e,
+ })?;
+ drop(file);
+
+ fs::rename(&tmp_path, path).map_err(|e| SnapshotError::Persist {
+ stage: PersistStage::Rename,
+ source: e,
+ })?;
+
+ // Fsync the parent directory to ensure the rename is durable.
+ if let Some(parent) = path.parent() {
+ let dir = fs::File::open(parent).map_err(|e|
SnapshotError::Persist {
+ stage: PersistStage::DirSync,
+ source: e,
+ })?;
+ dir.sync_all().map_err(|e| SnapshotError::Persist {
+ stage: PersistStage::DirSync,
+ source: e,
+ })?;
+ }
+
+ Ok(())
+ }
+
+ /// Load a snapshot from disk.
+ ///
+ /// # Errors
+ /// Returns `SnapshotError` if the file cannot be read or deserialization
fails.
+ pub fn load(path: &Path) -> Result<Self, SnapshotError> {
+ let data = std::fs::read(path)?;
+
+ // TODO: when checksum is added we need to check
+ // if data.len() is atleast the size of checksum
+
+ Self::decode(data.as_slice())
+ }
}
impl Snapshot for IggySnapshot {
@@ -87,7 +149,88 @@ impl Snapshot for IggySnapshot {
}
}
-#[derive(Debug)]
+/// Coordinates snapshot creation, persistence, and WAL compaction.
+///
+/// Owns the data directory path and the snapshot creation function.
+pub struct SnapshotCoordinator<M> {
+ data_dir: std::path::PathBuf,
+ create_snapshot: fn(&M, u64) -> Result<IggySnapshot, SnapshotError>,
+}
+
+impl<M> SnapshotCoordinator<M> {
+ /// Number of remaining journal slots at which a checkpoint is forced.
+ // TODO: tune this margin size
+ const CHECKPOINT_MARGIN: usize = 64;
+
+ #[must_use]
+ pub fn new(
+ data_dir: std::path::PathBuf,
+ create_snapshot: fn(&M, u64) -> Result<IggySnapshot, SnapshotError>,
+ ) -> Self {
+ Self {
+ data_dir,
+ create_snapshot,
+ }
+ }
+
+ /// Create a snapshot, persist it, and drain snapshotted entries from the
+ /// journal to reclaim WAL space.
+ ///
+ /// # Errors
+ /// Returns `SnapshotError` if snapshotting, persistence, or drain fails.
+ #[allow(clippy::future_not_send)]
+ pub async fn checkpoint<J>(
+ &self,
+ stm: &M,
+ journal: &J,
+ last_op: u64,
+ ) -> Result<(), SnapshotError>
+ where
+ J: JournalHandle,
+ {
+ let snapshot = (self.create_snapshot)(stm, last_op)?;
+ let path =
self.data_dir.join(super::METADATA_DIR).join("snapshot.bin");
+ snapshot.persist(&path)?;
+
+ let _ = journal
+ .handle()
+ .drain(0..=last_op)
+ .await
+ .map_err(SnapshotError::Io)?;
+
+ Ok(())
+ }
+
+ /// Force a checkpoint if the journal is running low on capacity.
+ ///
+ /// Returns `Ok(true)` if a checkpoint was taken, `Ok(false)` if not
needed.
+ ///
+ /// # Errors
+ /// Returns `SnapshotError` if the checkpoint fails.
+ #[allow(clippy::future_not_send)]
+ pub async fn checkpoint_if_needed<J>(
+ &self,
+ stm: &M,
+ journal: &J,
+ commit_op: u64,
+ ) -> Result<bool, SnapshotError>
+ where
+ J: JournalHandle,
+ {
+ let needs_checkpoint = journal
+ .handle()
+ .remaining_capacity()
+ .is_some_and(|c| c <= Self::CHECKPOINT_MARGIN);
+
+ if needs_checkpoint {
+ self.checkpoint(stm, journal, commit_op).await?;
+ Ok(true)
+ } else {
+ Ok(false)
+ }
+ }
+}
+
pub struct IggyMetadata<C, J, S, M> {
/// Some on shard0, None on other shards
pub consensus: Option<C>,
@@ -97,6 +240,36 @@ pub struct IggyMetadata<C, J, S, M> {
pub snapshot: Option<S>,
/// State machine - lives on all shards
pub mux_stm: M,
+ /// Snapshot coordinator - present when persistent checkpointing is
configured.
+ pub coordinator: Option<SnapshotCoordinator<M>>,
+}
+
+impl<C, J, S, M> IggyMetadata<C, J, S, M>
+where
+ M: FillSnapshot<MetadataSnapshot>,
+{
+ /// Create a new `IggyMetadata` instance.
+ ///
+ /// The `FillSnapshot<MetadataSnapshot>` bound is captured here via a
+ /// function pointer so that no downstream caller needs the bound.
+ #[must_use]
+ pub fn new(
+ consensus: Option<C>,
+ journal: Option<J>,
+ snapshot: Option<S>,
+ mux_stm: M,
+ data_dir: Option<std::path::PathBuf>,
+ ) -> Self {
+ let coordinator = data_dir
+ .map(|dir| SnapshotCoordinator::new(dir, |stm, seq|
IggySnapshot::create(stm, seq)));
+ Self {
+ consensus,
+ journal,
+ snapshot,
+ mux_stm,
+ coordinator,
+ }
+ }
}
#[allow(clippy::future_not_send)]
@@ -149,10 +322,32 @@ where
// TODO add assertions for valid state here.
- // TODO verify that the current prepare fits in the WAL.
-
// TODO handle gap in ops.
+ // Force a checkpoint if the journal is running low on capacity.
+ if let Some(coordinator) = &self.coordinator {
+ let snap_op = consensus.commit();
+ match coordinator
+ .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
+ .await
+ {
+ Ok(true) => {
+ debug!(
+ replica = consensus.replica(),
+ "on_replicate: forced checkpoint at op={snap_op}"
+ );
+ }
+ Ok(false) => {}
+ Err(e) => {
+ error!(
+ replica = consensus.replica(),
+ "on_replicate: forced checkpoint failed: {e}"
+ );
+ return;
+ }
+ }
+ }
+
// Verify hash chain integrity.
if let Some(previous) = journal.handle().previous_header(header) {
panic_if_hash_chain_would_break_in_same_view(&previous, header);
@@ -164,7 +359,13 @@ where
consensus.set_last_prepare_checksum(header.checksum);
// Append to journal.
- journal.handle().append(message.clone()).await;
+ if let Err(e) = journal.handle().append(message.clone()).await {
+ error!(
+ replica = consensus.replica(),
+ "on_replicate: journal append failed: {e}"
+ );
+ return;
+ }
// After successful journal write, send prepare_ok to primary.
self.send_prepare_ok(header).await;
diff --git a/core/metadata/src/impls/mod.rs b/core/metadata/src/impls/mod.rs
index 1fb71a3cf..e076346e9 100644
--- a/core/metadata/src/impls/mod.rs
+++ b/core/metadata/src/impls/mod.rs
@@ -15,3 +15,7 @@
// specific language governing permissions and limitations
// under the License.
pub mod metadata;
+pub mod recovery;
+
+/// Subdirectory under the data root where metadata state is stored.
+pub const METADATA_DIR: &str = "metadata";
diff --git a/core/metadata/src/impls/recovery.rs
b/core/metadata/src/impls/recovery.rs
new file mode 100644
index 000000000..ef461f18b
--- /dev/null
+++ b/core/metadata/src/impls/recovery.rs
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::impls::metadata::IggySnapshot;
+use crate::stm::StateMachine;
+use crate::stm::snapshot::{MetadataSnapshot, RestoreSnapshot, Snapshot,
SnapshotError};
+use iggy_binary_protocol::consensus::PrepareHeader;
+use iggy_binary_protocol::consensus::message::Message;
+use iggy_common::IggyError;
+use journal::metadata_journal::{JournalError, MetadataJournal};
+use std::fmt;
+use std::path::Path;
+
+/// Error type for metadata recovery.
+#[derive(Debug)]
+pub enum RecoveryError {
+ Snapshot(SnapshotError),
+ Journal(JournalError),
+ StateMachine(IggyError),
+ Io(std::io::Error),
+}
+
+impl fmt::Display for RecoveryError {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ match self {
+ Self::Snapshot(e) => write!(f, "recovery snapshot error: {e}"),
+ Self::Journal(e) => write!(f, "recovery journal error: {e}"),
+ Self::StateMachine(e) => write!(f, "recovery state machine error:
{e}"),
+ Self::Io(e) => write!(f, "recovery I/O error: {e}"),
+ }
+ }
+}
+
+impl std::error::Error for RecoveryError {
+ fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
+ match self {
+ Self::Snapshot(e) => Some(e),
+ Self::Journal(e) => Some(e),
+ Self::StateMachine(e) => Some(e),
+ Self::Io(e) => Some(e),
+ }
+ }
+}
+
+impl From<SnapshotError> for RecoveryError {
+ fn from(e: SnapshotError) -> Self {
+ Self::Snapshot(e)
+ }
+}
+
+impl From<JournalError> for RecoveryError {
+ fn from(e: JournalError) -> Self {
+ Self::Journal(e)
+ }
+}
+
+impl From<IggyError> for RecoveryError {
+ fn from(e: IggyError) -> Self {
+ Self::StateMachine(e)
+ }
+}
+
+impl From<std::io::Error> for RecoveryError {
+ fn from(e: std::io::Error) -> Self {
+ Self::Io(e)
+ }
+}
+
+/// Result of a successful metadata recovery.
+pub struct RecoveredMetadata<M> {
+ pub journal: MetadataJournal,
+ pub snapshot: IggySnapshot,
+ pub mux_stm: M,
+ /// `None` means no snapshot existed and no journal entries were replayed.
+ /// `Some(op)` is the highest op applied, either from the snapshot or
journal replay.
+ pub last_applied_op: Option<u64>,
+}
+
+/// Recover metadata state from disk.
+///
+/// 1. Load snapshot from `{data_dir}/metadata/snapshot.bin` (or empty default)
+/// 2. Restore state machine from snapshot
+/// 3. Open WAL at `{data_dir}/metadata/journal.wal`, scan and rebuild index
+/// 4. Replay journal entries from `snapshot.sequence_number + 1` through the
state machine
+/// 5. Return the assembled `RecoveredMetadata`
+///
+/// # Errors
+/// Returns `RecoveryError` if snapshot loading, journal opening, or replay
fails.
+#[allow(clippy::future_not_send)]
+pub async fn recover<M>(data_dir: &Path) -> Result<RecoveredMetadata<M>,
RecoveryError>
+where
+ M: StateMachine<Input = Message<PrepareHeader>, Error = IggyError>
+ + RestoreSnapshot<MetadataSnapshot>,
+{
+ let metadata_dir = data_dir.join(super::METADATA_DIR);
+ std::fs::create_dir_all(&metadata_dir)?;
+
+ // 1. Load snapshot (or empty default if missing)
+ let snapshot_path = metadata_dir.join("snapshot.bin");
+ let (snapshot, replay_from) = if snapshot_path.exists() {
+ let s = IggySnapshot::load(&snapshot_path)?;
+ let from = s.sequence_number() + 1;
+ (s, from)
+ } else {
+ // No snapshot, replay from op 0.
+ (IggySnapshot::new(0), 0)
+ };
+
+ // 2. Restore state machine from snapshot
+ let mux_stm = M::restore_snapshot(snapshot.snapshot())?;
+
+ // 3. Open journal, scan the WAL and build index
+ let journal_path = metadata_dir.join("journal.wal");
+ let journal = MetadataJournal::open(&journal_path,
snapshot.sequence_number()).await?;
+
+ // 4. Replay journal entries after snapshot
+ let headers_to_replay = journal.iter_headers_from(replay_from);
+
+ let mut last_applied_op: Option<u64> = None;
+ for header in &headers_to_replay {
+ // TODO: Check hash chain integrity against `previous_header`. On a
+ // same-view break, stop replay here and mark the remaining entries for
+ // repair via VSR instead of panicking.
+
+ let entry = journal.entry_at(header).await?.ok_or_else(|| {
+ RecoveryError::Io(std::io::Error::new(
+ std::io::ErrorKind::InvalidData,
+ format!("failed to read journal entry for op={}", header.op),
+ ))
+ })?;
+ mux_stm.update(entry)?;
+ last_applied_op = Some(header.op);
+ }
+
+ Ok(RecoveredMetadata {
+ journal,
+ snapshot,
+ mux_stm,
+ last_applied_op,
+ })
+}
+
+#[cfg(test)]
+#[allow(clippy::cast_possible_truncation)]
+mod tests {
+ use super::*;
+ use bytes::BytesMut;
+ use iggy_binary_protocol::consensus::{Command2, Operation};
+ use journal::Journal;
+ use tempfile::tempdir;
+
+ use crate::MuxStateMachine;
+
+ type TestStm = MuxStateMachine<()>;
+
+ const HEADER_SIZE: usize = size_of::<PrepareHeader>();
+
+ fn make_prepare(op: u64, body_size: usize) -> Message<PrepareHeader> {
+ let total_size = HEADER_SIZE + body_size;
+ let mut buffer = BytesMut::zeroed(total_size);
+ let header = bytemuck::checked::from_bytes_mut::<PrepareHeader>(&mut
buffer[..HEADER_SIZE]);
+ header.size = total_size as u32;
+ header.command = Command2::Prepare;
+ header.op = op;
+ header.operation = Operation::CreateStream;
+ Message::from_bytes(buffer.freeze()).unwrap()
+ }
+
+ #[compio::test]
+ async fn recover_empty_state() {
+ let dir = tempdir().unwrap();
+ let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+
+ assert_eq!(recovered.last_applied_op, None);
+ assert!(recovered.journal.last_op().is_none());
+ }
+
+ #[compio::test]
+ async fn recover_snapshot_only() {
+ let dir = tempdir().unwrap();
+ let metadata_dir = dir.path().join("metadata");
+ std::fs::create_dir_all(&metadata_dir).unwrap();
+
+ let snapshot = IggySnapshot::new(42);
+ snapshot
+ .persist(&metadata_dir.join("snapshot.bin"))
+ .unwrap();
+
+ let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+ assert_eq!(recovered.snapshot.sequence_number(), 42);
+ assert_eq!(recovered.last_applied_op, None);
+ }
+
+ #[compio::test]
+ async fn recover_journal_only() {
+ let dir = tempdir().unwrap();
+ let metadata_dir = dir.path().join("metadata");
+ std::fs::create_dir_all(&metadata_dir).unwrap();
+
+ {
+ let journal =
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+ .await
+ .unwrap();
+ journal.append(make_prepare(1, 32)).await.unwrap();
+ journal.append(make_prepare(2, 32)).await.unwrap();
+ journal.append(make_prepare(3, 32)).await.unwrap();
+ journal.storage_ref().fsync().await.unwrap();
+ }
+
+ let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+ assert_eq!(recovered.last_applied_op, Some(3));
+ assert_eq!(recovered.journal.last_op(), Some(3));
+ }
+
+ #[compio::test]
+ async fn recover_snapshot_plus_journal() {
+ let dir = tempdir().unwrap();
+ let metadata_dir = dir.path().join("metadata");
+ std::fs::create_dir_all(&metadata_dir).unwrap();
+
+ // Snapshot at op 5
+ let snapshot = IggySnapshot::new(5);
+ snapshot
+ .persist(&metadata_dir.join("snapshot.bin"))
+ .unwrap();
+
+ // WAL has ops 1-10
+ {
+ let journal =
MetadataJournal::open(&metadata_dir.join("journal.wal"), 0)
+ .await
+ .unwrap();
+ for op in 1..=10 {
+ journal.append(make_prepare(op, 32)).await.unwrap();
+ }
+ journal.storage_ref().fsync().await.unwrap();
+ }
+
+ let recovered = recover::<TestStm>(dir.path()).await.unwrap();
+ // Should replay ops 6-10 (snapshot was at 5)
+ assert_eq!(recovered.last_applied_op, Some(10));
+ assert_eq!(recovered.snapshot.sequence_number(), 5);
+ }
+
+ #[test]
+ fn snapshot_persist_load_roundtrip() {
+ let dir = tempdir().unwrap();
+ let path = dir.path().join("snapshot.bin");
+
+ let snapshot = IggySnapshot::new(99);
+ snapshot.persist(&path).unwrap();
+
+ let loaded = IggySnapshot::load(&path).unwrap();
+ assert_eq!(loaded.sequence_number(), 99);
+ }
+}
diff --git a/core/metadata/src/stm/snapshot.rs
b/core/metadata/src/stm/snapshot.rs
index 6daadc03a..b3f2ffe7c 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -28,6 +28,36 @@ pub enum SnapshotError {
Serialize(rmp_serde::encode::Error),
/// Deserialization failed.
Deserialize(rmp_serde::decode::Error),
+ /// I/O error during snapshot persist/load.
+ Io(std::io::Error),
+ /// I/O error during a specific stage of snapshot persistence.
+ /// The caller can inspect the stage to decide whether to retry
+ /// (e.g. `Rename`) or start from scratch (e.g. `Write`, `Sync`).
+ Persist {
+ stage: PersistStage,
+ source: std::io::Error,
+ },
+ /// Checksum mismatch on snapshot load.
+ ChecksumMismatch { expected: u32, actual: u32 },
+ /// Snapshot file is too short to contain a valid checksum.
+ Truncated { size: u64 },
+}
+
+/// Stage at which snapshot persistence failed.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum PersistStage {
+ /// Creating or writing the temp file failed. The temp file may contain
+ /// partial data. Safe to delete and retry from scratch.
+ Write,
+ /// Fsync of the temp file failed. The data may not be durable.
+ /// Safe to delete the temp file and retry from scratch.
+ Sync,
+ /// Atomic rename of temp -> final path failed. The temp file contains
+ /// a complete, synced snapshot. Safe to retry just the rename.
+ Rename,
+ /// Fsync of the parent directory after rename failed. The rename
+ /// succeeded but may not be durable. Safe to retry just the dir sync.
+ DirSync,
}
impl fmt::Display for SnapshotError {
@@ -35,6 +65,22 @@ impl fmt::Display for SnapshotError {
match self {
Self::Serialize(e) => write!(f, "snapshot serialization failed:
{e}"),
Self::Deserialize(e) => write!(f, "snapshot deserialization
failed: {e}"),
+ Self::Io(e) => write!(f, "snapshot I/O error: {e}"),
+ Self::Persist { stage, source } => {
+ write!(f, "snapshot persist failed at {stage:?} stage:
{source}")
+ }
+ Self::ChecksumMismatch { expected, actual } => {
+ write!(
+ f,
+ "snapshot checksum mismatch: expected {expected:#010x},
actual {actual:#010x}"
+ )
+ }
+ Self::Truncated { size } => {
+ write!(
+ f,
+ "snapshot file truncated: {size} bytes (too short for
checksum)"
+ )
+ }
}
}
}
@@ -44,10 +90,18 @@ impl std::error::Error for SnapshotError {
match self {
Self::Serialize(e) => Some(e),
Self::Deserialize(e) => Some(e),
+ Self::Io(e) | Self::Persist { source: e, .. } => Some(e),
+ Self::ChecksumMismatch { .. } | Self::Truncated { .. } => None,
}
}
}
+impl From<std::io::Error> for SnapshotError {
+ fn from(e: std::io::Error) -> Self {
+ Self::Io(e)
+ }
+}
+
/// The snapshot container for all metadata state machines.
/// Each field corresponds to one state machine's serialized state.
#[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/core/partitions/src/iggy_partition.rs
b/core/partitions/src/iggy_partition.rs
index 393e59fc1..31bc3ba53 100644
--- a/core/partitions/src/iggy_partition.rs
+++ b/core/partitions/src/iggy_partition.rs
@@ -163,7 +163,11 @@ impl Partition for IggyPartition {
}
let message = Self::prepare_message_from_batch(header, &batch);
- journal.inner.append(message).await;
+ journal
+ .inner
+ .append(message)
+ .await
+ .map_err(|e| IggyError::IoError(e.to_string()))?;
Ok(AppendResult::new(
dirty_offset,
diff --git a/core/partitions/src/journal.rs b/core/partitions/src/journal.rs
index 5be236975..c21cc8d60 100644
--- a/core/partitions/src/journal.rs
+++ b/core/partitions/src/journal.rs
@@ -24,7 +24,23 @@ use std::{
collections::{BTreeMap, HashMap},
};
-const ZERO_LEN: usize = 0;
+// TODO: Fix that, we need to figure out how to store the
`IggyMessagesBatchSet`.
+/// No-op storage backend for the in-memory partition journal.
+#[allow(dead_code)]
+#[derive(Debug)]
+pub struct Noop;
+
+impl Storage for Noop {
+ type Buffer = ();
+
+ async fn write_at(&self, _offset: usize, _buf: ()) ->
std::io::Result<usize> {
+ Ok(0)
+ }
+
+ async fn read_at(&self, _offset: usize, _buffer: ()) ->
std::io::Result<()> {
+ Ok(())
+ }
+}
/// Lookup key for querying messages from the journal.
#[derive(Debug, Clone, Copy)]
@@ -65,7 +81,7 @@ pub struct PartitionJournalMemStorage {
impl Storage for PartitionJournalMemStorage {
type Buffer = Bytes;
- async fn write(&self, buf: Self::Buffer) -> usize {
+ async fn write_at(&self, _offset: usize, buf: Self::Buffer) ->
std::io::Result<usize> {
let len = buf.len();
let entries = unsafe { &mut *self.entries.get() };
let offset_to_index = unsafe { &mut *self.offset_to_index.get() };
@@ -78,17 +94,17 @@ impl Storage for PartitionJournalMemStorage {
let write_offset = *current_offset;
*current_offset += len;
- write_offset
+ Ok(write_offset)
}
- async fn read(&self, offset: usize, _len: usize) -> Self::Buffer {
+ async fn read_at(&self, offset: usize, _buffer: Self::Buffer) ->
std::io::Result<Self::Buffer> {
let offset_to_index = unsafe { &*self.offset_to_index.get() };
let Some(&index) = offset_to_index.get(&offset) else {
- return Bytes::new();
+ return Ok(Bytes::new());
};
let entries = unsafe { &*self.entries.get() };
- entries.get(index).cloned().unwrap_or_default()
+ Ok(entries.get(index).cloned().unwrap_or_default())
}
}
@@ -244,7 +260,11 @@ where
let bytes = {
let inner = unsafe { &*self.inner.get() };
- inner.storage.read(storage_offset, ZERO_LEN).await
+ inner
+ .storage
+ .read_at(storage_offset, Bytes::new())
+ .await
+ .unwrap_or_default()
};
if bytes.is_empty() {
@@ -287,7 +307,11 @@ where
let bytes = {
let inner = unsafe { &*self.inner.get() };
- inner.storage.read(storage_offset, ZERO_LEN).await
+ inner
+ .storage
+ .read_at(storage_offset, Bytes::new())
+ .await
+ .unwrap_or_default()
};
if bytes.is_empty() {
@@ -331,7 +355,7 @@ where
headers.iter().find(|candidate| candidate.op == prev_op)
}
- async fn append(&self, entry: Self::Entry) {
+ async fn append(&self, entry: Self::Entry) -> std::io::Result<()> {
let first_offset_and_timestamp = Self::message_to_batch(&entry)
.and_then(|batch| Some((batch.first_offset()?,
batch.first_timestamp()?)));
@@ -346,7 +370,7 @@ where
let bytes = entry.into_inner();
let storage_offset = {
let inner = unsafe { &*self.inner.get() };
- inner.storage.write(bytes).await
+ inner.storage.write_at(0, bytes).await?
};
{
@@ -361,6 +385,8 @@ where
let timestamp_to_op = unsafe { &mut *self.timestamp_to_op.get() };
timestamp_to_op.insert(timestamp, op);
}
+
+ Ok(())
}
async fn entry(&self, header: &Self::Header) -> Option<Self::Entry> {
diff --git a/core/partitions/src/log.rs b/core/partitions/src/log.rs
index 7d88f60c2..e3798a10e 100644
--- a/core/partitions/src/log.rs
+++ b/core/partitions/src/log.rs
@@ -69,7 +69,7 @@ where
self.inner.previous_header(header)
}
- fn append(&self, entry: Self::Entry) -> impl Future<Output = ()> {
+ fn append(&self, entry: Self::Entry) -> impl Future<Output =
std::io::Result<()>> {
self.inner.append(entry)
}
diff --git a/core/simulator/src/deps.rs b/core/simulator/src/deps.rs
index 76fdd1ae6..e6ca06b89 100644
--- a/core/simulator/src/deps.rs
+++ b/core/simulator/src/deps.rs
@@ -30,28 +30,34 @@ use std::collections::HashMap;
#[derive(Debug, Default)]
pub struct MemStorage {
data: RefCell<Vec<u8>>,
- offset: Cell<u64>,
}
#[allow(clippy::future_not_send)]
impl Storage for MemStorage {
type Buffer = Vec<u8>;
- async fn write(&self, buf: Self::Buffer) -> usize {
+ async fn write_at(&self, offset: usize, buf: Self::Buffer) ->
std::io::Result<usize> {
let len = buf.len();
- self.data.borrow_mut().extend_from_slice(&buf);
- self.offset.set(self.offset.get() + len as u64);
- len
+ let mut data = self.data.borrow_mut();
+ let end = offset + len;
+ if end > data.len() {
+ data.resize(end, 0);
+ }
+ data[offset..end].copy_from_slice(&buf);
+ Ok(len)
}
- async fn read(&self, offset: usize, len: usize) -> Self::Buffer {
- let mut buffer = vec![0; len];
+ async fn read_at(
+ &self,
+ offset: usize,
+ mut buffer: Self::Buffer,
+ ) -> std::io::Result<Self::Buffer> {
let data = self.data.borrow();
let end = offset + buffer.len();
if offset < data.len() && end <= data.len() {
buffer[..].copy_from_slice(&data[offset..end]);
}
- buffer
+ Ok(buffer)
}
}
@@ -106,7 +112,8 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for
SimJournal<S> {
let header = headers.get(&header.op)?;
let offset = *offsets.get(&header.op)?;
- let buffer = self.storage.read(offset, header.size as usize).await;
+ let buffer = vec![0; header.size as usize];
+ let buffer = self.storage.read_at(offset, buffer).await.ok()?;
let message =
Message::from_bytes(Bytes::from(buffer)).expect("simulator: bytes
should be valid");
Some(message)
@@ -119,16 +126,20 @@ impl<S: Storage<Buffer = Vec<u8>>> Journal<S> for
SimJournal<S> {
unsafe { &*self.headers.get() }.get(&(header.op - 1))
}
- async fn append(&self, entry: Self::Entry) {
+ async fn append(&self, entry: Self::Entry) -> std::io::Result<()> {
let header = *entry.header();
let message_bytes = entry.as_bytes();
- let bytes_written = self.storage.write(message_bytes.to_vec()).await;
+ let bytes_written = self
+ .storage
+ .write_at(self.write_offset.get(), message_bytes.to_vec())
+ .await?;
let offset = self.write_offset.get();
unsafe { &mut *self.headers.get() }.insert(header.op, header);
unsafe { &mut *self.offsets.get() }.insert(header.op, offset);
self.write_offset.set(offset + bytes_written);
+ Ok(())
}
fn header(&self, idx: usize) -> Option<Self::HeaderRef<'_>> {
diff --git a/core/simulator/src/replica.rs b/core/simulator/src/replica.rs
index 5711da0ce..fd5891956 100644
--- a/core/simulator/src/replica.rs
+++ b/core/simulator/src/replica.rs
@@ -53,12 +53,13 @@ pub fn new_replica(id: u8, name: String, bus: &Arc<MemBus>,
replica_count: u8) -
);
metadata_consensus.init();
- let metadata = IggyMetadata {
- consensus: Some(metadata_consensus),
- journal: Some(SimJournal::<MemStorage>::default()),
- snapshot: Some(SimSnapshot::default()),
- mux_stm: mux,
- };
+ let metadata = IggyMetadata::new(
+ Some(metadata_consensus),
+ Some(SimJournal::<MemStorage>::default()),
+ Some(SimSnapshot::default()),
+ mux,
+ None,
+ );
let partitions_config = PartitionsConfig {
messages_required_to_save: 1000,