This is an automated email from the ASF dual-hosted git repository.
numinnex pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 36307999b feat(consensus): impl consensus group mechanism (#3138)
36307999b is described below
commit 36307999b27c3904248d1a414d3706a25404e6cd
Author: Grzegorz Koszyk <[email protected]>
AuthorDate: Mon Apr 20 11:38:07 2026 +0200
feat(consensus): impl consensus group mechanism (#3138)
Add `consensus_group` field, required to distinguish between different
instances of consensus that are stored in `metadata` and `partition`.
---
core/binary_protocol/src/consensus/operation.rs | 50 +++-
core/binary_protocol/src/dispatch.rs | 5 +-
core/binary_protocol/src/lib.rs | 1 +
core/binary_protocol/src/primitives/mod.rs | 1 +
.../src/primitives/partition_assignment.rs | 75 ++++++
.../create_partitions_with_assignments.rs | 126 ++++++++++
.../binary_protocol/src/requests/partitions/mod.rs | 3 +
.../topics/create_topic_with_assignments.rs | 131 ++++++++++
core/binary_protocol/src/requests/topics/mod.rs | 2 +
core/consensus/src/observability.rs | 2 +
core/metadata/Cargo.toml | 2 +-
core/metadata/src/impls/metadata.rs | 280 ++++++++++++++++++---
core/metadata/src/stm/mod.rs | 41 +++
core/metadata/src/stm/mux.rs | 5 +
core/metadata/src/stm/snapshot.rs | 33 ++-
core/metadata/src/stm/stream.rs | 238 ++++++++++++++++--
core/shard/src/lib.rs | 17 +-
core/shard/src/router.rs | 5 +-
18 files changed, 930 insertions(+), 87 deletions(-)
diff --git a/core/binary_protocol/src/consensus/operation.rs
b/core/binary_protocol/src/consensus/operation.rs
index 7f0a7f5a9..f0a08a29d 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -32,6 +32,9 @@ pub enum Operation {
/// but skips state machine dispatch at commit time, the metadata
/// plane calls `commit_register` directly. Session number = commit op.
Register = 1,
+ // Internal metadata operations (journal / replica-only)
+ CreateTopicWithAssignments = 64,
+ CreatePartitionsWithAssignments = 65,
// Metadata operations (shard 0)
CreateStream = 128,
@@ -65,10 +68,25 @@ pub enum Operation {
}
impl Operation {
+ pub const INTERNAL_START: u8 = Self::CreateTopicWithAssignments as u8;
+ pub const METADATA_START: u8 = Self::CreateStream as u8;
+ pub const PARTITION_START: u8 = Self::SendMessages as u8;
+
+ /// Internal-only operations reserved for replica / journal use.
+ #[must_use]
+ #[inline]
+ pub const fn is_internal(&self) -> bool {
+ (*self as u8) >= Self::INTERNAL_START && (*self as u8) <
Self::METADATA_START
+ }
+
/// Metadata / control-plane operations handled by shard 0.
#[must_use]
#[inline]
pub const fn is_metadata(&self) -> bool {
+ if self.is_internal() {
+ return true;
+ }
+
matches!(
self,
Self::CreateStream
@@ -105,13 +123,14 @@ impl Operation {
#[must_use]
#[inline]
pub const fn is_partition(&self) -> bool {
- matches!(
- self,
- Self::SendMessages
- | Self::StoreConsumerOffset
- | Self::DeleteConsumerOffset
- | Self::DeleteSegments
- )
+ matches!(self, Self::DeleteSegments) || (*self as u8) >=
Self::PARTITION_START
+ }
+
+ /// Operations clients are allowed to send directly.
+ #[must_use]
+ #[inline]
+ pub const fn is_client_allowed(&self) -> bool {
+ !matches!(self, Self::Reserved) && !self.is_internal()
}
/// Bidirectional mapping: `Operation` -> client command code.
@@ -120,7 +139,10 @@ impl Operation {
#[must_use]
pub const fn to_command_code(&self) -> Option<u32> {
match self {
- Self::Reserved | Self::Register => None,
+ Self::Reserved
+ | Self::Register
+ | Self::CreateTopicWithAssignments
+ | Self::CreatePartitionsWithAssignments => None,
Self::CreateStream
| Self::UpdateStream
| Self::DeleteStream
@@ -217,6 +239,14 @@ mod tests {
assert!(!Operation::SendMessages.is_vsr_reserved());
assert!(!Operation::Register.is_metadata());
assert!(!Operation::Register.is_partition());
+ assert_eq!(
+ Operation::CreateTopicWithAssignments.to_command_code(),
+ None
+ );
+ assert_eq!(
+ Operation::CreatePartitionsWithAssignments.to_command_code(),
+ None
+ );
}
#[test]
@@ -230,8 +260,12 @@ mod tests {
#[test]
fn metadata_vs_partition() {
+ assert!(Operation::CreateTopicWithAssignments.is_internal());
+ assert!(Operation::CreateTopicWithAssignments.is_metadata());
+ assert!(!Operation::CreateTopicWithAssignments.is_client_allowed());
assert!(Operation::CreateStream.is_metadata());
assert!(!Operation::CreateStream.is_partition());
+ assert!(Operation::CreateStream.is_client_allowed());
assert!(Operation::SendMessages.is_partition());
assert!(!Operation::SendMessages.is_metadata());
assert!(Operation::DeleteSegments.is_partition());
diff --git a/core/binary_protocol/src/dispatch.rs
b/core/binary_protocol/src/dispatch.rs
index e79432f38..fb5e33683 100644
--- a/core/binary_protocol/src/dispatch.rs
+++ b/core/binary_protocol/src/dispatch.rs
@@ -270,7 +270,10 @@ pub const fn lookup_by_operation(op: Operation) ->
Option<&'static CommandMeta>
Operation::SendMessages => 22,
Operation::StoreConsumerOffset => 25,
Operation::DeleteConsumerOffset => 26,
- Operation::Reserved | Operation::Register => return None,
+ Operation::CreateTopicWithAssignments
+ | Operation::CreatePartitionsWithAssignments
+ | Operation::Reserved
+ | Operation::Register => return None,
};
Some(&COMMAND_TABLE[idx])
}
diff --git a/core/binary_protocol/src/lib.rs b/core/binary_protocol/src/lib.rs
index 4571bdc6e..1b02a127d 100644
--- a/core/binary_protocol/src/lib.rs
+++ b/core/binary_protocol/src/lib.rs
@@ -82,6 +82,7 @@ pub use message_view::{
};
pub use primitives::consumer::WireConsumer;
pub use primitives::identifier::{MAX_WIRE_NAME_LENGTH, WireIdentifier,
WireName};
+pub use primitives::partition_assignment::CreatedPartitionAssignment;
pub use primitives::partitioning::{MAX_MESSAGES_KEY_LENGTH, WirePartitioning};
pub use primitives::permissions::{
WireGlobalPermissions, WirePermissions, WireStreamPermissions,
WireTopicPermissions,
diff --git a/core/binary_protocol/src/primitives/mod.rs
b/core/binary_protocol/src/primitives/mod.rs
index d1d7ee56a..064f0fcfb 100644
--- a/core/binary_protocol/src/primitives/mod.rs
+++ b/core/binary_protocol/src/primitives/mod.rs
@@ -19,6 +19,7 @@
pub mod consumer;
pub mod identifier;
+pub mod partition_assignment;
pub mod partitioning;
pub mod permissions;
pub mod polling_strategy;
diff --git a/core/binary_protocol/src/primitives/partition_assignment.rs
b/core/binary_protocol/src/primitives/partition_assignment.rs
new file mode 100644
index 000000000..2c36732bd
--- /dev/null
+++ b/core/binary_protocol/src/primitives/partition_assignment.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
+use bytes::{BufMut, BytesMut};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreatedPartitionAssignment {
+ pub partition_id: u32,
+ pub consensus_group_id: u64,
+}
+
+impl WireEncode for CreatedPartitionAssignment {
+ fn encoded_size(&self) -> usize {
+ 12
+ }
+
+ fn encode(&self, buf: &mut BytesMut) {
+ buf.put_u32_le(self.partition_id);
+ buf.put_u64_le(self.consensus_group_id);
+ }
+}
+
+impl WireDecode for CreatedPartitionAssignment {
+ fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+ if buf.len() < 12 {
+ return Err(WireError::UnexpectedEof {
+ offset: 0,
+ need: 12,
+ have: buf.len(),
+ });
+ }
+
+ Ok((
+ Self {
+ partition_id: read_u32_le(buf, 0)?,
+ consensus_group_id: read_u64_le(buf, 4)?,
+ },
+ 12,
+ ))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::CreatedPartitionAssignment;
+ use crate::codec::{WireDecode, WireEncode};
+
+ #[test]
+ fn roundtrip() {
+ let request = CreatedPartitionAssignment {
+ partition_id: 7,
+ consensus_group_id: 42,
+ };
+ let bytes = request.to_bytes();
+ let (decoded, consumed) =
CreatedPartitionAssignment::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, request);
+ }
+}
diff --git
a/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
new file mode 100644
index 000000000..1350541af
--- /dev/null
+++
b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le};
+use crate::primitives::partition_assignment::CreatedPartitionAssignment;
+use crate::requests::partitions::CreatePartitionsRequest;
+use bytes::{BufMut, BytesMut};
+
+fn usize_to_u32(value: usize, context: &str) -> u32 {
+ u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreatePartitionsWithAssignmentsRequest {
+ pub request: CreatePartitionsRequest,
+ pub partitions: Vec<CreatedPartitionAssignment>,
+}
+
+impl WireEncode for CreatePartitionsWithAssignmentsRequest {
+ fn encoded_size(&self) -> usize {
+ 4 + self.request.encoded_size()
+ + 4
+ + self
+ .partitions
+ .iter()
+ .map(WireEncode::encoded_size)
+ .sum::<usize>()
+ }
+
+ fn encode(&self, buf: &mut BytesMut) {
+ buf.put_u32_le(usize_to_u32(
+ self.request.encoded_size(),
+ "create partitions request size",
+ ));
+ self.request.encode(buf);
+ buf.put_u32_le(usize_to_u32(
+ self.partitions.len(),
+ "create partitions partition count",
+ ));
+ for partition in &self.partitions {
+ partition.encode(buf);
+ }
+ }
+}
+
+impl WireDecode for CreatePartitionsWithAssignmentsRequest {
+    /// Decodes the layout produced by `encode`: a little-endian `u32` byte
+    /// length of the embedded `CreatePartitionsRequest`, the request itself,
+    /// a little-endian `u32` assignment count, then that many assignments.
+    ///
+    /// Returns the decoded value together with the number of bytes consumed.
+    ///
+    /// # Errors
+    /// Returns `WireError::UnexpectedEof` when `buf` is shorter than the
+    /// declared request length, and propagates any error raised by the nested
+    /// decoders.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        // Encoded size of one `CreatedPartitionAssignment`: a u32 partition id
+        // followed by a u64 consensus group id.
+        const ASSIGNMENT_WIRE_SIZE: usize = 12;
+
+        let request_size = read_u32_le(buf, 0)? as usize;
+        let request_start = 4;
+        // Saturate so an oversized length cannot wrap around on targets with
+        // a narrow `usize`; the bounds check below then rejects it.
+        let request_end = request_start.saturating_add(request_size);
+        if buf.len() < request_end {
+            return Err(WireError::UnexpectedEof {
+                offset: request_start,
+                need: request_size,
+                have: buf.len().saturating_sub(request_start),
+            });
+        }
+
+        let request = CreatePartitionsRequest::decode_from(&buf[request_start..request_end])?;
+        let partitions_count = read_u32_le(buf, request_end)? as usize;
+        let mut offset = request_end + 4;
+        // The count comes straight off the wire, so never reserve room for
+        // more assignments than the remaining bytes could possibly hold.
+        let max_assignments = buf.len().saturating_sub(offset) / ASSIGNMENT_WIRE_SIZE;
+        let mut partitions = Vec::with_capacity(partitions_count.min(max_assignments));
+        for _ in 0..partitions_count {
+            let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
+            offset += consumed;
+            partitions.push(partition);
+        }
+
+        Ok((
+            Self {
+                request,
+                partitions,
+            },
+            offset,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::CreatePartitionsWithAssignmentsRequest;
+ use crate::WireIdentifier;
+ use crate::codec::{WireDecode, WireEncode};
+ use crate::primitives::partition_assignment::CreatedPartitionAssignment;
+ use crate::requests::partitions::CreatePartitionsRequest;
+
+ #[test]
+ fn roundtrip() {
+ let request = CreatePartitionsWithAssignmentsRequest {
+ request: CreatePartitionsRequest {
+ stream_id: WireIdentifier::numeric(1),
+ topic_id: WireIdentifier::numeric(2),
+ partitions_count: 2,
+ },
+ partitions: vec![
+ CreatedPartitionAssignment {
+ partition_id: 3,
+ consensus_group_id: 11,
+ },
+ CreatedPartitionAssignment {
+ partition_id: 4,
+ consensus_group_id: 12,
+ },
+ ],
+ };
+ let bytes = request.to_bytes();
+ let (decoded, consumed) =
CreatePartitionsWithAssignmentsRequest::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, request);
+ }
+}
diff --git a/core/binary_protocol/src/requests/partitions/mod.rs
b/core/binary_protocol/src/requests/partitions/mod.rs
index 8a5d0a1a1..77c6c4d1e 100644
--- a/core/binary_protocol/src/requests/partitions/mod.rs
+++ b/core/binary_protocol/src/requests/partitions/mod.rs
@@ -16,7 +16,10 @@
// under the License.
pub mod create_partitions;
+pub mod create_partitions_with_assignments;
pub mod delete_partitions;
+pub use crate::primitives::partition_assignment::CreatedPartitionAssignment;
pub use create_partitions::CreatePartitionsRequest;
+pub use
create_partitions_with_assignments::CreatePartitionsWithAssignmentsRequest;
pub use delete_partitions::DeletePartitionsRequest;
diff --git
a/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
new file mode 100644
index 000000000..b5199abb3
--- /dev/null
+++ b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le};
+use crate::primitives::partition_assignment::CreatedPartitionAssignment;
+use crate::requests::topics::CreateTopicRequest;
+use bytes::{BufMut, BytesMut};
+
+fn usize_to_u32(value: usize, context: &str) -> u32 {
+ u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreateTopicWithAssignmentsRequest {
+ pub request: CreateTopicRequest,
+ pub partitions: Vec<CreatedPartitionAssignment>,
+}
+
+impl WireEncode for CreateTopicWithAssignmentsRequest {
+ fn encoded_size(&self) -> usize {
+ 4 + self.request.encoded_size()
+ + 4
+ + self
+ .partitions
+ .iter()
+ .map(WireEncode::encoded_size)
+ .sum::<usize>()
+ }
+
+ fn encode(&self, buf: &mut BytesMut) {
+ buf.put_u32_le(usize_to_u32(
+ self.request.encoded_size(),
+ "create topic request size",
+ ));
+ self.request.encode(buf);
+ buf.put_u32_le(usize_to_u32(
+ self.partitions.len(),
+ "create topic partition count",
+ ));
+ for partition in &self.partitions {
+ partition.encode(buf);
+ }
+ }
+}
+
+impl WireDecode for CreateTopicWithAssignmentsRequest {
+    /// Decodes the layout produced by `encode`: a little-endian `u32` byte
+    /// length of the embedded `CreateTopicRequest`, the request itself, a
+    /// little-endian `u32` assignment count, then that many assignments.
+    ///
+    /// Returns the decoded value together with the number of bytes consumed.
+    ///
+    /// # Errors
+    /// Returns `WireError::UnexpectedEof` when `buf` is shorter than the
+    /// declared request length, and propagates any error raised by the nested
+    /// decoders.
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        // Encoded size of one `CreatedPartitionAssignment`: a u32 partition id
+        // followed by a u64 consensus group id.
+        const ASSIGNMENT_WIRE_SIZE: usize = 12;
+
+        let request_size = read_u32_le(buf, 0)? as usize;
+        let request_start = 4;
+        // Saturate so an oversized length cannot wrap around on targets with
+        // a narrow `usize`; the bounds check below then rejects it.
+        let request_end = request_start.saturating_add(request_size);
+        if buf.len() < request_end {
+            return Err(WireError::UnexpectedEof {
+                offset: request_start,
+                need: request_size,
+                have: buf.len().saturating_sub(request_start),
+            });
+        }
+
+        let request = CreateTopicRequest::decode_from(&buf[request_start..request_end])?;
+        let partitions_count = read_u32_le(buf, request_end)? as usize;
+        let mut offset = request_end + 4;
+        // The count comes straight off the wire, so never reserve room for
+        // more assignments than the remaining bytes could possibly hold.
+        let max_assignments = buf.len().saturating_sub(offset) / ASSIGNMENT_WIRE_SIZE;
+        let mut partitions = Vec::with_capacity(partitions_count.min(max_assignments));
+        for _ in 0..partitions_count {
+            let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
+            offset += consumed;
+            partitions.push(partition);
+        }
+
+        Ok((
+            Self {
+                request,
+                partitions,
+            },
+            offset,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::CreateTopicWithAssignmentsRequest;
+ use crate::WireIdentifier;
+ use crate::codec::{WireDecode, WireEncode};
+ use crate::primitives::identifier::WireName;
+ use crate::primitives::partition_assignment::CreatedPartitionAssignment;
+ use crate::requests::topics::CreateTopicRequest;
+
+ #[test]
+ fn roundtrip() {
+ let request = CreateTopicWithAssignmentsRequest {
+ request: CreateTopicRequest {
+ stream_id: WireIdentifier::numeric(1),
+ partitions_count: 2,
+ compression_algorithm: 1,
+ message_expiry: 3600,
+ max_topic_size: 1024,
+ replication_factor: 1,
+ name: WireName::new("events").unwrap(),
+ },
+ partitions: vec![
+ CreatedPartitionAssignment {
+ partition_id: 0,
+ consensus_group_id: 1,
+ },
+ CreatedPartitionAssignment {
+ partition_id: 1,
+ consensus_group_id: 2,
+ },
+ ],
+ };
+ let bytes = request.to_bytes();
+ let (decoded, consumed) =
CreateTopicWithAssignmentsRequest::decode(&bytes).unwrap();
+ assert_eq!(consumed, bytes.len());
+ assert_eq!(decoded, request);
+ }
+}
diff --git a/core/binary_protocol/src/requests/topics/mod.rs
b/core/binary_protocol/src/requests/topics/mod.rs
index 680105ae3..a3e00a1d4 100644
--- a/core/binary_protocol/src/requests/topics/mod.rs
+++ b/core/binary_protocol/src/requests/topics/mod.rs
@@ -16,6 +16,7 @@
// under the License.
pub mod create_topic;
+pub mod create_topic_with_assignments;
pub mod delete_topic;
pub mod get_topic;
pub mod get_topics;
@@ -23,6 +24,7 @@ pub mod purge_topic;
pub mod update_topic;
pub use create_topic::CreateTopicRequest;
+pub use create_topic_with_assignments::CreateTopicWithAssignmentsRequest;
pub use delete_topic::DeleteTopicRequest;
pub use get_topic::GetTopicRequest;
pub use get_topics::GetTopicsRequest;
diff --git a/core/consensus/src/observability.rs
b/core/consensus/src/observability.rs
index 3306985da..bb042af10 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -618,6 +618,8 @@ pub const fn status_as_str(status: Status) -> &'static str {
pub const fn operation_as_str(operation: Operation) -> &'static str {
match operation {
Operation::Reserved => "reserved",
+ Operation::CreateTopicWithAssignments =>
"create_topic_with_assignments",
+ Operation::CreatePartitionsWithAssignments =>
"create_partitions_with_assignments",
Operation::CreateStream => "create_stream",
Operation::UpdateStream => "update_stream",
Operation::DeleteStream => "delete_stream",
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index c437c7efa..213303a81 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -30,6 +30,7 @@ publish = false
[dependencies]
ahash = { workspace = true }
+bytemuck = { workspace = true }
bytes = { workspace = true }
consensus = { workspace = true }
iggy_binary_protocol = { workspace = true }
@@ -44,7 +45,6 @@ slab = { workspace = true }
tracing = { workspace = true }
[dev-dependencies]
-bytemuck = { workspace = true }
bytes = { workspace = true }
compio = { workspace = true }
tempfile = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs
b/core/metadata/src/impls/metadata.rs
index c362821ec..f3397dd8e 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,8 +14,12 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
-use crate::stm::StateMachine;
+use crate::MuxStateMachine;
+use crate::stm::consumer_group::ConsumerGroups;
use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot,
SnapshotError};
+use crate::stm::stream::Streams;
+use crate::stm::user::Users;
+use crate::stm::{ConsensusGroupAllocator, StateMachine};
use consensus::{
CommitLogEvent, Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity,
PlaneKind, Project,
ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus,
ack_preflight,
@@ -24,12 +28,20 @@ use consensus::{
pipeline_prepare_common, register_preflight, replicate_preflight,
replicate_to_next_in_chain,
request_preflight, send_prepare_ok as send_prepare_ok_common,
};
+use
iggy_binary_protocol::primitives::partition_assignment::CreatedPartitionAssignment;
+use iggy_binary_protocol::requests::partitions::CreatePartitionsRequest as
WireCreatePartitionsRequest;
+use
iggy_binary_protocol::requests::partitions::CreatePartitionsWithAssignmentsRequest
as PersistedCreatePartitionsRequest;
+use iggy_binary_protocol::requests::topics::CreateTopicRequest as
WireCreateTopicRequest;
+use iggy_binary_protocol::requests::topics::CreateTopicWithAssignmentsRequest
as PersistedCreateTopicRequest;
use iggy_binary_protocol::{
Command2, ConsensusHeader, GenericHeader, Message, Operation,
PrepareHeader, PrepareOkHeader,
- RequestHeader,
+ RequestHeader, WireDecode, WireEncode,
};
+use iggy_common::IggyError;
+use iggy_common::variadic;
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
+use std::mem::size_of;
use std::path::Path;
use tracing::{debug, error, warn};
@@ -37,6 +49,17 @@ const fn freeze_client_reply(message:
Message<GenericHeader>) -> Message<Generic
message
}
+pub trait StreamsFrontend {
+ #[must_use]
+ fn streams(&self) -> &Streams;
+}
+
+impl StreamsFrontend for MuxStateMachine<variadic!(Users, Streams,
ConsumerGroups)> {
+ fn streams(&self) -> &Streams {
+ &self.inner().1.0
+ }
+}
+
#[derive(Debug, Clone)]
#[allow(unused)]
pub struct IggySnapshot {
@@ -245,13 +268,14 @@ pub struct IggyMetadata<C, J, S, M> {
pub snapshot: Option<S>,
/// State machine - lives on all shards
pub mux_stm: M,
+ pub allocator: ConsensusGroupAllocator,
/// Snapshot coordinator - present when persistent checkpointing is
configured.
pub coordinator: Option<SnapshotCoordinator<M>>,
}
impl<C, J, S, M> IggyMetadata<C, J, S, M>
where
- M: FillSnapshot<MetadataSnapshot>,
+ M: StreamsFrontend + FillSnapshot<MetadataSnapshot>,
{
/// Create a new `IggyMetadata` instance.
///
@@ -265,13 +289,15 @@ where
mux_stm: M,
data_dir: Option<std::path::PathBuf>,
) -> Self {
- let coordinator = data_dir
- .map(|dir| SnapshotCoordinator::new(dir, |stm, seq|
IggySnapshot::create(stm, seq)));
+ let allocator =
+
ConsensusGroupAllocator::new(mux_stm.streams().highest_partition_consensus_group_id());
+ let coordinator = data_dir.map(|dir| SnapshotCoordinator::new(dir,
IggySnapshot::create));
Self {
consensus,
journal,
snapshot,
mux_stm,
+ allocator,
coordinator,
}
}
@@ -283,7 +309,8 @@ where
B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
J: JournalHandle,
J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
- M: StateMachine<
+ M: StreamsFrontend
+ + StateMachine<
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
@@ -335,7 +362,19 @@ where
operation: message.header().operation,
},
);
- let prepare = message.project(consensus);
+ let prepare = match self.prepare_request(message) {
+ Ok(prepare) => prepare,
+ Err(error) => {
+ warn!(
+ target: "iggy.metadata.diag",
+ plane = "metadata",
+ replica_id = consensus.replica(),
+ error = %error,
+ "failed to transform metadata request into prepare"
+ );
+ return;
+ }
+ };
pipeline_prepare_common(consensus, PlaneKind::Metadata, prepare,
|prepare| {
self.on_replicate(prepare)
})
@@ -395,38 +434,8 @@ where
panic_if_hash_chain_would_break_in_same_view(&previous, &header);
}
- // Force a checkpoint if the journal is running low on capacity.
- if let Some(coordinator) = &self.coordinator {
- // Use commit_min (locally executed), not commit_max. WAL entries
- // between commit_min+1 and commit_max haven't been applied to the
- // state machine yet, draining them would lose data on crash.
- let snap_op = consensus.commit_min();
- match coordinator
- .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
- .await
- {
- Ok(true) => {
- debug!(
- target: "iggy.metadata.diag",
- plane = "metadata",
- replica_id = consensus.replica(),
- checkpoint_op = snap_op,
- "forced checkpoint completed"
- );
- }
- Ok(false) => {}
- Err(e) => {
- error!(
- target: "iggy.metadata.diag",
- plane = "metadata",
- replica_id = consensus.replica(),
- checkpoint_op = snap_op,
- error = %e,
- "forced checkpoint failed"
- );
- return;
- }
- }
+ if !self.checkpoint_if_needed(consensus, journal).await {
+ return;
}
// TODO: Restore hard assert_eq!(header.op, current_op + 1) once
message repair
@@ -460,6 +469,7 @@ where
return;
}
+ self.observe_prepare_runtime_state(&message);
consensus.sequencer().set_sequence(header.op);
consensus.set_last_prepare_checksum(header.checksum);
@@ -639,12 +649,129 @@ where
P: Pipeline<Entry = PipelineEntry>,
J: JournalHandle,
J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header =
PrepareHeader>,
- M: StateMachine<
+ M: StreamsFrontend
+ + StateMachine<
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
>,
{
+ #[allow(clippy::future_not_send)]
+ async fn checkpoint_if_needed(&self, consensus: &VsrConsensus<B, P>,
journal: &J) -> bool {
+ let Some(coordinator) = &self.coordinator else {
+ return true;
+ };
+
+ // Use commit_min (locally executed), not commit_max. WAL entries
+ // between commit_min+1 and commit_max haven't been applied to the
+ // state machine yet, draining them would lose data on crash.
+ let snap_op = consensus.commit_min();
+ match coordinator
+ .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
+ .await
+ {
+ Ok(true) => {
+ debug!(
+ target: "iggy.metadata.diag",
+ plane = "metadata",
+ replica_id = consensus.replica(),
+ checkpoint_op = snap_op,
+ "forced checkpoint completed"
+ );
+ true
+ }
+ Ok(false) => true,
+ Err(e) => {
+ error!(
+ target: "iggy.metadata.diag",
+ plane = "metadata",
+ replica_id = consensus.replica(),
+ checkpoint_op = snap_op,
+ error = %e,
+ "forced checkpoint failed"
+ );
+ false
+ }
+ }
+ }
+
+    /// Turns a client request into the prepare message that gets replicated.
+    ///
+    /// `CreateTopic` and `CreatePartitions` are rewritten into their internal
+    /// `*WithAssignments` counterparts so that the consensus group chosen for
+    /// every new partition travels inside the prepare body. Every other
+    /// operation is projected unchanged.
+    ///
+    /// # Errors
+    /// Returns `IggyError::InvalidCommand` when the operation is not allowed
+    /// from clients, the header size does not describe a valid body range,
+    /// the body fails to decode, the target topic is unknown, or a partition
+    /// id does not fit in `u32`.
+    ///
+    /// # Panics
+    /// Panics if `self.consensus` is `None`.
+    #[allow(clippy::too_many_lines)]
+    fn prepare_request(
+        &self,
+        message: Message<RequestHeader>,
+    ) -> Result<Message<PrepareHeader>, iggy_common::IggyError> {
+        let consensus = self.consensus.as_ref().unwrap();
+        let header = *message.header();
+        if !header.operation.is_client_allowed() {
+            return Err(IggyError::InvalidCommand);
+        }
+        // `header.size` is supplied by the sender; reject an out-of-range
+        // value instead of panicking on the slice index.
+        let body = message
+            .as_slice()
+            .get(size_of::<RequestHeader>()..header.size as usize)
+            .ok_or(IggyError::InvalidCommand)?;
+
+        match header.operation {
+            Operation::CreateTopic => {
+                let request = WireCreateTopicRequest::decode_from(body)
+                    .map_err(|_| IggyError::InvalidCommand)?;
+                // A fresh topic numbers its partitions from zero.
+                let partitions =
+                    self.assign_consensus_groups(0, request.partitions_count as usize)?;
+                let body = PersistedCreateTopicRequest {
+                    request,
+                    partitions,
+                }
+                .to_bytes();
+                Ok(build_prepare_message(
+                    consensus,
+                    &header,
+                    Operation::CreateTopicWithAssignments,
+                    &body,
+                ))
+            }
+            Operation::CreatePartitions => {
+                let request = WireCreatePartitionsRequest::decode_from(body)
+                    .map_err(|_| IggyError::InvalidCommand)?;
+                // New partitions are appended after the existing ones, so
+                // their ids continue from the topic's current partition count.
+                // NOTE(review): assumes `current_partition_count` yields a
+                // `usize` - confirm against `Streams`.
+                let current_partition_count = self
+                    .mux_stm
+                    .streams()
+                    .current_partition_count(&request.stream_id, &request.topic_id)
+                    .ok_or(IggyError::InvalidCommand)?;
+                let partitions = self.assign_consensus_groups(
+                    current_partition_count,
+                    request.partitions_count as usize,
+                )?;
+                let body = PersistedCreatePartitionsRequest {
+                    request,
+                    partitions,
+                }
+                .to_bytes();
+                Ok(build_prepare_message(
+                    consensus,
+                    &header,
+                    Operation::CreatePartitionsWithAssignments,
+                    &body,
+                ))
+            }
+            _ => Ok(message.project(consensus)),
+        }
+    }
+
+    /// Allocates one fresh consensus group per new partition, numbering the
+    /// partitions consecutively starting at `first_partition_id`.
+    fn assign_consensus_groups(
+        &self,
+        first_partition_id: usize,
+        count: usize,
+    ) -> Result<Vec<CreatedPartitionAssignment>, IggyError> {
+        self.allocator
+            .allocate_many(count)
+            .into_iter()
+            .enumerate()
+            .map(|(offset, consensus_group_id)| {
+                first_partition_id
+                    .checked_add(offset)
+                    .and_then(|partition_id| u32::try_from(partition_id).ok())
+                    .map(|partition_id| CreatedPartitionAssignment {
+                        partition_id,
+                        consensus_group_id,
+                    })
+                    .ok_or(IggyError::InvalidCommand)
+            })
+            .collect()
+    }
+
/// Replicate a prepare message to the next replica in the chain.
///
/// Chain replication pattern:
@@ -724,11 +851,41 @@ where
.borrow_mut()
.commit_reply(header.client, session, reply);
}
-
debug!("commit_journal: committed op={op}");
}
}
+    /// Feeds the consensus group ids carried by a prepare into the allocator
+    /// so that ids handed out later never collide with ones already assigned.
+    ///
+    /// Only `CreateTopicWithAssignments` and `CreatePartitionsWithAssignments`
+    /// carry assignments; every other operation is ignored. A prepare with an
+    /// empty assignment list (for example a topic created with zero
+    /// partitions) leaves the allocator untouched.
+    ///
+    /// # Panics
+    /// Panics if the body of one of the two operations above fails to decode.
+    fn observe_prepare_runtime_state(&self, prepare: &Message<PrepareHeader>) {
+        let header = prepare.header();
+        let body = &prepare.as_slice()[size_of::<PrepareHeader>()..header.size as usize];
+
+        let highest_consensus_group_id = match header.operation {
+            Operation::CreateTopicWithAssignments => {
+                PersistedCreateTopicRequest::decode_from(body)
+                    .expect("create topic with assignments prepare must decode")
+                    .partitions
+                    .iter()
+                    .map(|partition| partition.consensus_group_id)
+                    .max()
+            }
+            Operation::CreatePartitionsWithAssignments => {
+                PersistedCreatePartitionsRequest::decode_from(body)
+                    .expect("create partitions with assignments prepare must decode")
+                    .partitions
+                    .iter()
+                    .map(|partition| partition.consensus_group_id)
+                    .max()
+            }
+            _ => None,
+        };
+
+        // `max()` is `None` for an empty assignment list; there is nothing to
+        // observe in that case, and it must not take the replica down.
+        if let Some(highest) = highest_consensus_group_id {
+            self.allocator.observe(highest);
+        }
+    }
+
#[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
async fn send_prepare_ok(&self, header: &PrepareHeader) {
let consensus = self.consensus.as_ref().unwrap();
@@ -737,3 +894,44 @@ where
send_prepare_ok_common(consensus, header, Some(persisted)).await;
}
}
+
+fn build_prepare_message<B, P>(
+ consensus: &VsrConsensus<B, P>,
+ request: &RequestHeader,
+ operation: Operation,
+ body: &[u8],
+) -> Message<PrepareHeader>
+where
+ B: MessageBus,
+ P: Pipeline<Entry = PipelineEntry>,
+{
+ let op = consensus.sequencer().current_sequence() + 1;
+ let size = size_of::<PrepareHeader>() + body.len();
+ let mut prepare = Message::<PrepareHeader>::new(size);
+ let prepare_bytes = prepare.as_mut_slice();
+ prepare_bytes[size_of::<PrepareHeader>()..size].copy_from_slice(body);
+
+ let header_bytes = &mut prepare_bytes[..size_of::<PrepareHeader>()];
+ let new_header =
bytemuck::checked::try_from_bytes_mut::<PrepareHeader>(header_bytes)
+ .expect("prepare header bytes should be valid");
+ *new_header = PrepareHeader {
+ cluster: consensus.cluster(),
+ size: u32::try_from(size).expect("prepare message size exceeds u32"),
+ view: consensus.view(),
+ release: request.release,
+ command: Command2::Prepare,
+ replica: consensus.replica(),
+ client: request.client,
+ parent: consensus.last_prepare_checksum(),
+ request_checksum: request.request_checksum,
+ request: request.request,
+ commit: consensus.commit_max(),
+ op,
+ timestamp: 0,
+ operation,
+ namespace: request.namespace,
+ ..Default::default()
+ };
+
+ prepare
+}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e3d8611f4..fb0eff73c 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -24,6 +24,7 @@ pub mod user;
use bytes::Bytes;
use iggy_common::Either;
use left_right::{Absorb, ReadHandle, WriteHandle};
+use std::cell::Cell;
use std::cell::UnsafeCell;
use std::sync::Arc;
@@ -159,6 +160,46 @@ pub trait StateMachine {
fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error>;
}
+#[derive(Debug)]
+pub struct ConsensusGroupAllocator {
+ highest: Cell<u64>,
+}
+
+impl ConsensusGroupAllocator {
+ #[must_use]
+ pub const fn new(initial_highest: u64) -> Self {
+ Self {
+ highest: Cell::new(initial_highest),
+ }
+ }
+
+ #[must_use]
+ pub const fn highest(&self) -> u64 {
+ self.highest.get()
+ }
+
+ pub fn observe(&self, assigned: u64) {
+ if assigned > self.highest.get() {
+ self.highest.set(assigned);
+ }
+ }
+
+ #[must_use]
+ ///
+ /// # Panics
+ /// Panics if allocating `count` more group IDs would overflow `u64`.
+ pub fn allocate_many(&self, count: usize) -> Vec<u64> {
+ let mut allocated = Vec::with_capacity(count);
+ let mut current = self.highest.get();
+ for _ in 0..count {
+ current = current.checked_add(1).expect("consensus group id overflow");
+ allocated.push(current);
+ }
+ self.highest.set(current);
+ allocated
+ }
+}
+
/// Generates the state's inner struct and wrapper type.
///
/// # Generated items
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 89a545297..8ecea2079 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -39,6 +39,11 @@ where
pub const fn new(inner: T) -> Self {
Self { inner }
}
+
+ #[must_use]
+ pub const fn inner(&self) -> &T {
+ &self.inner
+ }
}
impl<T> StateMachine for MuxStateMachine<T>
diff --git a/core/metadata/src/stm/snapshot.rs b/core/metadata/src/stm/snapshot.rs
index b3f2ffe7c..3e9681de0 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -296,8 +296,8 @@ macro_rules! impl_fill_restore {
#[cfg(test)]
mod tests {
use super::*;
- use crate::stm::stream::{StatsSnapshot, StreamSnapshot};
- use iggy_common::IggyTimestamp;
+ use crate::stm::stream::{PartitionSnapshot, StatsSnapshot, StreamSnapshot, TopicSnapshot};
+ use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize};
#[test]
fn test_metadata_snapshot_roundtrip() {
@@ -329,7 +329,29 @@ mod tests {
messages_count: 50,
segments_count: 2,
},
- topics: vec![],
+ topics: vec![(
+ 0,
+ TopicSnapshot {
+ id: 0,
+ name: "topic".to_string(),
+ created_at: ts,
+ replication_factor: 1,
+ message_expiry: IggyExpiry::default(),
+ compression_algorithm: CompressionAlgorithm::default(),
+ max_topic_size: MaxTopicSize::default(),
+ stats: StatsSnapshot {
+ size_bytes: 256,
+ messages_count: 12,
+ segments_count: 1,
+ },
+ partitions: vec![PartitionSnapshot {
+ id: 0,
+ consensus_group_id: 33,
+ created_at: ts,
+ }],
+ round_robin_counter: 0,
+ },
+ )],
},
)],
});
@@ -351,7 +373,10 @@ mod tests {
assert_eq!(stream.stats.size_bytes, 1024);
assert_eq!(stream.stats.messages_count, 50);
assert_eq!(stream.stats.segments_count, 2);
- assert_eq!(stream.topics.len(), 0);
+ assert_eq!(stream.topics.len(), 1);
+ let (_, topic) = &stream.topics[0];
+ assert_eq!(topic.partitions.len(), 1);
+ assert_eq!(topic.partitions[0].consensus_group_id, 33);
}
#[test]
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index dfb7e9e77..3aea30bd7 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -23,13 +23,13 @@ use ahash::AHashMap;
use bytes::Bytes;
use iggy_binary_protocol::WireIdentifier;
use iggy_binary_protocol::requests::partitions::{
- CreatePartitionsRequest, DeletePartitionsRequest,
+ CreatePartitionsWithAssignmentsRequest, DeletePartitionsRequest,
};
use iggy_binary_protocol::requests::streams::{
CreateStreamRequest, DeleteStreamRequest, PurgeStreamRequest, UpdateStreamRequest,
};
use iggy_binary_protocol::requests::topics::{
- CreateTopicRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest,
+ CreateTopicWithAssignmentsRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest,
};
use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize};
use serde::{Deserialize, Serialize};
@@ -41,19 +41,25 @@ use std::sync::atomic::{AtomicUsize, Ordering};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PartitionSnapshot {
pub id: usize,
+ pub consensus_group_id: u64,
pub created_at: IggyTimestamp,
}
#[derive(Debug, Clone)]
pub struct Partition {
pub id: usize,
+ pub consensus_group_id: u64,
pub created_at: IggyTimestamp,
}
impl Partition {
#[must_use]
- pub const fn new(id: usize, created_at: IggyTimestamp) -> Self {
- Self { id, created_at }
+ pub const fn new(id: usize, consensus_group_id: u64, created_at: IggyTimestamp) -> Self {
+ Self {
+ id,
+ consensus_group_id,
+ created_at,
+ }
}
}
@@ -223,11 +229,11 @@ collect_handlers! {
UpdateStream,
DeleteStream,
PurgeStream,
- CreateTopic,
+ CreateTopicWithAssignments,
UpdateTopic,
DeleteTopic,
PurgeTopic,
- CreatePartitions,
+ CreatePartitionsWithAssignments,
DeletePartitions,
}
}
@@ -263,6 +269,55 @@ impl StreamsInner {
}
}
+impl Streams {
+ #[must_use]
+ pub fn partition_count_context(
+ &self,
+ stream_id: &WireIdentifier,
+ topic_id: &WireIdentifier,
+ ) -> Option<((usize, usize), u32)> {
+ self.inner.read(|inner| {
+ let stream_id = inner.resolve_stream_id(stream_id)?;
+ let topic_id = inner.resolve_topic_id(stream_id, topic_id)?;
+ let stream = inner.items.get(stream_id)?;
+ let topic = stream.topics.get(topic_id)?;
+ let next_partition_id = topic
+ .partitions
+ .iter()
+ .map(|partition| partition.id)
+ .max()
+ .and_then(|partition_id| partition_id.checked_add(1))
+ .and_then(|partition_id| u32::try_from(partition_id).ok())
+ .unwrap_or(0);
+ Some(((stream_id, topic_id), next_partition_id))
+ })
+ }
+
+ #[must_use]
+ pub fn current_partition_count(
+ &self,
+ stream_id: &WireIdentifier,
+ topic_id: &WireIdentifier,
+ ) -> Option<u32> {
+ self.partition_count_context(stream_id, topic_id)
+ .map(|(_, next_partition_id)| next_partition_id)
+ }
+
+ #[must_use]
+ pub fn highest_partition_consensus_group_id(&self) -> u64 {
+ self.inner.read(|inner| {
+ inner
+ .items
+ .iter()
+ .flat_map(|(_, stream)| stream.topics.iter())
+ .flat_map(|(_, topic)| topic.partitions.iter())
+ .map(|partition| partition.consensus_group_id)
+ .max()
+ .unwrap_or(0)
+ })
+ }
+}
+
// TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of empty Bytes.
impl StateHandler for CreateStreamRequest {
type State = StreamsInner;
@@ -340,25 +395,25 @@ impl StateHandler for PurgeStreamRequest {
}
// TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of empty Bytes.
-impl StateHandler for CreateTopicRequest {
+impl StateHandler for CreateTopicWithAssignmentsRequest {
type State = StreamsInner;
fn apply(&self, state: &mut StreamsInner) -> Bytes {
- let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+ let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) else {
return Bytes::new();
};
let Some(stream) = state.items.get_mut(stream_id) else {
return Bytes::new();
};
- let name_arc: Arc<str> = Arc::from(self.name.as_str());
+ let name_arc: Arc<str> = Arc::from(self.request.name.as_str());
if stream.topic_index.contains_key(&name_arc) {
return Bytes::new();
}
- let replication_factor = if self.replication_factor == 0 {
+ let replication_factor = if self.request.replication_factor == 0 {
1
} else {
- self.replication_factor
+ self.request.replication_factor
};
let topic = Topic {
@@ -366,10 +421,12 @@ impl StateHandler for CreateTopicRequest {
name: name_arc.clone(),
created_at: IggyTimestamp::now(),
replication_factor,
- message_expiry: IggyExpiry::from(self.message_expiry),
- compression_algorithm: CompressionAlgorithm::from_code(self.compression_algorithm)
- .unwrap_or_default(),
- max_topic_size: MaxTopicSize::from(self.max_topic_size),
+ message_expiry: IggyExpiry::from(self.request.message_expiry),
+ compression_algorithm: CompressionAlgorithm::from_code(
+ self.request.compression_algorithm,
+ )
+ .unwrap_or_default(),
+ max_topic_size: MaxTopicSize::from(self.request.max_topic_size),
stats: Arc::new(TopicStats::new(stream.stats.clone())),
partitions: Vec::new(),
round_robin_counter: Arc::new(AtomicUsize::new(0)),
@@ -379,9 +436,10 @@ impl StateHandler for CreateTopicRequest {
if let Some(topic) = stream.topics.get_mut(topic_id) {
topic.id = topic_id;
- for partition_id in 0..self.partitions_count as usize {
+ for partition in &self.partitions {
let partition = Partition {
- id: partition_id,
+ id: partition.partition_id as usize,
+ consensus_group_id: partition.consensus_group_id,
created_at: IggyTimestamp::now(),
};
topic.partitions.push(partition);
@@ -462,13 +520,13 @@ impl StateHandler for PurgeTopicRequest {
}
// TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead of empty Bytes.
-impl StateHandler for CreatePartitionsRequest {
+impl StateHandler for CreatePartitionsWithAssignmentsRequest {
type State = StreamsInner;
fn apply(&self, state: &mut StreamsInner) -> Bytes {
- let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+ let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) else {
return Bytes::new();
};
- let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else {
+ let Some(topic_id) = state.resolve_topic_id(stream_id, &self.request.topic_id) else {
return Bytes::new();
};
@@ -479,11 +537,28 @@ impl StateHandler for CreatePartitionsRequest {
return Bytes::new();
};
- let current_partition_count = topic.partitions.len();
- for i in 0..self.partitions_count as usize {
- let partition_id = current_partition_count + i;
+ let base_partition_id = topic
+ .partitions
+ .iter()
+ .map(|partition| partition.id)
+ .max()
+ .and_then(|partition_id| partition_id.checked_add(1))
+ .unwrap_or(0);
+ let Ok(base_partition_id) = u32::try_from(base_partition_id) else {
+ return Bytes::new();
+ };
+
+ for partition in &self.partitions {
+ let partition_id = partition
+ .partition_id
+ .checked_add(base_partition_id)
+ .and_then(|partition_id| usize::try_from(partition_id).ok());
+ let Some(partition_id) = partition_id else {
+ return Bytes::new();
+ };
let partition = Partition {
id: partition_id,
+ consensus_group_id: partition.consensus_group_id,
created_at: IggyTimestamp::now(),
};
topic.partitions.push(partition);
@@ -561,6 +636,7 @@ impl Snapshotable for Streams {
.iter()
.map(|p| PartitionSnapshot {
id: p.id,
+ consensus_group_id: p.consensus_group_id,
created_at: p.created_at,
})
.collect(),
@@ -630,6 +706,7 @@ impl Snapshotable for Streams {
.into_iter()
.map(|p| Partition {
id: p.id,
+ consensus_group_id: p.consensus_group_id,
created_at: p.created_at,
})
.collect(),
@@ -666,3 +743,118 @@ impl Snapshotable for Streams {
}
impl_fill_restore!(Streams, streams);
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use iggy_binary_protocol::WireName;
+ use iggy_binary_protocol::primitives::partition_assignment::CreatedPartitionAssignment;
+ use iggy_binary_protocol::requests::partitions::{
+ CreatePartitionsRequest as WireCreatePartitionsRequest,
+ CreatePartitionsWithAssignmentsRequest,
+ };
+ use iggy_binary_protocol::requests::topics::{
+ CreateTopicRequest as WireCreateTopicRequest, CreateTopicWithAssignmentsRequest,
+ };
+
+ fn create_stream(inner: &mut StreamsInner, name: &str) {
+ let request = CreateStreamRequest {
+ name: WireName::new(name).unwrap(),
+ };
+ let _ = StateHandler::apply(&request, inner);
+ }
+
+ fn make_topic_request(
+ stream_id: u32,
+ partitions_count: u32,
+ name: &str,
+ ) -> WireCreateTopicRequest {
+ WireCreateTopicRequest {
+ stream_id: WireIdentifier::numeric(stream_id),
+ partitions_count,
+ compression_algorithm: 0,
+ message_expiry: 0,
+ max_topic_size: 0,
+ replication_factor: 1,
+ name: WireName::new(name).unwrap(),
+ }
+ }
+
+ #[test]
+ fn current_partition_count_scans_existing_topic_state() {
+ let mut inner = StreamsInner::new();
+ create_stream(&mut inner, "stream");
+ let create_topic = CreateTopicWithAssignmentsRequest {
+ request: make_topic_request(0, 2, "topic"),
+ partitions: vec![
+ CreatedPartitionAssignment {
+ partition_id: 0,
+ consensus_group_id: 1,
+ },
+ CreatedPartitionAssignment {
+ partition_id: 1,
+ consensus_group_id: 2,
+ },
+ ],
+ };
+ let _ = StateHandler::apply(&create_topic, &mut inner);
+ let streams: Streams = inner.into();
+
+ assert_eq!(
+ streams
+ .current_partition_count(&WireIdentifier::numeric(0), &WireIdentifier::numeric(0)),
+ Some(2)
+ );
+ }
+
+ #[test]
+ fn applying_enriched_create_commands_stores_consensus_group_ids() {
+ let mut inner = StreamsInner::new();
+ create_stream(&mut inner, "stream");
+ let create_topic = CreateTopicWithAssignmentsRequest {
+ request: make_topic_request(0, 2, "topic"),
+ partitions: vec![
+ CreatedPartitionAssignment {
+ partition_id: 0,
+ consensus_group_id: 10,
+ },
+ CreatedPartitionAssignment {
+ partition_id: 1,
+ consensus_group_id: 11,
+ },
+ ],
+ };
+ let _ = StateHandler::apply(&create_topic, &mut inner);
+
+ let create_partitions = CreatePartitionsWithAssignmentsRequest {
+ request: WireCreatePartitionsRequest {
+ stream_id: WireIdentifier::numeric(0),
+ topic_id: WireIdentifier::numeric(0),
+ partitions_count: 2,
+ },
+ partitions: vec![
+ CreatedPartitionAssignment {
+ partition_id: 0,
+ consensus_group_id: 12,
+ },
+ CreatedPartitionAssignment {
+ partition_id: 1,
+ consensus_group_id: 13,
+ },
+ ],
+ };
+ let _ = StateHandler::apply(&create_partitions, &mut inner);
+
+ assert_eq!(inner.items[0].topics[0].partitions.len(), 4);
+ assert_eq!(inner.items[0].topics[0].partitions[2].id, 2);
+ assert_eq!(inner.items[0].topics[0].partitions[3].id, 3);
+ assert_eq!(
+ inner.items[0].topics[0].partitions[0].consensus_group_id,
+ 10
+ );
+ assert_eq!(
+ inner.items[0].topics[0].partitions[3].consensus_group_id,
+ 13
+ );
+ }
+}
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 47bd5a9b8..d5a65007c 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -31,6 +31,7 @@ use iggy_common::{PartitionStats, sharding::IggyNamespace};
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
use metadata::IggyMetadata;
+use metadata::impls::metadata::StreamsFrontend;
use metadata::stm::StateMachine;
use partitions::{IggyPartition, IggyPartitions};
use shards_table::ShardsTable;
@@ -240,7 +241,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
match MessageBag::try_from(message) {
Ok(MessageBag::Request(request)) => self.on_request(request).await,
@@ -270,7 +271,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
self.plane.on_request(request).await;
}
@@ -289,7 +290,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
self.plane.on_replicate(prepare).await;
}
@@ -308,7 +309,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
self.plane.on_ack(prepare_ok).await;
}
@@ -338,7 +339,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
debug_assert!(buf.is_empty(), "buf must be empty on entry");
@@ -477,7 +478,8 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<
+ M: StreamsFrontend
+ + StateMachine<
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
@@ -589,7 +591,8 @@ where
Entry = Message<PrepareHeader>,
Header = PrepareHeader,
>,
- M: StateMachine<
+ M: StreamsFrontend
+ + StateMachine<
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 730d51789..4fd6550c6 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -24,6 +24,7 @@ use iggy_binary_protocol::{
use iggy_common::sharding::IggyNamespace;
use journal::{Journal, JournalHandle};
use message_bus::MessageBus;
+use metadata::impls::metadata::StreamsFrontend;
use metadata::stm::StateMachine;
/// Inter-shard dispatch logic.
@@ -213,7 +214,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
loop {
futures::select! {
@@ -247,7 +248,7 @@ where
Input = Message<PrepareHeader>,
Output = bytes::Bytes,
Error = iggy_common::IggyError,
- >,
+ > + StreamsFrontend,
{
self.on_message(frame.message).await;
// TODO: once on_message returns an R (e.g. ShardResponse), send it