This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch consensus_group_impl
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit 11dc5ea874a9875662df8cf9799acea5fc7dc26e
Author: numinex <[email protected]>
AuthorDate: Fri Apr 17 11:44:26 2026 +0200

    feat(consensus): impl consensus group mechanism
---
 core/binary_protocol/src/consensus/operation.rs    |  50 +++-
 core/binary_protocol/src/dispatch.rs               |   5 +-
 .../create_partitions_with_assignments.rs          | 124 +++++++++
 .../binary_protocol/src/requests/partitions/mod.rs |   4 +
 .../requests/partitions/partition_assignment.rs    |  75 ++++++
 .../topics/create_topic_with_assignments.rs        | 131 ++++++++++
 core/binary_protocol/src/requests/topics/mod.rs    |   2 +
 core/consensus/src/observability.rs                |   2 +
 core/metadata/Cargo.toml                           |   2 +-
 core/metadata/src/impls/metadata.rs                | 282 ++++++++++++++++++---
 core/metadata/src/stm/mod.rs                       |  41 +++
 core/metadata/src/stm/mux.rs                       |   5 +
 core/metadata/src/stm/snapshot.rs                  |  33 ++-
 core/metadata/src/stm/stream.rs                    | 237 +++++++++++++++--
 core/shard/src/lib.rs                              |  17 +-
 core/shard/src/router.rs                           |   5 +-
 16 files changed, 928 insertions(+), 87 deletions(-)

diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs
index 7f0a7f5a9..f0a08a29d 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -32,6 +32,9 @@ pub enum Operation {
     /// but skips state machine dispatch at commit time, the metadata
     /// plane calls `commit_register` directly. Session number = commit op.
     Register = 1,
+    // Internal metadata operations (journal / replica-only)
+    CreateTopicWithAssignments = 64,
+    CreatePartitionsWithAssignments = 65,
 
     // Metadata operations (shard 0)
     CreateStream = 128,
@@ -65,10 +68,25 @@ pub enum Operation {
 }
 
 impl Operation {
+    pub const INTERNAL_START: u8 = Self::CreateTopicWithAssignments as u8;
+    pub const METADATA_START: u8 = Self::CreateStream as u8;
+    pub const PARTITION_START: u8 = Self::SendMessages as u8;
+
+    /// Internal-only operations reserved for replica / journal use.
+    #[must_use]
+    #[inline]
+    pub const fn is_internal(&self) -> bool {
+        (*self as u8) >= Self::INTERNAL_START && (*self as u8) < Self::METADATA_START
+    }
+
     /// Metadata / control-plane operations handled by shard 0.
     #[must_use]
     #[inline]
     pub const fn is_metadata(&self) -> bool {
+        if self.is_internal() {
+            return true;
+        }
+
         matches!(
             self,
             Self::CreateStream
@@ -105,13 +123,14 @@ impl Operation {
     #[must_use]
     #[inline]
     pub const fn is_partition(&self) -> bool {
-        matches!(
-            self,
-            Self::SendMessages
-                | Self::StoreConsumerOffset
-                | Self::DeleteConsumerOffset
-                | Self::DeleteSegments
-        )
+        matches!(self, Self::DeleteSegments) || (*self as u8) >= Self::PARTITION_START
+    }
+
+    /// Operations clients are allowed to send directly.
+    #[must_use]
+    #[inline]
+    pub const fn is_client_allowed(&self) -> bool {
+        !matches!(self, Self::Reserved) && !self.is_internal()
     }
 
     /// Bidirectional mapping: `Operation` -> client command code.
@@ -120,7 +139,10 @@ impl Operation {
     #[must_use]
     pub const fn to_command_code(&self) -> Option<u32> {
         match self {
-            Self::Reserved | Self::Register => None,
+            Self::Reserved
+            | Self::Register
+            | Self::CreateTopicWithAssignments
+            | Self::CreatePartitionsWithAssignments => None,
             Self::CreateStream
             | Self::UpdateStream
             | Self::DeleteStream
@@ -217,6 +239,14 @@ mod tests {
         assert!(!Operation::SendMessages.is_vsr_reserved());
         assert!(!Operation::Register.is_metadata());
         assert!(!Operation::Register.is_partition());
+        assert_eq!(
+            Operation::CreateTopicWithAssignments.to_command_code(),
+            None
+        );
+        assert_eq!(
+            Operation::CreatePartitionsWithAssignments.to_command_code(),
+            None
+        );
     }
 
     #[test]
@@ -230,8 +260,12 @@ mod tests {
 
     #[test]
     fn metadata_vs_partition() {
+        assert!(Operation::CreateTopicWithAssignments.is_internal());
+        assert!(Operation::CreateTopicWithAssignments.is_metadata());
+        assert!(!Operation::CreateTopicWithAssignments.is_client_allowed());
         assert!(Operation::CreateStream.is_metadata());
         assert!(!Operation::CreateStream.is_partition());
+        assert!(Operation::CreateStream.is_client_allowed());
         assert!(Operation::SendMessages.is_partition());
         assert!(!Operation::SendMessages.is_metadata());
         assert!(Operation::DeleteSegments.is_partition());
diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs
index e79432f38..fb5e33683 100644
--- a/core/binary_protocol/src/dispatch.rs
+++ b/core/binary_protocol/src/dispatch.rs
@@ -270,7 +270,10 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
         Operation::SendMessages => 22,
         Operation::StoreConsumerOffset => 25,
         Operation::DeleteConsumerOffset => 26,
-        Operation::Reserved | Operation::Register => return None,
+        Operation::CreateTopicWithAssignments
+        | Operation::CreatePartitionsWithAssignments
+        | Operation::Reserved
+        | Operation::Register => return None,
     };
     Some(&COMMAND_TABLE[idx])
 }
diff --git a/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
new file mode 100644
index 000000000..7ea85b5a9
--- /dev/null
+++ b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le};
+use crate::requests::partitions::{CreatePartitionsRequest, CreatedPartitionAssignment};
+use bytes::{BufMut, BytesMut};
+
+fn usize_to_u32(value: usize, context: &str) -> u32 {
+    u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreatePartitionsWithAssignmentsRequest {
+    pub request: CreatePartitionsRequest,
+    pub partitions: Vec<CreatedPartitionAssignment>,
+}
+
+impl WireEncode for CreatePartitionsWithAssignmentsRequest {
+    fn encoded_size(&self) -> usize {
+        4 + self.request.encoded_size()
+            + 4
+            + self
+                .partitions
+                .iter()
+                .map(WireEncode::encoded_size)
+                .sum::<usize>()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(usize_to_u32(
+            self.request.encoded_size(),
+            "create partitions request size",
+        ));
+        self.request.encode(buf);
+        buf.put_u32_le(usize_to_u32(
+            self.partitions.len(),
+            "create partitions partition count",
+        ));
+        for partition in &self.partitions {
+            partition.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for CreatePartitionsWithAssignmentsRequest {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let request_size = read_u32_le(buf, 0)? as usize;
+        let request_start = 4;
+        let request_end = request_start + request_size;
+        if buf.len() < request_end {
+            return Err(WireError::UnexpectedEof {
+                offset: request_start,
+                need: request_size,
+                have: buf.len().saturating_sub(request_start),
+            });
+        }
+
+        let request = CreatePartitionsRequest::decode_from(&buf[request_start..request_end])?;
+        let partitions_count = read_u32_le(buf, request_end)? as usize;
+        let mut offset = request_end + 4;
+        let mut partitions = Vec::with_capacity(partitions_count);
+        for _ in 0..partitions_count {
+            let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
+            offset += consumed;
+            partitions.push(partition);
+        }
+
+        Ok((
+            Self {
+                request,
+                partitions,
+            },
+            offset,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::CreatePartitionsWithAssignmentsRequest;
+    use crate::WireIdentifier;
+    use crate::codec::{WireDecode, WireEncode};
+    use crate::requests::partitions::{CreatePartitionsRequest, CreatedPartitionAssignment};
+
+    #[test]
+    fn roundtrip() {
+        let request = CreatePartitionsWithAssignmentsRequest {
+            request: CreatePartitionsRequest {
+                stream_id: WireIdentifier::numeric(1),
+                topic_id: WireIdentifier::numeric(2),
+                partitions_count: 2,
+            },
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 3,
+                    consensus_group_id: 11,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 4,
+                    consensus_group_id: 12,
+                },
+            ],
+        };
+        let bytes = request.to_bytes();
+        let (decoded, consumed) = CreatePartitionsWithAssignmentsRequest::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, request);
+    }
+}
diff --git a/core/binary_protocol/src/requests/partitions/mod.rs b/core/binary_protocol/src/requests/partitions/mod.rs
index 8a5d0a1a1..9b88355f5 100644
--- a/core/binary_protocol/src/requests/partitions/mod.rs
+++ b/core/binary_protocol/src/requests/partitions/mod.rs
@@ -16,7 +16,11 @@
 // under the License.
 
 pub mod create_partitions;
+pub mod create_partitions_with_assignments;
 pub mod delete_partitions;
+pub mod partition_assignment;
 
 pub use create_partitions::CreatePartitionsRequest;
+pub use create_partitions_with_assignments::CreatePartitionsWithAssignmentsRequest;
 pub use delete_partitions::DeletePartitionsRequest;
+pub use partition_assignment::CreatedPartitionAssignment;
diff --git a/core/binary_protocol/src/requests/partitions/partition_assignment.rs b/core/binary_protocol/src/requests/partitions/partition_assignment.rs
new file mode 100644
index 000000000..2c36732bd
--- /dev/null
+++ b/core/binary_protocol/src/requests/partitions/partition_assignment.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
+use bytes::{BufMut, BytesMut};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreatedPartitionAssignment {
+    pub partition_id: u32,
+    pub consensus_group_id: u64,
+}
+
+impl WireEncode for CreatedPartitionAssignment {
+    fn encoded_size(&self) -> usize {
+        12
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(self.partition_id);
+        buf.put_u64_le(self.consensus_group_id);
+    }
+}
+
+impl WireDecode for CreatedPartitionAssignment {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        if buf.len() < 12 {
+            return Err(WireError::UnexpectedEof {
+                offset: 0,
+                need: 12,
+                have: buf.len(),
+            });
+        }
+
+        Ok((
+            Self {
+                partition_id: read_u32_le(buf, 0)?,
+                consensus_group_id: read_u64_le(buf, 4)?,
+            },
+            12,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::CreatedPartitionAssignment;
+    use crate::codec::{WireDecode, WireEncode};
+
+    #[test]
+    fn roundtrip() {
+        let request = CreatedPartitionAssignment {
+            partition_id: 7,
+            consensus_group_id: 42,
+        };
+        let bytes = request.to_bytes();
+        let (decoded, consumed) = CreatedPartitionAssignment::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, request);
+    }
+}
diff --git a/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
new file mode 100644
index 000000000..d9d29c6eb
--- /dev/null
+++ b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le};
+use crate::requests::partitions::CreatedPartitionAssignment;
+use crate::requests::topics::CreateTopicRequest;
+use bytes::{BufMut, BytesMut};
+
+fn usize_to_u32(value: usize, context: &str) -> u32 {
+    u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreateTopicWithAssignmentsRequest {
+    pub request: CreateTopicRequest,
+    pub partitions: Vec<CreatedPartitionAssignment>,
+}
+
+impl WireEncode for CreateTopicWithAssignmentsRequest {
+    fn encoded_size(&self) -> usize {
+        4 + self.request.encoded_size()
+            + 4
+            + self
+                .partitions
+                .iter()
+                .map(WireEncode::encoded_size)
+                .sum::<usize>()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(usize_to_u32(
+            self.request.encoded_size(),
+            "create topic request size",
+        ));
+        self.request.encode(buf);
+        buf.put_u32_le(usize_to_u32(
+            self.partitions.len(),
+            "create topic partition count",
+        ));
+        for partition in &self.partitions {
+            partition.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for CreateTopicWithAssignmentsRequest {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let request_size = read_u32_le(buf, 0)? as usize;
+        let request_start = 4;
+        let request_end = request_start + request_size;
+        if buf.len() < request_end {
+            return Err(WireError::UnexpectedEof {
+                offset: request_start,
+                need: request_size,
+                have: buf.len().saturating_sub(request_start),
+            });
+        }
+
+        let request = CreateTopicRequest::decode_from(&buf[request_start..request_end])?;
+        let partitions_count = read_u32_le(buf, request_end)? as usize;
+        let mut offset = request_end + 4;
+        let mut partitions = Vec::with_capacity(partitions_count);
+        for _ in 0..partitions_count {
+            let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
+            offset += consumed;
+            partitions.push(partition);
+        }
+
+        Ok((
+            Self {
+                request,
+                partitions,
+            },
+            offset,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::CreateTopicWithAssignmentsRequest;
+    use crate::WireIdentifier;
+    use crate::codec::{WireDecode, WireEncode};
+    use crate::primitives::identifier::WireName;
+    use crate::requests::partitions::CreatedPartitionAssignment;
+    use crate::requests::topics::CreateTopicRequest;
+
+    #[test]
+    fn roundtrip() {
+        let request = CreateTopicWithAssignmentsRequest {
+            request: CreateTopicRequest {
+                stream_id: WireIdentifier::numeric(1),
+                partitions_count: 2,
+                compression_algorithm: 1,
+                message_expiry: 3600,
+                max_topic_size: 1024,
+                replication_factor: 1,
+                name: WireName::new("events").unwrap(),
+            },
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 1,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 2,
+                },
+            ],
+        };
+        let bytes = request.to_bytes();
+        let (decoded, consumed) = CreateTopicWithAssignmentsRequest::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, request);
+    }
+}
diff --git a/core/binary_protocol/src/requests/topics/mod.rs b/core/binary_protocol/src/requests/topics/mod.rs
index 680105ae3..a3e00a1d4 100644
--- a/core/binary_protocol/src/requests/topics/mod.rs
+++ b/core/binary_protocol/src/requests/topics/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 pub mod create_topic;
+pub mod create_topic_with_assignments;
 pub mod delete_topic;
 pub mod get_topic;
 pub mod get_topics;
@@ -23,6 +24,7 @@ pub mod purge_topic;
 pub mod update_topic;
 
 pub use create_topic::CreateTopicRequest;
+pub use create_topic_with_assignments::CreateTopicWithAssignmentsRequest;
 pub use delete_topic::DeleteTopicRequest;
 pub use get_topic::GetTopicRequest;
 pub use get_topics::GetTopicsRequest;
diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs
index 3306985da..bb042af10 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -618,6 +618,8 @@ pub const fn status_as_str(status: Status) -> &'static str {
 pub const fn operation_as_str(operation: Operation) -> &'static str {
     match operation {
         Operation::Reserved => "reserved",
+        Operation::CreateTopicWithAssignments => "create_topic_with_assignments",
+        Operation::CreatePartitionsWithAssignments => "create_partitions_with_assignments",
         Operation::CreateStream => "create_stream",
         Operation::UpdateStream => "update_stream",
         Operation::DeleteStream => "delete_stream",
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index c437c7efa..213303a81 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -30,6 +30,7 @@ publish = false
 
 [dependencies]
 ahash = { workspace = true }
+bytemuck = { workspace = true }
 bytes = { workspace = true }
 consensus = { workspace = true }
 iggy_binary_protocol = { workspace = true }
@@ -44,7 +45,6 @@ slab = { workspace = true }
 tracing = { workspace = true }
 
 [dev-dependencies]
-bytemuck = { workspace = true }
 bytes = { workspace = true }
 compio = { workspace = true }
 tempfile = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index c362821ec..87fd04ace 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,8 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use crate::stm::StateMachine;
+use crate::MuxStateMachine;
+use crate::stm::consumer_group::ConsumerGroups;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
+use crate::stm::stream::Streams;
+use crate::stm::user::Users;
+use crate::stm::{ConsensusGroupAllocator, StateMachine};
 use consensus::{
     CommitLogEvent, Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, PlaneKind, Project,
     ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight,
@@ -24,12 +28,22 @@ use consensus::{
     pipeline_prepare_common, register_preflight, replicate_preflight, replicate_to_next_in_chain,
     request_preflight, send_prepare_ok as send_prepare_ok_common,
 };
+use iggy_binary_protocol::requests::partitions::CreatePartitionsRequest as WireCreatePartitionsRequest;
+use iggy_binary_protocol::requests::partitions::{
+    CreatePartitionsWithAssignmentsRequest as PersistedCreatePartitionsRequest,
+    CreatedPartitionAssignment,
+};
+use iggy_binary_protocol::requests::topics::CreateTopicRequest as WireCreateTopicRequest;
+use iggy_binary_protocol::requests::topics::CreateTopicWithAssignmentsRequest as PersistedCreateTopicRequest;
 use iggy_binary_protocol::{
     Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader,
-    RequestHeader,
+    RequestHeader, WireDecode, WireEncode,
 };
+use iggy_common::IggyError;
+use iggy_common::variadic;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
+use std::mem::size_of;
 use std::path::Path;
 use tracing::{debug, error, warn};
 
@@ -37,6 +51,17 @@ const fn freeze_client_reply(message: Message<GenericHeader>) -> Message<Generic
     message
 }
 
+pub trait StreamsFrontend {
+    #[must_use]
+    fn streams(&self) -> &Streams;
+}
+
+impl StreamsFrontend for MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)> {
+    fn streams(&self) -> &Streams {
+        &self.inner().1.0
+    }
+}
+
 #[derive(Debug, Clone)]
 #[allow(unused)]
 pub struct IggySnapshot {
@@ -245,13 +270,14 @@ pub struct IggyMetadata<C, J, S, M> {
     pub snapshot: Option<S>,
     /// State machine - lives on all shards
     pub mux_stm: M,
+    pub allocator: ConsensusGroupAllocator,
     /// Snapshot coordinator - present when persistent checkpointing is configured.
     pub coordinator: Option<SnapshotCoordinator<M>>,
 }
 
 impl<C, J, S, M> IggyMetadata<C, J, S, M>
 where
-    M: FillSnapshot<MetadataSnapshot>,
+    M: StreamsFrontend + FillSnapshot<MetadataSnapshot>,
 {
     /// Create a new `IggyMetadata` instance.
     ///
@@ -265,13 +291,15 @@ where
         mux_stm: M,
         data_dir: Option<std::path::PathBuf>,
     ) -> Self {
-        let coordinator = data_dir
-            .map(|dir| SnapshotCoordinator::new(dir, |stm, seq| IggySnapshot::create(stm, seq)));
+        let allocator =
+            ConsensusGroupAllocator::new(mux_stm.streams().highest_partition_consensus_group_id());
+        let coordinator = data_dir.map(|dir| SnapshotCoordinator::new(dir, IggySnapshot::create));
         Self {
             consensus,
             journal,
             snapshot,
             mux_stm,
+            allocator,
             coordinator,
         }
     }
@@ -283,7 +311,8 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     J: JournalHandle,
     J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
-    M: StateMachine<
+    M: StreamsFrontend
+        + StateMachine<
             Input = Message<PrepareHeader>,
             Output = bytes::Bytes,
             Error = iggy_common::IggyError,
@@ -335,7 +364,19 @@ where
                 operation: message.header().operation,
             },
         );
-        let prepare = message.project(consensus);
+        let prepare = match self.prepare_request(message) {
+            Ok(prepare) => prepare,
+            Err(error) => {
+                warn!(
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    error = %error,
+                    "failed to transform metadata request into prepare"
+                );
+                return;
+            }
+        };
         pipeline_prepare_common(consensus, PlaneKind::Metadata, prepare, |prepare| {
             self.on_replicate(prepare)
         })
@@ -395,38 +436,8 @@ where
             panic_if_hash_chain_would_break_in_same_view(&previous, &header);
         }
 
-        // Force a checkpoint if the journal is running low on capacity.
-        if let Some(coordinator) = &self.coordinator {
-            // Use commit_min (locally executed), not commit_max. WAL entries
-            // between commit_min+1 and commit_max haven't been applied to the
-            // state machine yet, draining them would lose data on crash.
-            let snap_op = consensus.commit_min();
-            match coordinator
-                .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
-                .await
-            {
-                Ok(true) => {
-                    debug!(
-                        target: "iggy.metadata.diag",
-                        plane = "metadata",
-                        replica_id = consensus.replica(),
-                        checkpoint_op = snap_op,
-                        "forced checkpoint completed"
-                    );
-                }
-                Ok(false) => {}
-                Err(e) => {
-                    error!(
-                        target: "iggy.metadata.diag",
-                        plane = "metadata",
-                        replica_id = consensus.replica(),
-                        checkpoint_op = snap_op,
-                        error = %e,
-                        "forced checkpoint failed"
-                    );
-                    return;
-                }
-            }
+        if !self.checkpoint_if_needed(consensus, journal).await {
+            return;
         }
 
         // TODO: Restore hard assert_eq!(header.op, current_op + 1) once message repair
@@ -460,6 +471,7 @@ where
             return;
         }
 
+        self.observe_prepare_runtime_state(&message);
         consensus.sequencer().set_sequence(header.op);
         consensus.set_last_prepare_checksum(header.checksum);
 
@@ -639,12 +651,129 @@ where
     P: Pipeline<Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
-    M: StateMachine<
+    M: StreamsFrontend
+        + StateMachine<
             Input = Message<PrepareHeader>,
             Output = bytes::Bytes,
             Error = iggy_common::IggyError,
         >,
 {
+    #[allow(clippy::future_not_send)]
+    async fn checkpoint_if_needed(&self, consensus: &VsrConsensus<B, P>, journal: &J) -> bool {
+        let Some(coordinator) = &self.coordinator else {
+            return true;
+        };
+
+        // Use commit_min (locally executed), not commit_max. WAL entries
+        // between commit_min+1 and commit_max haven't been applied to the
+        // state machine yet, draining them would lose data on crash.
+        let snap_op = consensus.commit_min();
+        match coordinator
+            .checkpoint_if_needed(&self.mux_stm, journal, snap_op)
+            .await
+        {
+            Ok(true) => {
+                debug!(
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    checkpoint_op = snap_op,
+                    "forced checkpoint completed"
+                );
+                true
+            }
+            Ok(false) => true,
+            Err(e) => {
+                error!(
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    checkpoint_op = snap_op,
+                    error = %e,
+                    "forced checkpoint failed"
+                );
+                false
+            }
+        }
+    }
+
+    #[allow(clippy::too_many_lines)]
+    fn prepare_request(
+        &self,
+        message: Message<RequestHeader>,
+    ) -> Result<Message<PrepareHeader>, iggy_common::IggyError> {
+        let consensus = self.consensus.as_ref().unwrap();
+        let header = *message.header();
+        if !header.operation.is_client_allowed() {
+            return Err(IggyError::InvalidCommand);
+        }
+        let body = &message.as_slice()[size_of::<RequestHeader>()..header.size as usize];
+
+        match header.operation {
+            Operation::CreateTopic => {
+                let request = WireCreateTopicRequest::decode_from(body)
+                    .map_err(|_| IggyError::InvalidCommand)?;
+                let partitions = self
+                    .allocator
+                    .allocate_many(request.partitions_count as usize)
+                    .into_iter()
+                    .enumerate()
+                    .map(|(partition_id, consensus_group_id)| {
+                        Ok(CreatedPartitionAssignment {
+                            partition_id: u32::try_from(partition_id)
+                                .map_err(|_| IggyError::InvalidCommand)?,
+                            consensus_group_id,
+                        })
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                let body = PersistedCreateTopicRequest {
+                    request,
+                    partitions,
+                }
+                .to_bytes();
+                Ok(build_prepare_message(
+                    consensus,
+                    &header,
+                    Operation::CreateTopicWithAssignments,
+                    &body,
+                ))
+            }
+            Operation::CreatePartitions => {
+                let request = WireCreatePartitionsRequest::decode_from(body)
+                    .map_err(|_| IggyError::InvalidCommand)?;
+                self.mux_stm
+                    .streams()
+                    .current_partition_count(&request.stream_id, &request.topic_id)
+                    .ok_or(IggyError::InvalidCommand)?;
+                let partitions = self
+                    .allocator
+                    .allocate_many(request.partitions_count as usize)
+                    .into_iter()
+                    .enumerate()
+                    .map(|(offset, consensus_group_id)| {
+                        Ok(CreatedPartitionAssignment {
+                            partition_id: u32::try_from(offset)
+                                .map_err(|_| IggyError::InvalidCommand)?,
+                            consensus_group_id,
+                        })
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                let body = PersistedCreatePartitionsRequest {
+                    request,
+                    partitions,
+                }
+                .to_bytes();
+                Ok(build_prepare_message(
+                    consensus,
+                    &header,
+                    Operation::CreatePartitionsWithAssignments,
+                    &body,
+                ))
+            }
+            _ => Ok(message.project(consensus)),
+        }
+    }
+
     /// Replicate a prepare message to the next replica in the chain.
     ///
     /// Chain replication pattern:
@@ -724,11 +853,41 @@ where
                     .borrow_mut()
                     .commit_reply(header.client, session, reply);
             }
-
             debug!("commit_journal: committed op={op}");
         }
     }
 
+    fn observe_prepare_runtime_state(&self, prepare: &Message<PrepareHeader>) {
+        let header = prepare.header();
+        let body = &prepare.as_slice()[size_of::<PrepareHeader>()..header.size 
as usize];
+
+        match header.operation {
+            Operation::CreateTopicWithAssignments => {
+                let request = PersistedCreateTopicRequest::decode_from(body)
+                    .expect("create topic with assignments prepare must 
decode");
+                let highest_consensus_group_id = request
+                    .partitions
+                    .iter()
+                    .map(|partition| partition.consensus_group_id)
+                    .max()
+                    .expect("create topic with assignments must allocate 
partitions");
+                self.allocator.observe(highest_consensus_group_id);
+            }
+            Operation::CreatePartitionsWithAssignments => {
+                let request = 
PersistedCreatePartitionsRequest::decode_from(body)
+                    .expect("create partitions with assignments prepare must 
decode");
+                let highest_consensus_group_id = request
+                    .partitions
+                    .iter()
+                    .map(|partition| partition.consensus_group_id)
+                    .max()
+                    .expect("create partitions with assignments must allocate 
partitions");
+                self.allocator.observe(highest_consensus_group_id);
+            }
+            _ => {}
+        }
+    }
+
     #[allow(clippy::future_not_send, clippy::cast_possible_truncation)]
     async fn send_prepare_ok(&self, header: &PrepareHeader) {
         let consensus = self.consensus.as_ref().unwrap();
@@ -737,3 +896,44 @@ where
         send_prepare_ok_common(consensus, header, Some(persisted)).await;
     }
 }
+
+fn build_prepare_message<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    request: &RequestHeader,
+    operation: Operation,
+    body: &[u8],
+) -> Message<PrepareHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Entry = PipelineEntry>,
+{
+    let op = consensus.sequencer().current_sequence() + 1;
+    let size = size_of::<PrepareHeader>() + body.len();
+    let mut prepare = Message::<PrepareHeader>::new(size);
+    let prepare_bytes = prepare.as_mut_slice();
+    prepare_bytes[size_of::<PrepareHeader>()..size].copy_from_slice(body);
+
+    let header_bytes = &mut prepare_bytes[..size_of::<PrepareHeader>()];
+    let new_header = 
bytemuck::checked::try_from_bytes_mut::<PrepareHeader>(header_bytes)
+        .expect("prepare header bytes should be valid");
+    *new_header = PrepareHeader {
+        cluster: consensus.cluster(),
+        size: u32::try_from(size).expect("prepare message size exceeds u32"),
+        view: consensus.view(),
+        release: request.release,
+        command: Command2::Prepare,
+        replica: consensus.replica(),
+        client: request.client,
+        parent: consensus.last_prepare_checksum(),
+        request_checksum: request.request_checksum,
+        request: request.request,
+        commit: consensus.commit_max(),
+        op,
+        timestamp: 0,
+        operation,
+        namespace: request.namespace,
+        ..Default::default()
+    };
+
+    prepare
+}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e3d8611f4..fb0eff73c 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -24,6 +24,7 @@ pub mod user;
 use bytes::Bytes;
 use iggy_common::Either;
 use left_right::{Absorb, ReadHandle, WriteHandle};
+use std::cell::Cell;
 use std::cell::UnsafeCell;
 use std::sync::Arc;
 
@@ -159,6 +160,46 @@ pub trait StateMachine {
     fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error>;
 }
 
+#[derive(Debug)]
+pub struct ConsensusGroupAllocator {
+    highest: Cell<u64>,
+}
+
+impl ConsensusGroupAllocator {
+    #[must_use]
+    pub const fn new(initial_highest: u64) -> Self {
+        Self {
+            highest: Cell::new(initial_highest),
+        }
+    }
+
+    #[must_use]
+    pub const fn highest(&self) -> u64 {
+        self.highest.get()
+    }
+
+    pub fn observe(&self, assigned: u64) {
+        if assigned > self.highest.get() {
+            self.highest.set(assigned);
+        }
+    }
+
+    #[must_use]
+    ///
+    /// # Panics
+    /// Panics if allocating `count` more group IDs would overflow `u64`.
+    pub fn allocate_many(&self, count: usize) -> Vec<u64> {
+        let mut allocated = Vec::with_capacity(count);
+        let mut current = self.highest.get();
+        for _ in 0..count {
+            current = current.checked_add(1).expect("consensus group id 
overflow");
+            allocated.push(current);
+        }
+        self.highest.set(current);
+        allocated
+    }
+}
+
 /// Generates the state's inner struct and wrapper type.
 ///
 /// # Generated items
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 89a545297..8ecea2079 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -39,6 +39,11 @@ where
     pub const fn new(inner: T) -> Self {
         Self { inner }
     }
+
+    #[must_use]
+    pub const fn inner(&self) -> &T {
+        &self.inner
+    }
 }
 
 impl<T> StateMachine for MuxStateMachine<T>
diff --git a/core/metadata/src/stm/snapshot.rs 
b/core/metadata/src/stm/snapshot.rs
index b3f2ffe7c..3e9681de0 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -296,8 +296,8 @@ macro_rules! impl_fill_restore {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::stm::stream::{StatsSnapshot, StreamSnapshot};
-    use iggy_common::IggyTimestamp;
+    use crate::stm::stream::{PartitionSnapshot, StatsSnapshot, StreamSnapshot, 
TopicSnapshot};
+    use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 
     #[test]
     fn test_metadata_snapshot_roundtrip() {
@@ -329,7 +329,29 @@ mod tests {
                         messages_count: 50,
                         segments_count: 2,
                     },
-                    topics: vec![],
+                    topics: vec![(
+                        0,
+                        TopicSnapshot {
+                            id: 0,
+                            name: "topic".to_string(),
+                            created_at: ts,
+                            replication_factor: 1,
+                            message_expiry: IggyExpiry::default(),
+                            compression_algorithm: 
CompressionAlgorithm::default(),
+                            max_topic_size: MaxTopicSize::default(),
+                            stats: StatsSnapshot {
+                                size_bytes: 256,
+                                messages_count: 12,
+                                segments_count: 1,
+                            },
+                            partitions: vec![PartitionSnapshot {
+                                id: 0,
+                                consensus_group_id: 33,
+                                created_at: ts,
+                            }],
+                            round_robin_counter: 0,
+                        },
+                    )],
                 },
             )],
         });
@@ -351,7 +373,10 @@ mod tests {
         assert_eq!(stream.stats.size_bytes, 1024);
         assert_eq!(stream.stats.messages_count, 50);
         assert_eq!(stream.stats.segments_count, 2);
-        assert_eq!(stream.topics.len(), 0);
+        assert_eq!(stream.topics.len(), 1);
+        let (_, topic) = &stream.topics[0];
+        assert_eq!(topic.partitions.len(), 1);
+        assert_eq!(topic.partitions[0].consensus_group_id, 33);
     }
 
     #[test]
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index dfb7e9e77..0a81d4b4f 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -23,13 +23,13 @@ use ahash::AHashMap;
 use bytes::Bytes;
 use iggy_binary_protocol::WireIdentifier;
 use iggy_binary_protocol::requests::partitions::{
-    CreatePartitionsRequest, DeletePartitionsRequest,
+    CreatePartitionsWithAssignmentsRequest, DeletePartitionsRequest,
 };
 use iggy_binary_protocol::requests::streams::{
     CreateStreamRequest, DeleteStreamRequest, PurgeStreamRequest, 
UpdateStreamRequest,
 };
 use iggy_binary_protocol::requests::topics::{
-    CreateTopicRequest, DeleteTopicRequest, PurgeTopicRequest, 
UpdateTopicRequest,
+    CreateTopicWithAssignmentsRequest, DeleteTopicRequest, PurgeTopicRequest, 
UpdateTopicRequest,
 };
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, 
MaxTopicSize};
 use serde::{Deserialize, Serialize};
@@ -41,19 +41,25 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct PartitionSnapshot {
     pub id: usize,
+    pub consensus_group_id: u64,
     pub created_at: IggyTimestamp,
 }
 
 #[derive(Debug, Clone)]
 pub struct Partition {
     pub id: usize,
+    pub consensus_group_id: u64,
     pub created_at: IggyTimestamp,
 }
 
 impl Partition {
     #[must_use]
-    pub const fn new(id: usize, created_at: IggyTimestamp) -> Self {
-        Self { id, created_at }
+    pub const fn new(id: usize, consensus_group_id: u64, created_at: 
IggyTimestamp) -> Self {
+        Self {
+            id,
+            consensus_group_id,
+            created_at,
+        }
     }
 }
 
@@ -223,11 +229,11 @@ collect_handlers! {
         UpdateStream,
         DeleteStream,
         PurgeStream,
-        CreateTopic,
+        CreateTopicWithAssignments,
         UpdateTopic,
         DeleteTopic,
         PurgeTopic,
-        CreatePartitions,
+        CreatePartitionsWithAssignments,
         DeletePartitions,
     }
 }
@@ -263,6 +269,55 @@ impl StreamsInner {
     }
 }
 
+impl Streams {
+    #[must_use]
+    pub fn partition_count_context(
+        &self,
+        stream_id: &WireIdentifier,
+        topic_id: &WireIdentifier,
+    ) -> Option<((usize, usize), u32)> {
+        self.inner.read(|inner| {
+            let stream_id = inner.resolve_stream_id(stream_id)?;
+            let topic_id = inner.resolve_topic_id(stream_id, topic_id)?;
+            let stream = inner.items.get(stream_id)?;
+            let topic = stream.topics.get(topic_id)?;
+            let next_partition_id = topic
+                .partitions
+                .iter()
+                .map(|partition| partition.id)
+                .max()
+                .and_then(|partition_id| partition_id.checked_add(1))
+                .and_then(|partition_id| u32::try_from(partition_id).ok())
+                .unwrap_or(0);
+            Some(((stream_id, topic_id), next_partition_id))
+        })
+    }
+
+    #[must_use]
+    pub fn current_partition_count(
+        &self,
+        stream_id: &WireIdentifier,
+        topic_id: &WireIdentifier,
+    ) -> Option<u32> {
+        self.partition_count_context(stream_id, topic_id)
+            .map(|(_, next_partition_id)| next_partition_id)
+    }
+
+    #[must_use]
+    pub fn highest_partition_consensus_group_id(&self) -> u64 {
+        self.inner.read(|inner| {
+            inner
+                .items
+                .iter()
+                .flat_map(|(_, stream)| stream.topics.iter())
+                .flat_map(|(_, topic)| topic.partitions.iter())
+                .map(|partition| partition.consensus_group_id)
+                .max()
+                .unwrap_or(0)
+        })
+    }
+}
+
 // TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of 
empty Bytes.
 impl StateHandler for CreateStreamRequest {
     type State = StreamsInner;
@@ -340,25 +395,25 @@ impl StateHandler for PurgeStreamRequest {
 }
 
 // TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of 
empty Bytes.
-impl StateHandler for CreateTopicRequest {
+impl StateHandler for CreateTopicWithAssignmentsRequest {
     type State = StreamsInner;
     fn apply(&self, state: &mut StreamsInner) -> Bytes {
-        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+        let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) 
else {
             return Bytes::new();
         };
         let Some(stream) = state.items.get_mut(stream_id) else {
             return Bytes::new();
         };
 
-        let name_arc: Arc<str> = Arc::from(self.name.as_str());
+        let name_arc: Arc<str> = Arc::from(self.request.name.as_str());
         if stream.topic_index.contains_key(&name_arc) {
             return Bytes::new();
         }
 
-        let replication_factor = if self.replication_factor == 0 {
+        let replication_factor = if self.request.replication_factor == 0 {
             1
         } else {
-            self.replication_factor
+            self.request.replication_factor
         };
 
         let topic = Topic {
@@ -366,10 +421,12 @@ impl StateHandler for CreateTopicRequest {
             name: name_arc.clone(),
             created_at: IggyTimestamp::now(),
             replication_factor,
-            message_expiry: IggyExpiry::from(self.message_expiry),
-            compression_algorithm: 
CompressionAlgorithm::from_code(self.compression_algorithm)
-                .unwrap_or_default(),
-            max_topic_size: MaxTopicSize::from(self.max_topic_size),
+            message_expiry: IggyExpiry::from(self.request.message_expiry),
+            compression_algorithm: CompressionAlgorithm::from_code(
+                self.request.compression_algorithm,
+            )
+            .unwrap_or_default(),
+            max_topic_size: MaxTopicSize::from(self.request.max_topic_size),
             stats: Arc::new(TopicStats::new(stream.stats.clone())),
             partitions: Vec::new(),
             round_robin_counter: Arc::new(AtomicUsize::new(0)),
@@ -379,9 +436,10 @@ impl StateHandler for CreateTopicRequest {
         if let Some(topic) = stream.topics.get_mut(topic_id) {
             topic.id = topic_id;
 
-            for partition_id in 0..self.partitions_count as usize {
+            for partition in &self.partitions {
                 let partition = Partition {
-                    id: partition_id,
+                    id: partition.partition_id as usize,
+                    consensus_group_id: partition.consensus_group_id,
                     created_at: IggyTimestamp::now(),
                 };
                 topic.partitions.push(partition);
@@ -462,13 +520,13 @@ impl StateHandler for PurgeTopicRequest {
 }
 
 // TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead 
of empty Bytes.
-impl StateHandler for CreatePartitionsRequest {
+impl StateHandler for CreatePartitionsWithAssignmentsRequest {
     type State = StreamsInner;
     fn apply(&self, state: &mut StreamsInner) -> Bytes {
-        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+        let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) 
else {
             return Bytes::new();
         };
-        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) 
else {
+        let Some(topic_id) = state.resolve_topic_id(stream_id, 
&self.request.topic_id) else {
             return Bytes::new();
         };
 
@@ -479,11 +537,28 @@ impl StateHandler for CreatePartitionsRequest {
             return Bytes::new();
         };
 
-        let current_partition_count = topic.partitions.len();
-        for i in 0..self.partitions_count as usize {
-            let partition_id = current_partition_count + i;
+        let base_partition_id = topic
+            .partitions
+            .iter()
+            .map(|partition| partition.id)
+            .max()
+            .and_then(|partition_id| partition_id.checked_add(1))
+            .unwrap_or(0);
+        let Ok(base_partition_id) = u32::try_from(base_partition_id) else {
+            return Bytes::new();
+        };
+
+        for partition in &self.partitions {
+            let partition_id = partition
+                .partition_id
+                .checked_add(base_partition_id)
+                .and_then(|partition_id| usize::try_from(partition_id).ok());
+            let Some(partition_id) = partition_id else {
+                return Bytes::new();
+            };
             let partition = Partition {
                 id: partition_id,
+                consensus_group_id: partition.consensus_group_id,
                 created_at: IggyTimestamp::now(),
             };
             topic.partitions.push(partition);
@@ -561,6 +636,7 @@ impl Snapshotable for Streams {
                                         .iter()
                                         .map(|p| PartitionSnapshot {
                                             id: p.id,
+                                            consensus_group_id: 
p.consensus_group_id,
                                             created_at: p.created_at,
                                         })
                                         .collect(),
@@ -630,6 +706,7 @@ impl Snapshotable for Streams {
                         .into_iter()
                         .map(|p| Partition {
                             id: p.id,
+                            consensus_group_id: p.consensus_group_id,
                             created_at: p.created_at,
                         })
                         .collect(),
@@ -666,3 +743,117 @@ impl Snapshotable for Streams {
 }
 
 impl_fill_restore!(Streams, streams);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_binary_protocol::WireName;
+    use iggy_binary_protocol::requests::partitions::{
+        CreatePartitionsRequest as WireCreatePartitionsRequest,
+        CreatePartitionsWithAssignmentsRequest, CreatedPartitionAssignment,
+    };
+    use iggy_binary_protocol::requests::topics::{
+        CreateTopicRequest as WireCreateTopicRequest, 
CreateTopicWithAssignmentsRequest,
+    };
+
+    fn create_stream(inner: &mut StreamsInner, name: &str) {
+        let request = CreateStreamRequest {
+            name: WireName::new(name).unwrap(),
+        };
+        let _ = StateHandler::apply(&request, inner);
+    }
+
+    fn make_topic_request(
+        stream_id: u32,
+        partitions_count: u32,
+        name: &str,
+    ) -> WireCreateTopicRequest {
+        WireCreateTopicRequest {
+            stream_id: WireIdentifier::numeric(stream_id),
+            partitions_count,
+            compression_algorithm: 0,
+            message_expiry: 0,
+            max_topic_size: 0,
+            replication_factor: 1,
+            name: WireName::new(name).unwrap(),
+        }
+    }
+
+    #[test]
+    fn current_partition_count_scans_existing_topic_state() {
+        let mut inner = StreamsInner::new();
+        create_stream(&mut inner, "stream");
+        let create_topic = CreateTopicWithAssignmentsRequest {
+            request: make_topic_request(0, 2, "topic"),
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 1,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 2,
+                },
+            ],
+        };
+        let _ = StateHandler::apply(&create_topic, &mut inner);
+        let streams: Streams = inner.into();
+
+        assert_eq!(
+            streams
+                .current_partition_count(&WireIdentifier::numeric(0), 
&WireIdentifier::numeric(0)),
+            Some(2)
+        );
+    }
+
+    #[test]
+    fn applying_enriched_create_commands_stores_consensus_group_ids() {
+        let mut inner = StreamsInner::new();
+        create_stream(&mut inner, "stream");
+        let create_topic = CreateTopicWithAssignmentsRequest {
+            request: make_topic_request(0, 2, "topic"),
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 10,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 11,
+                },
+            ],
+        };
+        let _ = StateHandler::apply(&create_topic, &mut inner);
+
+        let create_partitions = CreatePartitionsWithAssignmentsRequest {
+            request: WireCreatePartitionsRequest {
+                stream_id: WireIdentifier::numeric(0),
+                topic_id: WireIdentifier::numeric(0),
+                partitions_count: 2,
+            },
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 12,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 13,
+                },
+            ],
+        };
+        let _ = StateHandler::apply(&create_partitions, &mut inner);
+
+        assert_eq!(inner.items[0].topics[0].partitions.len(), 4);
+        assert_eq!(inner.items[0].topics[0].partitions[2].id, 2);
+        assert_eq!(inner.items[0].topics[0].partitions[3].id, 3);
+        assert_eq!(
+            inner.items[0].topics[0].partitions[0].consensus_group_id,
+            10
+        );
+        assert_eq!(
+            inner.items[0].topics[0].partitions[3].consensus_group_id,
+            13
+        );
+    }
+}
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index 47bd5a9b8..d5a65007c 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -31,6 +31,7 @@ use iggy_common::{PartitionStats, sharding::IggyNamespace};
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use metadata::IggyMetadata;
+use metadata::impls::metadata::StreamsFrontend;
 use metadata::stm::StateMachine;
 use partitions::{IggyPartition, IggyPartitions};
 use shards_table::ShardsTable;
@@ -240,7 +241,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         match MessageBag::try_from(message) {
             Ok(MessageBag::Request(request)) => self.on_request(request).await,
@@ -270,7 +271,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.plane.on_request(request).await;
     }
@@ -289,7 +290,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.plane.on_replicate(prepare).await;
     }
@@ -308,7 +309,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.plane.on_ack(prepare_ok).await;
     }
@@ -338,7 +339,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         debug_assert!(buf.is_empty(), "buf must be empty on entry");
 
@@ -477,7 +478,8 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<
+        M: StreamsFrontend
+            + StateMachine<
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
@@ -589,7 +591,8 @@ where
                 Entry = Message<PrepareHeader>,
                 Header = PrepareHeader,
             >,
-        M: StateMachine<
+        M: StreamsFrontend
+            + StateMachine<
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 730d51789..4fd6550c6 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -24,6 +24,7 @@ use iggy_binary_protocol::{
 use iggy_common::sharding::IggyNamespace;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
+use metadata::impls::metadata::StreamsFrontend;
 use metadata::stm::StateMachine;
 
 /// Inter-shard dispatch logic.
@@ -213,7 +214,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         loop {
             futures::select! {
@@ -247,7 +248,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.on_message(frame.message).await;
         // TODO: once on_message returns an R (e.g. ShardResponse), send it

Reply via email to