This is an automated email from the ASF dual-hosted git repository.

gkoszyk pushed a commit to branch consensus_groups
in repository https://gitbox.apache.org/repos/asf/iggy.git

commit f1338b6338140c524cad2a49df9a6e6d937068d6
Author: numinex <[email protected]>
AuthorDate: Wed Apr 15 21:46:18 2026 +0200

    consensus group
---
 core/binary_protocol/src/consensus/operation.rs    |  50 +++++-
 core/binary_protocol/src/dispatch.rs               |   4 +-
 .../create_partitions_with_assignments.rs          | 124 +++++++++++++
 .../binary_protocol/src/requests/partitions/mod.rs |   4 +
 .../requests/partitions/partition_assignment.rs    |  75 ++++++++
 .../topics/create_topic_with_assignments.rs        | 131 ++++++++++++++
 core/binary_protocol/src/requests/topics/mod.rs    |   2 +
 core/consensus/src/observability.rs                |   2 +
 core/metadata/Cargo.toml                           |   2 +-
 core/metadata/src/impls/metadata.rs                | 188 ++++++++++++++++++--
 core/metadata/src/stm/mod.rs                       |  35 ++++
 core/metadata/src/stm/mux.rs                       |   5 +
 core/metadata/src/stm/snapshot.rs                  |  33 +++-
 core/metadata/src/stm/stream.rs                    | 195 ++++++++++++++++++---
 core/shard/src/lib.rs                              |  11 +-
 core/shard/src/router.rs                           |   5 +-
 16 files changed, 811 insertions(+), 55 deletions(-)

diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs
index d275269ae..97696df64 100644
--- a/core/binary_protocol/src/consensus/operation.rs
+++ b/core/binary_protocol/src/consensus/operation.rs
@@ -25,6 +25,10 @@ pub enum Operation {
     #[default]
     Reserved = 0,
 
+    // Internal metadata operations (journal / replica-only)
+    CreateTopicWithAssignments = 64,
+    CreatePartitionsWithAssignments = 65,
+
     // Metadata operations (shard 0)
     CreateStream = 128,
     UpdateStream = 129,
@@ -57,10 +61,25 @@ pub enum Operation {
 }
 
 impl Operation {
+    pub const INTERNAL_START: u8 = Self::CreateTopicWithAssignments as u8;
+    pub const METADATA_START: u8 = Self::CreateStream as u8;
+    pub const PARTITION_START: u8 = Self::SendMessages as u8;
+
+    /// Internal-only operations reserved for replica / journal use.
+    #[must_use]
+    #[inline]
+    pub const fn is_internal(&self) -> bool {
+        (*self as u8) >= Self::INTERNAL_START && (*self as u8) < Self::METADATA_START
+    }
+
     /// Metadata / control-plane operations handled by shard 0.
     #[must_use]
     #[inline]
     pub const fn is_metadata(&self) -> bool {
+        if self.is_internal() {
+            return true;
+        }
+
         matches!(
             self,
             Self::CreateStream
@@ -89,13 +108,14 @@ impl Operation {
     #[must_use]
     #[inline]
     pub const fn is_partition(&self) -> bool {
-        matches!(
-            self,
-            Self::SendMessages
-                | Self::StoreConsumerOffset
-                | Self::DeleteConsumerOffset
-                | Self::DeleteSegments
-        )
+        matches!(self, Self::DeleteSegments) || (*self as u8) >= Self::PARTITION_START
+    }
+
+    /// Operations clients are allowed to send directly.
+    #[must_use]
+    #[inline]
+    pub const fn is_client_allowed(&self) -> bool {
+        !matches!(self, Self::Reserved) && !self.is_internal()
     }
 
     /// Bidirectional mapping: `Operation` -> client command code.
@@ -104,7 +124,9 @@ impl Operation {
     #[must_use]
     pub const fn to_command_code(&self) -> Option<u32> {
         match self {
-            Self::Reserved => None,
+            Self::Reserved
+            | Self::CreateTopicWithAssignments
+            | Self::CreatePartitionsWithAssignments => None,
             Self::CreateStream
             | Self::UpdateStream
             | Self::DeleteStream
@@ -190,6 +212,14 @@ mod tests {
     #[test]
     fn reserved_has_no_code() {
         assert_eq!(Operation::Reserved.to_command_code(), None);
+        assert_eq!(
+            Operation::CreateTopicWithAssignments.to_command_code(),
+            None
+        );
+        assert_eq!(
+            Operation::CreatePartitionsWithAssignments.to_command_code(),
+            None
+        );
     }
 
     #[test]
@@ -203,8 +233,12 @@ mod tests {
 
     #[test]
     fn metadata_vs_partition() {
+        assert!(Operation::CreateTopicWithAssignments.is_internal());
+        assert!(Operation::CreateTopicWithAssignments.is_metadata());
+        assert!(!Operation::CreateTopicWithAssignments.is_client_allowed());
         assert!(Operation::CreateStream.is_metadata());
         assert!(!Operation::CreateStream.is_partition());
+        assert!(Operation::CreateStream.is_client_allowed());
         assert!(Operation::SendMessages.is_partition());
         assert!(!Operation::SendMessages.is_metadata());
         assert!(Operation::DeleteSegments.is_partition());
diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs
index 1aa9eb828..8aa4c6432 100644
--- a/core/binary_protocol/src/dispatch.rs
+++ b/core/binary_protocol/src/dispatch.rs
@@ -265,7 +265,9 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta>
         Operation::SendMessages => 21,
         Operation::StoreConsumerOffset => 24,
         Operation::DeleteConsumerOffset => 25,
-        Operation::Reserved => return None,
+        Operation::CreateTopicWithAssignments
+        | Operation::CreatePartitionsWithAssignments
+        | Operation::Reserved => return None,
     };
     Some(&COMMAND_TABLE[idx])
 }
diff --git a/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
new file mode 100644
index 000000000..7ea85b5a9
--- /dev/null
+++ b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le};
+use crate::requests::partitions::{CreatePartitionsRequest, CreatedPartitionAssignment};
+use bytes::{BufMut, BytesMut};
+
+fn usize_to_u32(value: usize, context: &str) -> u32 {
+    u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreatePartitionsWithAssignmentsRequest {
+    pub request: CreatePartitionsRequest,
+    pub partitions: Vec<CreatedPartitionAssignment>,
+}
+
+impl WireEncode for CreatePartitionsWithAssignmentsRequest {
+    fn encoded_size(&self) -> usize {
+        4 + self.request.encoded_size()
+            + 4
+            + self
+                .partitions
+                .iter()
+                .map(WireEncode::encoded_size)
+                .sum::<usize>()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(usize_to_u32(
+            self.request.encoded_size(),
+            "create partitions request size",
+        ));
+        self.request.encode(buf);
+        buf.put_u32_le(usize_to_u32(
+            self.partitions.len(),
+            "create partitions partition count",
+        ));
+        for partition in &self.partitions {
+            partition.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for CreatePartitionsWithAssignmentsRequest {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let request_size = read_u32_le(buf, 0)? as usize;
+        let request_start = 4;
+        let request_end = request_start + request_size;
+        if buf.len() < request_end {
+            return Err(WireError::UnexpectedEof {
+                offset: request_start,
+                need: request_size,
+                have: buf.len().saturating_sub(request_start),
+            });
+        }
+
+        let request = CreatePartitionsRequest::decode_from(&buf[request_start..request_end])?;
+        let partitions_count = read_u32_le(buf, request_end)? as usize;
+        let mut offset = request_end + 4;
+        let mut partitions = Vec::with_capacity(partitions_count);
+        for _ in 0..partitions_count {
+            let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
+            offset += consumed;
+            partitions.push(partition);
+        }
+
+        Ok((
+            Self {
+                request,
+                partitions,
+            },
+            offset,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::CreatePartitionsWithAssignmentsRequest;
+    use crate::WireIdentifier;
+    use crate::codec::{WireDecode, WireEncode};
+    use crate::requests::partitions::{CreatePartitionsRequest, CreatedPartitionAssignment};
+
+    #[test]
+    fn roundtrip() {
+        let request = CreatePartitionsWithAssignmentsRequest {
+            request: CreatePartitionsRequest {
+                stream_id: WireIdentifier::numeric(1),
+                topic_id: WireIdentifier::numeric(2),
+                partitions_count: 2,
+            },
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 3,
+                    consensus_group_id: 11,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 4,
+                    consensus_group_id: 12,
+                },
+            ],
+        };
+        let bytes = request.to_bytes();
+        let (decoded, consumed) = CreatePartitionsWithAssignmentsRequest::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, request);
+    }
+}
diff --git a/core/binary_protocol/src/requests/partitions/mod.rs b/core/binary_protocol/src/requests/partitions/mod.rs
index 8a5d0a1a1..9b88355f5 100644
--- a/core/binary_protocol/src/requests/partitions/mod.rs
+++ b/core/binary_protocol/src/requests/partitions/mod.rs
@@ -16,7 +16,11 @@
 // under the License.
 
 pub mod create_partitions;
+pub mod create_partitions_with_assignments;
 pub mod delete_partitions;
+pub mod partition_assignment;
 
 pub use create_partitions::CreatePartitionsRequest;
+pub use create_partitions_with_assignments::CreatePartitionsWithAssignmentsRequest;
 pub use delete_partitions::DeletePartitionsRequest;
+pub use partition_assignment::CreatedPartitionAssignment;
diff --git a/core/binary_protocol/src/requests/partitions/partition_assignment.rs b/core/binary_protocol/src/requests/partitions/partition_assignment.rs
new file mode 100644
index 000000000..2c36732bd
--- /dev/null
+++ b/core/binary_protocol/src/requests/partitions/partition_assignment.rs
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le};
+use bytes::{BufMut, BytesMut};
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreatedPartitionAssignment {
+    pub partition_id: u32,
+    pub consensus_group_id: u64,
+}
+
+impl WireEncode for CreatedPartitionAssignment {
+    fn encoded_size(&self) -> usize {
+        12
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(self.partition_id);
+        buf.put_u64_le(self.consensus_group_id);
+    }
+}
+
+impl WireDecode for CreatedPartitionAssignment {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        if buf.len() < 12 {
+            return Err(WireError::UnexpectedEof {
+                offset: 0,
+                need: 12,
+                have: buf.len(),
+            });
+        }
+
+        Ok((
+            Self {
+                partition_id: read_u32_le(buf, 0)?,
+                consensus_group_id: read_u64_le(buf, 4)?,
+            },
+            12,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::CreatedPartitionAssignment;
+    use crate::codec::{WireDecode, WireEncode};
+
+    #[test]
+    fn roundtrip() {
+        let request = CreatedPartitionAssignment {
+            partition_id: 7,
+            consensus_group_id: 42,
+        };
+        let bytes = request.to_bytes();
+        let (decoded, consumed) = CreatedPartitionAssignment::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, request);
+    }
+}
diff --git a/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
new file mode 100644
index 000000000..d9d29c6eb
--- /dev/null
+++ b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs
@@ -0,0 +1,131 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::WireError;
+use crate::codec::{WireDecode, WireEncode, read_u32_le};
+use crate::requests::partitions::CreatedPartitionAssignment;
+use crate::requests::topics::CreateTopicRequest;
+use bytes::{BufMut, BytesMut};
+
+fn usize_to_u32(value: usize, context: &str) -> u32 {
+    u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32"))
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct CreateTopicWithAssignmentsRequest {
+    pub request: CreateTopicRequest,
+    pub partitions: Vec<CreatedPartitionAssignment>,
+}
+
+impl WireEncode for CreateTopicWithAssignmentsRequest {
+    fn encoded_size(&self) -> usize {
+        4 + self.request.encoded_size()
+            + 4
+            + self
+                .partitions
+                .iter()
+                .map(WireEncode::encoded_size)
+                .sum::<usize>()
+    }
+
+    fn encode(&self, buf: &mut BytesMut) {
+        buf.put_u32_le(usize_to_u32(
+            self.request.encoded_size(),
+            "create topic request size",
+        ));
+        self.request.encode(buf);
+        buf.put_u32_le(usize_to_u32(
+            self.partitions.len(),
+            "create topic partition count",
+        ));
+        for partition in &self.partitions {
+            partition.encode(buf);
+        }
+    }
+}
+
+impl WireDecode for CreateTopicWithAssignmentsRequest {
+    fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> {
+        let request_size = read_u32_le(buf, 0)? as usize;
+        let request_start = 4;
+        let request_end = request_start + request_size;
+        if buf.len() < request_end {
+            return Err(WireError::UnexpectedEof {
+                offset: request_start,
+                need: request_size,
+                have: buf.len().saturating_sub(request_start),
+            });
+        }
+
+        let request = CreateTopicRequest::decode_from(&buf[request_start..request_end])?;
+        let partitions_count = read_u32_le(buf, request_end)? as usize;
+        let mut offset = request_end + 4;
+        let mut partitions = Vec::with_capacity(partitions_count);
+        for _ in 0..partitions_count {
+            let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?;
+            offset += consumed;
+            partitions.push(partition);
+        }
+
+        Ok((
+            Self {
+                request,
+                partitions,
+            },
+            offset,
+        ))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::CreateTopicWithAssignmentsRequest;
+    use crate::WireIdentifier;
+    use crate::codec::{WireDecode, WireEncode};
+    use crate::primitives::identifier::WireName;
+    use crate::requests::partitions::CreatedPartitionAssignment;
+    use crate::requests::topics::CreateTopicRequest;
+
+    #[test]
+    fn roundtrip() {
+        let request = CreateTopicWithAssignmentsRequest {
+            request: CreateTopicRequest {
+                stream_id: WireIdentifier::numeric(1),
+                partitions_count: 2,
+                compression_algorithm: 1,
+                message_expiry: 3600,
+                max_topic_size: 1024,
+                replication_factor: 1,
+                name: WireName::new("events").unwrap(),
+            },
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 1,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 2,
+                },
+            ],
+        };
+        let bytes = request.to_bytes();
+        let (decoded, consumed) = CreateTopicWithAssignmentsRequest::decode(&bytes).unwrap();
+        assert_eq!(consumed, bytes.len());
+        assert_eq!(decoded, request);
+    }
+}
diff --git a/core/binary_protocol/src/requests/topics/mod.rs b/core/binary_protocol/src/requests/topics/mod.rs
index 680105ae3..a3e00a1d4 100644
--- a/core/binary_protocol/src/requests/topics/mod.rs
+++ b/core/binary_protocol/src/requests/topics/mod.rs
@@ -16,6 +16,7 @@
 // under the License.
 
 pub mod create_topic;
+pub mod create_topic_with_assignments;
 pub mod delete_topic;
 pub mod get_topic;
 pub mod get_topics;
@@ -23,6 +24,7 @@ pub mod purge_topic;
 pub mod update_topic;
 
 pub use create_topic::CreateTopicRequest;
+pub use create_topic_with_assignments::CreateTopicWithAssignmentsRequest;
 pub use delete_topic::DeleteTopicRequest;
 pub use get_topic::GetTopicRequest;
 pub use get_topics::GetTopicsRequest;
diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs
index 0ad1a02a7..5e0e45eff 100644
--- a/core/consensus/src/observability.rs
+++ b/core/consensus/src/observability.rs
@@ -580,6 +580,8 @@ pub const fn status_as_str(status: Status) -> &'static str {
 pub const fn operation_as_str(operation: Operation) -> &'static str {
     match operation {
         Operation::Reserved => "reserved",
+        Operation::CreateTopicWithAssignments => "create_topic_with_assignments",
+        Operation::CreatePartitionsWithAssignments => "create_partitions_with_assignments",
         Operation::CreateStream => "create_stream",
         Operation::UpdateStream => "update_stream",
         Operation::DeleteStream => "delete_stream",
diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml
index 2618b6153..ddedcb186 100644
--- a/core/metadata/Cargo.toml
+++ b/core/metadata/Cargo.toml
@@ -29,6 +29,7 @@ readme = "../../../README.md"
 
 [dependencies]
 ahash = { workspace = true }
+bytemuck = { workspace = true }
 bytes = { workspace = true }
 consensus = { workspace = true }
 iggy_binary_protocol = { workspace = true }
@@ -43,7 +44,6 @@ slab = { workspace = true }
 tracing = { workspace = true }
 
 [dev-dependencies]
-bytemuck = { workspace = true }
 bytes = { workspace = true }
 compio = { workspace = true }
 iobuf = { workspace = true }
diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs
index 484135f19..e3d792e90 100644
--- a/core/metadata/src/impls/metadata.rs
+++ b/core/metadata/src/impls/metadata.rs
@@ -14,8 +14,12 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-use crate::stm::StateMachine;
+use crate::MuxStateMachine;
+use crate::stm::consumer_group::ConsumerGroups;
 use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError};
+use crate::stm::stream::Streams;
+use crate::stm::user::Users;
+use crate::stm::{ConsensusGroupAllocator, StateMachine};
 use consensus::{
     CommitLogEvent, Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, PlaneKind, Project,
     ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight,
@@ -24,12 +28,22 @@ use consensus::{
     pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain,
     send_prepare_ok as send_prepare_ok_common,
 };
+use iggy_binary_protocol::requests::partitions::CreatePartitionsRequest as WireCreatePartitionsRequest;
+use iggy_binary_protocol::requests::partitions::{
+    CreatePartitionsWithAssignmentsRequest as PersistedCreatePartitionsRequest,
+    CreatedPartitionAssignment,
+};
+use iggy_binary_protocol::requests::topics::CreateTopicRequest as WireCreateTopicRequest;
+use iggy_binary_protocol::requests::topics::CreateTopicWithAssignmentsRequest as PersistedCreateTopicRequest;
 use iggy_binary_protocol::{
-    Command2, ConsensusHeader, GenericHeader, Message, PrepareHeader, PrepareOkHeader,
-    RequestHeader,
+    Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader,
+    RequestHeader, WireDecode, WireEncode,
 };
+use iggy_common::IggyError;
+use iggy_common::variadic;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
+use std::mem::size_of;
 use std::path::Path;
 use tracing::{debug, error, warn};
 
@@ -37,6 +51,17 @@ const fn freeze_client_reply(message: Message<GenericHeader>) -> Message<Generic
     message
 }
 
+pub trait StreamsFrontend {
+    #[must_use]
+    fn streams(&self) -> &Streams;
+}
+
+impl StreamsFrontend for MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)> {
+    fn streams(&self) -> &Streams {
+        &self.inner().1.0
+    }
+}
+
 #[derive(Debug, Clone)]
 #[allow(unused)]
 pub struct IggySnapshot {
@@ -245,13 +270,14 @@ pub struct IggyMetadata<C, J, S, M> {
     pub snapshot: Option<S>,
     /// State machine - lives on all shards
     pub mux_stm: M,
+    pub allocator: ConsensusGroupAllocator,
+    /// Snapshot coordinator - present when persistent checkpointing is configured.
     pub coordinator: Option<SnapshotCoordinator<M>>,
 }
 
 impl<C, J, S, M> IggyMetadata<C, J, S, M>
 where
-    M: FillSnapshot<MetadataSnapshot>,
+    M: StreamsFrontend + FillSnapshot<MetadataSnapshot>,
 {
     /// Create a new `IggyMetadata` instance.
     ///
@@ -265,13 +291,14 @@ where
         mux_stm: M,
         data_dir: Option<std::path::PathBuf>,
     ) -> Self {
-        let coordinator = data_dir
-            .map(|dir| SnapshotCoordinator::new(dir, |stm, seq| IggySnapshot::create(stm, seq)));
+        let allocator = ConsensusGroupAllocator::new(0);
+        let coordinator = data_dir.map(|dir| SnapshotCoordinator::new(dir, IggySnapshot::create));
         Self {
             consensus,
             journal,
             snapshot,
             mux_stm,
+            allocator,
             coordinator,
         }
     }
@@ -283,7 +310,8 @@ where
     B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>,
     J: JournalHandle,
     J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
-    M: StateMachine<
+    M: StreamsFrontend
+        + StateMachine<
             Input = Message<PrepareHeader>,
             Output = bytes::Bytes,
             Error = iggy_common::IggyError,
@@ -301,7 +329,19 @@ where
                 operation: message.header().operation,
             },
         );
-        let prepare = message.project(consensus);
+        let prepare = match self.prepare_request(message) {
+            Ok(prepare) => prepare,
+            Err(error) => {
+                warn!(
+                    target: "iggy.metadata.diag",
+                    plane = "metadata",
+                    replica_id = consensus.replica(),
+                    error = %error,
+                    "failed to transform metadata request into prepare"
+                );
+                return;
+            }
+        };
         pipeline_prepare_common(consensus, PlaneKind::Metadata, prepare, 
|prepare| {
             self.on_replicate(prepare)
         })
@@ -419,6 +459,7 @@ where
         }
     }
 
+    #[allow(clippy::too_many_lines)]
     async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareOkHeader>) {
         let consensus = self.consensus.as_ref().unwrap();
         let header = message.header();
@@ -493,7 +534,6 @@ where
                             prepare_header.op, prepare_header.checksum
                         )
                     });
-
-                let response = self.mux_stm.update(prepare).unwrap_or_else(|err| {
                     warn!(
                         target: "iggy.metadata.diag",
@@ -559,8 +599,95 @@ where
     P: Pipeline<Entry = PipelineEntry>,
     J: JournalHandle,
     J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>,
-    M: StateMachine<Input = Message<PrepareHeader>>,
+    M: StreamsFrontend
+        + StateMachine<
+            Input = Message<PrepareHeader>,
+            Output = bytes::Bytes,
+            Error = iggy_common::IggyError,
+        >,
 {
+    #[allow(clippy::too_many_lines)]
+    fn prepare_request(
+        &self,
+        message: Message<RequestHeader>,
+    ) -> Result<Message<PrepareHeader>, iggy_common::IggyError> {
+        let consensus = self.consensus.as_ref().unwrap();
+        let header = *message.header();
+        if !header.operation.is_client_allowed() {
+            return Err(IggyError::InvalidCommand);
+        }
+        let body = &message.as_slice()[size_of::<RequestHeader>()..header.size as usize];
+
+        match header.operation {
+            Operation::CreateTopic => {
+                let request = WireCreateTopicRequest::decode_from(body)
+                    .map_err(|_| IggyError::InvalidCommand)?;
+                let partitions = self
+                    .allocator
+                    .allocate_many(request.partitions_count as usize)
+                    .into_iter()
+                    .enumerate()
+                    .map(|(partition_id, consensus_group_id)| {
+                        Ok(CreatedPartitionAssignment {
+                            partition_id: u32::try_from(partition_id)
+                                .map_err(|_| IggyError::InvalidCommand)?,
+                            consensus_group_id,
+                        })
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                let body = PersistedCreateTopicRequest {
+                    request,
+                    partitions,
+                }
+                .to_bytes();
+                Ok(build_prepare_message(
+                    consensus,
+                    &header,
+                    Operation::CreateTopicWithAssignments,
+                    &body,
+                ))
+            }
+            Operation::CreatePartitions => {
+                let request = WireCreatePartitionsRequest::decode_from(body)
+                    .map_err(|_| IggyError::InvalidCommand)?;
+                let start_partition_id = self
+                    .mux_stm
+                    .streams()
+                    .current_partition_count(&request.stream_id, &request.topic_id)
+                    .ok_or(IggyError::InvalidCommand)?;
+                let partitions = self
+                    .allocator
+                    .allocate_many(request.partitions_count as usize)
+                    .into_iter()
+                    .enumerate()
+                    .map(|(offset, consensus_group_id)| {
+                        let offset =
+                            u32::try_from(offset).map_err(|_| IggyError::InvalidCommand)?;
+                        let partition_id = start_partition_id
+                            .checked_add(offset)
+                            .ok_or(IggyError::InvalidCommand)?;
+                        Ok(CreatedPartitionAssignment {
+                            partition_id,
+                            consensus_group_id,
+                        })
+                    })
+                    .collect::<Result<Vec<_>, _>>()?;
+                let body = PersistedCreatePartitionsRequest {
+                    request,
+                    partitions,
+                }
+                .to_bytes();
+                Ok(build_prepare_message(
+                    consensus,
+                    &header,
+                    Operation::CreatePartitionsWithAssignments,
+                    &body,
+                ))
+            }
+            _ => Ok(message.project(consensus)),
+        }
+    }
+
     /// Replicate a prepare message to the next replica in the chain.
     ///
     /// Chain replication pattern:
@@ -603,3 +730,44 @@ where
         send_prepare_ok_common(consensus, header, Some(persisted)).await;
     }
 }
+
+fn build_prepare_message<B, P>(
+    consensus: &VsrConsensus<B, P>,
+    request: &RequestHeader,
+    operation: Operation,
+    body: &[u8],
+) -> Message<PrepareHeader>
+where
+    B: MessageBus,
+    P: Pipeline<Entry = PipelineEntry>,
+{
+    let op = consensus.sequencer().current_sequence() + 1;
+    let size = size_of::<PrepareHeader>() + body.len();
+    let mut prepare = Message::<PrepareHeader>::new(size);
+    let prepare_bytes = prepare.as_mut_slice();
+    prepare_bytes[size_of::<PrepareHeader>()..size].copy_from_slice(body);
+
+    let header_bytes = &mut prepare_bytes[..size_of::<PrepareHeader>()];
+    let new_header = bytemuck::checked::try_from_bytes_mut::<PrepareHeader>(header_bytes)
+        .expect("prepare header bytes should be valid");
+    *new_header = PrepareHeader {
+        cluster: consensus.cluster(),
+        size: u32::try_from(size).expect("prepare message size exceeds u32"),
+        view: consensus.view(),
+        release: request.release,
+        command: Command2::Prepare,
+        replica: consensus.replica(),
+        client: request.client,
+        parent: consensus.last_prepare_checksum(),
+        request_checksum: request.request_checksum,
+        request: request.request,
+        commit: consensus.commit(),
+        op,
+        timestamp: 0,
+        operation,
+        namespace: request.namespace,
+        ..Default::default()
+    };
+
+    prepare
+}
diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs
index e3d8611f4..cdb6f365b 100644
--- a/core/metadata/src/stm/mod.rs
+++ b/core/metadata/src/stm/mod.rs
@@ -24,6 +24,7 @@ pub mod user;
 use bytes::Bytes;
 use iggy_common::Either;
 use left_right::{Absorb, ReadHandle, WriteHandle};
+use std::cell::Cell;
 use std::cell::UnsafeCell;
 use std::sync::Arc;
 
@@ -159,6 +160,40 @@ pub trait StateMachine {
     fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error>;
 }
 
+#[derive(Debug)]
+pub struct ConsensusGroupAllocator {
+    highest: Cell<u64>,
+}
+
+impl ConsensusGroupAllocator {
+    #[must_use]
+    pub const fn new(initial_highest: u64) -> Self {
+        Self {
+            highest: Cell::new(initial_highest),
+        }
+    }
+
+    #[must_use]
+    pub const fn highest(&self) -> u64 {
+        self.highest.get()
+    }
+
+    #[must_use]
+    ///
+    /// # Panics
+    /// Panics if allocating `count` more group IDs would overflow `u64`.
+    pub fn allocate_many(&self, count: usize) -> Vec<u64> {
+        let mut allocated = Vec::with_capacity(count);
+        let mut current = self.highest.get();
+        for _ in 0..count {
+            current = current.checked_add(1).expect("consensus group id overflow");
+            allocated.push(current);
+        }
+        self.highest.set(current);
+        allocated
+    }
+}
+
 /// Generates the state's inner struct and wrapper type.
 ///
 /// # Generated items
diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs
index 89a545297..8ecea2079 100644
--- a/core/metadata/src/stm/mux.rs
+++ b/core/metadata/src/stm/mux.rs
@@ -39,6 +39,11 @@ where
     pub const fn new(inner: T) -> Self {
         Self { inner }
     }
+
+    #[must_use]
+    pub const fn inner(&self) -> &T {
+        &self.inner
+    }
 }
 
 impl<T> StateMachine for MuxStateMachine<T>
diff --git a/core/metadata/src/stm/snapshot.rs b/core/metadata/src/stm/snapshot.rs
index b3f2ffe7c..3e9681de0 100644
--- a/core/metadata/src/stm/snapshot.rs
+++ b/core/metadata/src/stm/snapshot.rs
@@ -296,8 +296,8 @@ macro_rules! impl_fill_restore {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::stm::stream::{StatsSnapshot, StreamSnapshot};
-    use iggy_common::IggyTimestamp;
+    use crate::stm::stream::{PartitionSnapshot, StatsSnapshot, StreamSnapshot, TopicSnapshot};
+    use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize};
 
     #[test]
     fn test_metadata_snapshot_roundtrip() {
@@ -329,7 +329,29 @@ mod tests {
                         messages_count: 50,
                         segments_count: 2,
                     },
-                    topics: vec![],
+                    topics: vec![(
+                        0,
+                        TopicSnapshot {
+                            id: 0,
+                            name: "topic".to_string(),
+                            created_at: ts,
+                            replication_factor: 1,
+                            message_expiry: IggyExpiry::default(),
+                            compression_algorithm: CompressionAlgorithm::default(),
+                            max_topic_size: MaxTopicSize::default(),
+                            stats: StatsSnapshot {
+                                size_bytes: 256,
+                                messages_count: 12,
+                                segments_count: 1,
+                            },
+                            partitions: vec![PartitionSnapshot {
+                                id: 0,
+                                consensus_group_id: 33,
+                                created_at: ts,
+                            }],
+                            round_robin_counter: 0,
+                        },
+                    )],
                 },
             )],
         });
@@ -351,7 +373,10 @@ mod tests {
         assert_eq!(stream.stats.size_bytes, 1024);
         assert_eq!(stream.stats.messages_count, 50);
         assert_eq!(stream.stats.segments_count, 2);
-        assert_eq!(stream.topics.len(), 0);
+        assert_eq!(stream.topics.len(), 1);
+        let (_, topic) = &stream.topics[0];
+        assert_eq!(topic.partitions.len(), 1);
+        assert_eq!(topic.partitions[0].consensus_group_id, 33);
     }
 
     #[test]
diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs
index dfb7e9e77..8839c0bd3 100644
--- a/core/metadata/src/stm/stream.rs
+++ b/core/metadata/src/stm/stream.rs
@@ -23,13 +23,13 @@ use ahash::AHashMap;
 use bytes::Bytes;
 use iggy_binary_protocol::WireIdentifier;
 use iggy_binary_protocol::requests::partitions::{
-    CreatePartitionsRequest, DeletePartitionsRequest,
+    CreatePartitionsWithAssignmentsRequest, DeletePartitionsRequest,
 };
 use iggy_binary_protocol::requests::streams::{
     CreateStreamRequest, DeleteStreamRequest, PurgeStreamRequest, UpdateStreamRequest,
 };
 use iggy_binary_protocol::requests::topics::{
-    CreateTopicRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest,
+    CreateTopicWithAssignmentsRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest,
 };
 use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize};
 use serde::{Deserialize, Serialize};
@@ -41,19 +41,25 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct PartitionSnapshot {
     pub id: usize,
+    pub consensus_group_id: u64,
     pub created_at: IggyTimestamp,
 }
 
 #[derive(Debug, Clone)]
 pub struct Partition {
     pub id: usize,
+    pub consensus_group_id: u64,
     pub created_at: IggyTimestamp,
 }
 
 impl Partition {
     #[must_use]
-    pub const fn new(id: usize, created_at: IggyTimestamp) -> Self {
-        Self { id, created_at }
+    pub const fn new(id: usize, consensus_group_id: u64, created_at: IggyTimestamp) -> Self {
+        Self {
+            id,
+            consensus_group_id,
+            created_at,
+        }
     }
 }
 
@@ -223,11 +229,11 @@ collect_handlers! {
         UpdateStream,
         DeleteStream,
         PurgeStream,
-        CreateTopic,
+        CreateTopicWithAssignments,
         UpdateTopic,
         DeleteTopic,
         PurgeTopic,
-        CreatePartitions,
+        CreatePartitionsWithAssignments,
         DeletePartitions,
     }
 }
@@ -263,6 +269,31 @@ impl StreamsInner {
     }
 }
 
+impl Streams {
+    #[must_use]
+    pub fn current_partition_count(
+        &self,
+        stream_id: &WireIdentifier,
+        topic_id: &WireIdentifier,
+    ) -> Option<u32> {
+        self.inner.read(|inner| {
+            let stream_id = inner.resolve_stream_id(stream_id)?;
+            let topic_id = inner.resolve_topic_id(stream_id, topic_id)?;
+            let stream = inner.items.get(stream_id)?;
+            let topic = stream.topics.get(topic_id)?;
+            let next_partition_id = topic
+                .partitions
+                .iter()
+                .map(|partition| partition.id)
+                .max()
+                .and_then(|partition_id| partition_id.checked_add(1))
+                .and_then(|partition_id| u32::try_from(partition_id).ok())
+                .unwrap_or(0);
+            Some(next_partition_id)
+        })
+    }
+}
+
 // TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of empty Bytes.
 impl StateHandler for CreateStreamRequest {
     type State = StreamsInner;
@@ -340,25 +371,25 @@ impl StateHandler for PurgeStreamRequest {
 }
 
 // TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of empty Bytes.
-impl StateHandler for CreateTopicRequest {
+impl StateHandler for CreateTopicWithAssignmentsRequest {
     type State = StreamsInner;
     fn apply(&self, state: &mut StreamsInner) -> Bytes {
-        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+        let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) else {
             return Bytes::new();
         };
         let Some(stream) = state.items.get_mut(stream_id) else {
             return Bytes::new();
         };
 
-        let name_arc: Arc<str> = Arc::from(self.name.as_str());
+        let name_arc: Arc<str> = Arc::from(self.request.name.as_str());
         if stream.topic_index.contains_key(&name_arc) {
             return Bytes::new();
         }
 
-        let replication_factor = if self.replication_factor == 0 {
+        let replication_factor = if self.request.replication_factor == 0 {
             1
         } else {
-            self.replication_factor
+            self.request.replication_factor
         };
 
         let topic = Topic {
@@ -366,10 +397,12 @@ impl StateHandler for CreateTopicRequest {
             name: name_arc.clone(),
             created_at: IggyTimestamp::now(),
             replication_factor,
-            message_expiry: IggyExpiry::from(self.message_expiry),
-            compression_algorithm: CompressionAlgorithm::from_code(self.compression_algorithm)
-                .unwrap_or_default(),
-            max_topic_size: MaxTopicSize::from(self.max_topic_size),
+            message_expiry: IggyExpiry::from(self.request.message_expiry),
+            compression_algorithm: CompressionAlgorithm::from_code(
+                self.request.compression_algorithm,
+            )
+            .unwrap_or_default(),
+            max_topic_size: MaxTopicSize::from(self.request.max_topic_size),
             stats: Arc::new(TopicStats::new(stream.stats.clone())),
             partitions: Vec::new(),
             round_robin_counter: Arc::new(AtomicUsize::new(0)),
@@ -379,9 +412,10 @@ impl StateHandler for CreateTopicRequest {
         if let Some(topic) = stream.topics.get_mut(topic_id) {
             topic.id = topic_id;
 
-            for partition_id in 0..self.partitions_count as usize {
+            for partition in &self.partitions {
                 let partition = Partition {
-                    id: partition_id,
+                    id: partition.partition_id as usize,
+                    consensus_group_id: partition.consensus_group_id,
                     created_at: IggyTimestamp::now(),
                 };
                 topic.partitions.push(partition);
@@ -462,13 +496,13 @@ impl StateHandler for PurgeTopicRequest {
 }
 
 // TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead of empty Bytes.
-impl StateHandler for CreatePartitionsRequest {
+impl StateHandler for CreatePartitionsWithAssignmentsRequest {
     type State = StreamsInner;
     fn apply(&self, state: &mut StreamsInner) -> Bytes {
-        let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else {
+        let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) else {
             return Bytes::new();
         };
-        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else {
+        let Some(topic_id) = state.resolve_topic_id(stream_id, &self.request.topic_id) else {
             return Bytes::new();
         };
 
@@ -479,11 +513,10 @@ impl StateHandler for CreatePartitionsRequest {
             return Bytes::new();
         };
 
-        let current_partition_count = topic.partitions.len();
-        for i in 0..self.partitions_count as usize {
-            let partition_id = current_partition_count + i;
+        for partition in &self.partitions {
             let partition = Partition {
-                id: partition_id,
+                id: partition.partition_id as usize,
+                consensus_group_id: partition.consensus_group_id,
                 created_at: IggyTimestamp::now(),
             };
             topic.partitions.push(partition);
@@ -561,6 +594,7 @@ impl Snapshotable for Streams {
                                         .iter()
                                         .map(|p| PartitionSnapshot {
                                             id: p.id,
+                                            consensus_group_id: p.consensus_group_id,
                                             created_at: p.created_at,
                                         })
                                         .collect(),
@@ -630,6 +664,7 @@ impl Snapshotable for Streams {
                         .into_iter()
                         .map(|p| Partition {
                             id: p.id,
+                            consensus_group_id: p.consensus_group_id,
                             created_at: p.created_at,
                         })
                         .collect(),
@@ -666,3 +701,115 @@ impl Snapshotable for Streams {
 }
 
 impl_fill_restore!(Streams, streams);
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use iggy_binary_protocol::WireName;
+    use iggy_binary_protocol::requests::partitions::{
+        CreatePartitionsRequest as WireCreatePartitionsRequest,
+        CreatePartitionsWithAssignmentsRequest, CreatedPartitionAssignment,
+    };
+    use iggy_binary_protocol::requests::topics::{
+        CreateTopicRequest as WireCreateTopicRequest, CreateTopicWithAssignmentsRequest,
+    };
+
+    fn create_stream(inner: &mut StreamsInner, name: &str) {
+        let request = CreateStreamRequest {
+            name: WireName::new(name).unwrap(),
+        };
+        let _ = StateHandler::apply(&request, inner);
+    }
+
+    fn make_topic_request(
+        stream_id: u32,
+        partitions_count: u32,
+        name: &str,
+    ) -> WireCreateTopicRequest {
+        WireCreateTopicRequest {
+            stream_id: WireIdentifier::numeric(stream_id),
+            partitions_count,
+            compression_algorithm: 0,
+            message_expiry: 0,
+            max_topic_size: 0,
+            replication_factor: 1,
+            name: WireName::new(name).unwrap(),
+        }
+    }
+
+    #[test]
+    fn current_partition_count_scans_existing_topic_state() {
+        let mut inner = StreamsInner::new();
+        create_stream(&mut inner, "stream");
+        let create_topic = CreateTopicWithAssignmentsRequest {
+            request: make_topic_request(0, 2, "topic"),
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 1,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 2,
+                },
+            ],
+        };
+        let _ = StateHandler::apply(&create_topic, &mut inner);
+        let streams: Streams = inner.into();
+
+        assert_eq!(
+            streams
+                .current_partition_count(&WireIdentifier::numeric(0), &WireIdentifier::numeric(0)),
+            Some(2)
+        );
+    }
+
+    #[test]
+    fn applying_enriched_create_commands_stores_consensus_group_ids() {
+        let mut inner = StreamsInner::new();
+        create_stream(&mut inner, "stream");
+        let create_topic = CreateTopicWithAssignmentsRequest {
+            request: make_topic_request(0, 2, "topic"),
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 0,
+                    consensus_group_id: 10,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 1,
+                    consensus_group_id: 11,
+                },
+            ],
+        };
+        let _ = StateHandler::apply(&create_topic, &mut inner);
+
+        let create_partitions = CreatePartitionsWithAssignmentsRequest {
+            request: WireCreatePartitionsRequest {
+                stream_id: WireIdentifier::numeric(0),
+                topic_id: WireIdentifier::numeric(0),
+                partitions_count: 2,
+            },
+            partitions: vec![
+                CreatedPartitionAssignment {
+                    partition_id: 2,
+                    consensus_group_id: 12,
+                },
+                CreatedPartitionAssignment {
+                    partition_id: 3,
+                    consensus_group_id: 13,
+                },
+            ],
+        };
+        let _ = StateHandler::apply(&create_partitions, &mut inner);
+
+        assert_eq!(inner.items[0].topics[0].partitions.len(), 4);
+        assert_eq!(
+            inner.items[0].topics[0].partitions[0].consensus_group_id,
+            10
+        );
+        assert_eq!(
+            inner.items[0].topics[0].partitions[3].consensus_group_id,
+            13
+        );
+    }
+}
diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs
index acb8655a3..a1ffb21c2 100644
--- a/core/shard/src/lib.rs
+++ b/core/shard/src/lib.rs
@@ -27,6 +27,7 @@ use iggy_common::{PartitionStats, sharding::IggyNamespace};
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
 use metadata::IggyMetadata;
+use metadata::impls::metadata::StreamsFrontend;
 use metadata::stm::StateMachine;
 use partitions::{IggyPartition, IggyPartitions};
 use shards_table::ShardsTable;
@@ -236,7 +237,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         match MessageBag::try_from(message) {
             Ok(MessageBag::Request(request)) => self.on_request(request).await,
@@ -262,7 +263,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.plane.on_request(request).await;
     }
@@ -281,7 +282,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.plane.on_replicate(prepare).await;
     }
@@ -300,7 +301,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.plane.on_ack(prepare_ok).await;
     }
@@ -330,7 +331,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         debug_assert!(buf.is_empty(), "buf must be empty on entry");
 
diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs
index 8965a6a05..16dbbe8ff 100644
--- a/core/shard/src/router.rs
+++ b/core/shard/src/router.rs
@@ -22,6 +22,7 @@ use iggy_binary_protocol::{ConsensusError, GenericHeader, Message, MessageBag, P
 use iggy_common::sharding::IggyNamespace;
 use journal::{Journal, JournalHandle};
 use message_bus::MessageBus;
+use metadata::impls::metadata::StreamsFrontend;
 use metadata::stm::StateMachine;
 
 /// Inter-shard dispatch logic.
@@ -154,7 +155,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         loop {
             futures::select! {
@@ -188,7 +189,7 @@ where
                 Input = Message<PrepareHeader>,
                 Output = bytes::Bytes,
                 Error = iggy_common::IggyError,
-            >,
+            > + StreamsFrontend,
     {
         self.on_message(frame.message).await;
         // TODO: once on_message returns an R (e.g. ShardResponse), send it

Reply via email to