This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch replica_bootstrap
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/replica_bootstrap by this push:
new cc31a42c3 address code review nits
cc31a42c3 is described below
commit cc31a42c3465a78d3779e739356435c2a6a09965
Author: numinex <[email protected]>
AuthorDate: Wed May 6 10:38:35 2026 +0200
address code review nits
---
core/metadata/src/impls/recovery.rs | 5 ++-
core/partitions/src/iggy_index_writer.rs | 9 +++-
core/partitions/src/messages_writer.rs | 10 +++--
core/server-ng/src/bootstrap.rs | 76 +++++++++++++++++++++++++++++---
core/server-ng/src/server_error.rs | 24 ++++++++++
core/shard/src/router.rs | 38 +++++++---------
6 files changed, 127 insertions(+), 35 deletions(-)
diff --git a/core/metadata/src/impls/recovery.rs b/core/metadata/src/impls/recovery.rs
index a75eca7c7..3d8d0d436 100644
--- a/core/metadata/src/impls/recovery.rs
+++ b/core/metadata/src/impls/recovery.rs
@@ -135,7 +135,10 @@ where
)
.await?;
- // 4. Replay journal entries after snapshot
+ // 4. Replay journal entries after snapshot.
+ // Intentional fail-fast for now: if replay hits a bad entry, recovery
+ // aborts and the operator must repair or truncate the WAL before the
+ // node can boot again.
let headers_to_replay = journal.iter_headers_from(replay_from);
let mut last_applied_op: Option<u64> = None;
diff --git a/core/partitions/src/iggy_index_writer.rs b/core/partitions/src/iggy_index_writer.rs
index 8d2eb9c1b..0d9027bc5 100644
--- a/core/partitions/src/iggy_index_writer.rs
+++ b/core/partitions/src/iggy_index_writer.rs
@@ -43,14 +43,19 @@ impl IggyIndexWriter {
file_exists: bool,
) -> Result<Self, IggyError> {
let mut opts = OpenOptions::new();
- opts.create(true).write(true);
+ opts.write(true);
+ if !file_exists {
+ opts.create(true);
+ }
let file = opts
.open(file_path)
.await
.map_err(|_| IggyError::CannotReadFile)?;
if file_exists {
- let _ = file.sync_all().await;
+ file.sync_all()
+ .await
+ .map_err(|_| IggyError::CannotWriteToFile)?;
let actual_index_size = file
.metadata()
diff --git a/core/partitions/src/messages_writer.rs b/core/partitions/src/messages_writer.rs
index 517a036c9..d45f31146 100644
--- a/core/partitions/src/messages_writer.rs
+++ b/core/partitions/src/messages_writer.rs
@@ -50,17 +50,19 @@ impl MessagesWriter {
file_exists: bool,
) -> Result<Self, IggyError> {
let mut opts = OpenOptions::new();
- opts.create(true).write(true);
+ opts.write(true);
+ if !file_exists {
+ opts.create(true);
+ }
let file = opts
.open(file_path)
.await
.map_err(|_| IggyError::CannotReadFile)?;
if file_exists {
- let _ = file
- .sync_all()
+ file.sync_all()
.await
- .map_err(|_| IggyError::CannotWriteToFile);
+ .map_err(|_| IggyError::CannotWriteToFile)?;
let actual_messages_size = file
.metadata()
diff --git a/core/server-ng/src/bootstrap.rs b/core/server-ng/src/bootstrap.rs
index ffbc0424f..24b9a9565 100644
--- a/core/server-ng/src/bootstrap.rs
+++ b/core/server-ng/src/bootstrap.rs
@@ -476,7 +476,7 @@ async fn load_partition(
.any(|segment| segment.size > IggyByteSize::default());
partition.stats.set_current_offset(current_offset);
- configure_consumer_offsets(&mut partition, config, namespace, current_offset);
+ configure_consumer_offsets(&mut partition, config, namespace, current_offset)?;
ensure_initial_segment(&mut partition, config, stream_id, topic_id, partition_id).await?;
Ok(partition)
@@ -500,6 +500,17 @@ async fn hydrate_partition_log(
.zip(loaded_log.storages().iter().cloned())
.enumerate()
{
+ validate_recovered_segment(
+ stream_id,
+ topic_id,
+ partition_id,
+ segment,
+ &storage,
+ loaded_log
+ .indexes()
+ .get(segment_index)
+ .and_then(|indexes| indexes.as_ref()),
+ )?;
let max_timestamp = match loaded_log
.indexes()
.get(segment_index)
@@ -560,6 +571,34 @@ async fn hydrate_partition_log(
Ok(())
}
+fn validate_recovered_segment(
+ stream_id: usize,
+ topic_id: usize,
+ partition_id: usize,
+ segment: &iggy_common::Segment,
+ storage: &iggy_common::SegmentStorage,
+ indexes: Option<&server::streaming::segments::IggyIndexesMut>,
+) -> Result<(), ServerNgError> {
+ let messages_size_bytes = storage
+ .messages_reader
+ .as_ref()
+ .map_or(0, |reader| u64::from(reader.file_size()));
+ let indexed_size_bytes = indexes.map_or(0, |indexes| u64::from(indexes.messages_size()));
+ if messages_size_bytes == indexed_size_bytes {
+ return Ok(());
+ }
+
+ Err(ServerNgError::RecoveredSegmentSizeDivergence {
+ stream_id,
+ topic_id,
+ partition_id,
+ start_offset: segment.start_offset,
+ end_offset: segment.end_offset,
+ messages_size_bytes,
+ indexed_size_bytes,
+ })
+}
+
fn convert_segment(segment: &iggy_common::Segment, max_timestamp: u64) -> Segment {
Segment {
sealed: segment.sealed,
@@ -612,7 +651,7 @@ fn configure_consumer_offsets(
config: &ServerNgConfig,
namespace: IggyNamespace,
current_offset: u64,
-) {
+) -> Result<(), ServerNgError> {
let stream_id = namespace.stream_id();
let topic_id = namespace.topic_id();
let partition_id = namespace.partition_id();
@@ -631,8 +670,17 @@ fn configure_consumer_offsets(
{
let guard = consumer_offsets.pin();
for offset in loaded_consumer_offsets {
- if offset.offset.load(Ordering::Relaxed) > current_offset {
- offset.offset.store(current_offset, Ordering::Relaxed);
+ let recovered_offset = offset.offset.load(Ordering::Relaxed);
+ if recovered_offset > current_offset {
+ return Err(ServerNgError::RecoveredConsumerOffsetOutOfBounds {
+ consumer_kind: "consumer",
+ consumer_id: offset.consumer_id as usize,
+ offset: recovered_offset,
+ current_offset,
+ stream_id,
+ topic_id,
+ partition_id,
+ });
}
guard.insert(offset.consumer_id as usize, offset);
}
@@ -645,8 +693,17 @@ fn configure_consumer_offsets(
{
let guard = consumer_group_offsets.pin();
for (group_id, offset) in loaded_group_offsets {
- if offset.offset.load(Ordering::Relaxed) > current_offset {
- offset.offset.store(current_offset, Ordering::Relaxed);
+ let recovered_offset = offset.offset.load(Ordering::Relaxed);
+ if recovered_offset > current_offset {
+ return Err(ServerNgError::RecoveredConsumerOffsetOutOfBounds {
+ consumer_kind: "consumer group",
+ consumer_id: group_id.0,
+ offset: recovered_offset,
+ current_offset,
+ stream_id,
+ topic_id,
+ partition_id,
+ });
}
guard.insert(group_id, offset);
}
@@ -659,6 +716,7 @@ fn configure_consumer_offsets(
consumer_group_offsets,
config.system.partition.enforce_fsync,
);
+ Ok(())
}
async fn ensure_initial_segment(
@@ -740,6 +798,12 @@ fn resolve_tcp_topology(
let default_quic_addr =
resolve_optional_listener_addr(config.quic.enabled, "quic.address", &config.quic.address)?;
if !config.cluster.enabled {
+ if let Some(replica_id) = current_replica_id {
+ warn!(
+ replica_id,
+ "cluster is disabled, ignoring --replica-id for single-node server-ng startup"
+ );
+ }
return Ok(TcpTopology {
// Keep parity with the current server binary and the integration
// harness: `--replica-id` may be passed unconditionally, but in
diff --git a/core/server-ng/src/server_error.rs b/core/server-ng/src/server_error.rs
index 67f574644..bfe848781 100644
--- a/core/server-ng/src/server_error.rs
+++ b/core/server-ng/src/server_error.rs
@@ -55,6 +55,30 @@ pub enum ServerNgError {
MissingReplicaId,
#[error("cluster node for replica {replica_id} is missing tcp_replica port")]
ClusterReplicaPortMissing { replica_id: u8 },
+ #[error(
+ "recovered segment for stream {stream_id}, topic {topic_id}, partition {partition_id} at start_offset {start_offset} has message/index divergence (messages_size={messages_size_bytes}, indexed_size={indexed_size_bytes}, end_offset={end_offset})"
+ )]
+ RecoveredSegmentSizeDivergence {
+ stream_id: usize,
+ topic_id: usize,
+ partition_id: usize,
+ start_offset: u64,
+ end_offset: u64,
+ messages_size_bytes: u64,
+ indexed_size_bytes: u64,
+ },
+ #[error(
+ "recovered {consumer_kind} offset {offset} for id {consumer_id} exceeds current_offset {current_offset} in stream {stream_id}, topic {topic_id}, partition {partition_id}"
+ )]
+ RecoveredConsumerOffsetOutOfBounds {
+ consumer_kind: &'static str,
+ consumer_id: usize,
+ offset: u64,
+ current_offset: u64,
+ stream_id: usize,
+ topic_id: usize,
+ partition_id: usize,
+ },
#[error(
"recovered namespace stream {stream_id}, topic {topic_id}, partition {partition_id} exceeds configured limits (max_streams={max_streams}, max_topics={max_topics}, max_partitions={max_partitions})"
)]
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 7fb31b214..c09ae2c6f 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -89,18 +89,17 @@ where
let target = if operation.is_metadata() {
0
} else if operation.is_partition() {
- self.shards_table
- .shard_for(partition_namespace)
- .unwrap_or_else(|| {
- tracing::warn!(
- shard = self.id,
- stream = partition_namespace.stream_id(),
- topic = partition_namespace.topic_id(),
- partition = partition_namespace.partition_id(),
- "namespace not found in shards_table, falling back to shard 0"
- );
- 0
- })
+ let Some(target) = self.shards_table.shard_for(partition_namespace) else {
+ tracing::error!(
+ shard = self.id,
+ stream = partition_namespace.stream_id(),
+ topic = partition_namespace.topic_id(),
+ partition = partition_namespace.partition_id(),
+ "namespace not found in shards_table, dropping message"
+ );
+ return;
+ };
+ target
} else {
// View-change / Commit messages carry an opaque u64 consensus
// namespace (not an `IggyNamespace`). Hash it with the same
@@ -196,16 +195,11 @@ where
} else if operation.is_partition() {
self.shards_table
.shard_for(partition_namespace)
- .unwrap_or_else(|| {
- tracing::warn!(
- shard = self.id,
- stream = partition_namespace.stream_id(),
- topic = partition_namespace.topic_id(),
- partition = partition_namespace.partition_id(),
- "namespace not found in shards_table, falling back to shard 0"
- );
- 0
- })
+ .ok_or_else(|| {
+ ConsensusError::InvalidField(format!(
+ "namespace {raw_namespace} is not registered in shards_table"
+ ))
+ })?
} else {
#[allow(clippy::cast_lossless)]
let shard_count = u32::try_from(self.senders.len()).unwrap_or(u32::MAX);