This is an automated email from the ASF dual-hosted git repository. gkoszyk pushed a commit to branch consensus_groups in repository https://gitbox.apache.org/repos/asf/iggy.git
commit f1338b6338140c524cad2a49df9a6e6d937068d6 Author: numinex <[email protected]> AuthorDate: Wed Apr 15 21:46:18 2026 +0200 consensus group --- core/binary_protocol/src/consensus/operation.rs | 50 +++++- core/binary_protocol/src/dispatch.rs | 4 +- .../create_partitions_with_assignments.rs | 124 +++++++++++++ .../binary_protocol/src/requests/partitions/mod.rs | 4 + .../requests/partitions/partition_assignment.rs | 75 ++++++++ .../topics/create_topic_with_assignments.rs | 131 ++++++++++++++ core/binary_protocol/src/requests/topics/mod.rs | 2 + core/consensus/src/observability.rs | 2 + core/metadata/Cargo.toml | 2 +- core/metadata/src/impls/metadata.rs | 188 ++++++++++++++++++-- core/metadata/src/stm/mod.rs | 35 ++++ core/metadata/src/stm/mux.rs | 5 + core/metadata/src/stm/snapshot.rs | 33 +++- core/metadata/src/stm/stream.rs | 195 ++++++++++++++++++--- core/shard/src/lib.rs | 11 +- core/shard/src/router.rs | 5 +- 16 files changed, 811 insertions(+), 55 deletions(-) diff --git a/core/binary_protocol/src/consensus/operation.rs b/core/binary_protocol/src/consensus/operation.rs index d275269ae..97696df64 100644 --- a/core/binary_protocol/src/consensus/operation.rs +++ b/core/binary_protocol/src/consensus/operation.rs @@ -25,6 +25,10 @@ pub enum Operation { #[default] Reserved = 0, + // Internal metadata operations (journal / replica-only) + CreateTopicWithAssignments = 64, + CreatePartitionsWithAssignments = 65, + // Metadata operations (shard 0) CreateStream = 128, UpdateStream = 129, @@ -57,10 +61,25 @@ pub enum Operation { } impl Operation { + pub const INTERNAL_START: u8 = Self::CreateTopicWithAssignments as u8; + pub const METADATA_START: u8 = Self::CreateStream as u8; + pub const PARTITION_START: u8 = Self::SendMessages as u8; + + /// Internal-only operations reserved for replica / journal use. + #[must_use] + #[inline] + pub const fn is_internal(&self) -> bool { + (*self as u8) >= Self::INTERNAL_START && (*self as u8) < Self::METADATA_START + } + /// Metadata / control-plane operations handled by shard 0. #[must_use] #[inline] pub const fn is_metadata(&self) -> bool { + if self.is_internal() { + return true; + } + matches!( self, Self::CreateStream @@ -89,13 +108,14 @@ impl Operation { #[must_use] #[inline] pub const fn is_partition(&self) -> bool { - matches!( - self, - Self::SendMessages - | Self::StoreConsumerOffset - | Self::DeleteConsumerOffset - | Self::DeleteSegments - ) + matches!(self, Self::DeleteSegments) || (*self as u8) >= Self::PARTITION_START + } + + /// Operations clients are allowed to send directly. + #[must_use] + #[inline] + pub const fn is_client_allowed(&self) -> bool { + !matches!(self, Self::Reserved) && !self.is_internal() } /// Bidirectional mapping: `Operation` -> client command code. @@ -104,7 +124,9 @@ impl Operation { #[must_use] pub const fn to_command_code(&self) -> Option<u32> { match self { - Self::Reserved => None, + Self::Reserved + | Self::CreateTopicWithAssignments + | Self::CreatePartitionsWithAssignments => None, Self::CreateStream | Self::UpdateStream | Self::DeleteStream @@ -190,6 +212,14 @@ mod tests { #[test] fn reserved_has_no_code() { assert_eq!(Operation::Reserved.to_command_code(), None); + assert_eq!( + Operation::CreateTopicWithAssignments.to_command_code(), + None + ); + assert_eq!( + Operation::CreatePartitionsWithAssignments.to_command_code(), + None + ); } #[test] @@ -203,8 +233,12 @@ mod tests { #[test] fn metadata_vs_partition() { + assert!(Operation::CreateTopicWithAssignments.is_internal()); + assert!(Operation::CreateTopicWithAssignments.is_metadata()); + assert!(!Operation::CreateTopicWithAssignments.is_client_allowed()); assert!(Operation::CreateStream.is_metadata()); assert!(!Operation::CreateStream.is_partition()); + assert!(Operation::CreateStream.is_client_allowed()); assert!(Operation::SendMessages.is_partition()); assert!(!Operation::SendMessages.is_metadata()); assert!(Operation::DeleteSegments.is_partition()); diff --git a/core/binary_protocol/src/dispatch.rs b/core/binary_protocol/src/dispatch.rs index 1aa9eb828..8aa4c6432 100644 --- a/core/binary_protocol/src/dispatch.rs +++ b/core/binary_protocol/src/dispatch.rs @@ -265,7 +265,9 @@ pub const fn lookup_by_operation(op: Operation) -> Option<&'static CommandMeta> Operation::SendMessages => 21, Operation::StoreConsumerOffset => 24, Operation::DeleteConsumerOffset => 25, - Operation::Reserved => return None, + Operation::CreateTopicWithAssignments + | Operation::CreatePartitionsWithAssignments + | Operation::Reserved => return None, }; Some(&COMMAND_TABLE[idx]) } diff --git a/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs new file mode 100644 index 000000000..7ea85b5a9 --- /dev/null +++ b/core/binary_protocol/src/requests/partitions/create_partitions_with_assignments.rs @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_u32_le}; +use crate::requests::partitions::{CreatePartitionsRequest, CreatedPartitionAssignment}; +use bytes::{BufMut, BytesMut}; + +fn usize_to_u32(value: usize, context: &str) -> u32 { + u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32")) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreatePartitionsWithAssignmentsRequest { + pub request: CreatePartitionsRequest, + pub partitions: Vec<CreatedPartitionAssignment>, +} + +impl WireEncode for CreatePartitionsWithAssignmentsRequest { + fn encoded_size(&self) -> usize { + 4 + self.request.encoded_size() + + 4 + + self + .partitions + .iter() + .map(WireEncode::encoded_size) + .sum::<usize>() + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u32_le(usize_to_u32( + self.request.encoded_size(), + "create partitions request size", + )); + self.request.encode(buf); + buf.put_u32_le(usize_to_u32( + self.partitions.len(), + "create partitions partition count", + )); + for partition in &self.partitions { + partition.encode(buf); + } + } +} + +impl WireDecode for CreatePartitionsWithAssignmentsRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let request_size = read_u32_le(buf, 0)? as usize; + let request_start = 4; + let request_end = request_start + request_size; + if buf.len() < request_end { + return Err(WireError::UnexpectedEof { + offset: request_start, + need: request_size, + have: buf.len().saturating_sub(request_start), + }); + } + + let request = CreatePartitionsRequest::decode_from(&buf[request_start..request_end])?; + let partitions_count = read_u32_le(buf, request_end)? as usize; + let mut offset = request_end + 4; + let mut partitions = Vec::with_capacity(partitions_count); + for _ in 0..partitions_count { + let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?; + offset += consumed; + partitions.push(partition); + } + + Ok(( + Self { + request, + partitions, + }, + offset, + )) + } +} + +#[cfg(test)] +mod tests { + use super::CreatePartitionsWithAssignmentsRequest; + use crate::WireIdentifier; + use crate::codec::{WireDecode, WireEncode}; + use crate::requests::partitions::{CreatePartitionsRequest, CreatedPartitionAssignment}; + + #[test] + fn roundtrip() { + let request = CreatePartitionsWithAssignmentsRequest { + request: CreatePartitionsRequest { + stream_id: WireIdentifier::numeric(1), + topic_id: WireIdentifier::numeric(2), + partitions_count: 2, + }, + partitions: vec![ + CreatedPartitionAssignment { + partition_id: 3, + consensus_group_id: 11, + }, + CreatedPartitionAssignment { + partition_id: 4, + consensus_group_id: 12, + }, + ], + }; + let bytes = request.to_bytes(); + let (decoded, consumed) = CreatePartitionsWithAssignmentsRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, request); + } +} diff --git a/core/binary_protocol/src/requests/partitions/mod.rs b/core/binary_protocol/src/requests/partitions/mod.rs index 8a5d0a1a1..9b88355f5 100644 --- a/core/binary_protocol/src/requests/partitions/mod.rs +++ b/core/binary_protocol/src/requests/partitions/mod.rs @@ -16,7 +16,11 @@ // under the License. pub mod create_partitions; +pub mod create_partitions_with_assignments; pub mod delete_partitions; +pub mod partition_assignment; pub use create_partitions::CreatePartitionsRequest; +pub use create_partitions_with_assignments::CreatePartitionsWithAssignmentsRequest; pub use delete_partitions::DeletePartitionsRequest; +pub use partition_assignment::CreatedPartitionAssignment; diff --git a/core/binary_protocol/src/requests/partitions/partition_assignment.rs b/core/binary_protocol/src/requests/partitions/partition_assignment.rs new file mode 100644 index 000000000..2c36732bd --- /dev/null +++ b/core/binary_protocol/src/requests/partitions/partition_assignment.rs @@ -0,0 +1,75 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_u32_le, read_u64_le}; +use bytes::{BufMut, BytesMut}; + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreatedPartitionAssignment { + pub partition_id: u32, + pub consensus_group_id: u64, +} + +impl WireEncode for CreatedPartitionAssignment { + fn encoded_size(&self) -> usize { + 12 + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u32_le(self.partition_id); + buf.put_u64_le(self.consensus_group_id); + } +} + +impl WireDecode for CreatedPartitionAssignment { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + if buf.len() < 12 { + return Err(WireError::UnexpectedEof { + offset: 0, + need: 12, + have: buf.len(), + }); + } + + Ok(( + Self { + partition_id: read_u32_le(buf, 0)?, + consensus_group_id: read_u64_le(buf, 4)?, + }, + 12, + )) + } +} + +#[cfg(test)] +mod tests { + use super::CreatedPartitionAssignment; + use crate::codec::{WireDecode, WireEncode}; + + #[test] + fn roundtrip() { + let request = CreatedPartitionAssignment { + partition_id: 7, + consensus_group_id: 42, + }; + let bytes = request.to_bytes(); + let (decoded, consumed) = CreatedPartitionAssignment::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, request); + } +} diff --git a/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs new file mode 100644 index 000000000..d9d29c6eb --- /dev/null +++ b/core/binary_protocol/src/requests/topics/create_topic_with_assignments.rs @@ -0,0 +1,131 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::WireError; +use crate::codec::{WireDecode, WireEncode, read_u32_le}; +use crate::requests::partitions::CreatedPartitionAssignment; +use crate::requests::topics::CreateTopicRequest; +use bytes::{BufMut, BytesMut}; + +fn usize_to_u32(value: usize, context: &str) -> u32 { + u32::try_from(value).unwrap_or_else(|_| panic!("{context} exceeds u32")) +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CreateTopicWithAssignmentsRequest { + pub request: CreateTopicRequest, + pub partitions: Vec<CreatedPartitionAssignment>, +} + +impl WireEncode for CreateTopicWithAssignmentsRequest { + fn encoded_size(&self) -> usize { + 4 + self.request.encoded_size() + + 4 + + self + .partitions + .iter() + .map(WireEncode::encoded_size) + .sum::<usize>() + } + + fn encode(&self, buf: &mut BytesMut) { + buf.put_u32_le(usize_to_u32( + self.request.encoded_size(), + "create topic request size", + )); + self.request.encode(buf); + buf.put_u32_le(usize_to_u32( + self.partitions.len(), + "create topic partition count", + )); + for partition in &self.partitions { + partition.encode(buf); + } + } +} + +impl WireDecode for CreateTopicWithAssignmentsRequest { + fn decode(buf: &[u8]) -> Result<(Self, usize), WireError> { + let request_size = read_u32_le(buf, 0)? as usize; + let request_start = 4; + let request_end = request_start + request_size; + if buf.len() < request_end { + return Err(WireError::UnexpectedEof { + offset: request_start, + need: request_size, + have: buf.len().saturating_sub(request_start), + }); + } + + let request = CreateTopicRequest::decode_from(&buf[request_start..request_end])?; + let partitions_count = read_u32_le(buf, request_end)? as usize; + let mut offset = request_end + 4; + let mut partitions = Vec::with_capacity(partitions_count); + for _ in 0..partitions_count { + let (partition, consumed) = CreatedPartitionAssignment::decode(&buf[offset..])?; + offset += consumed; + partitions.push(partition); + } + + Ok(( + Self { + request, + partitions, + }, + offset, + )) + } +} + +#[cfg(test)] +mod tests { + use super::CreateTopicWithAssignmentsRequest; + use crate::WireIdentifier; + use crate::codec::{WireDecode, WireEncode}; + use crate::primitives::identifier::WireName; + use crate::requests::partitions::CreatedPartitionAssignment; + use crate::requests::topics::CreateTopicRequest; + + #[test] + fn roundtrip() { + let request = CreateTopicWithAssignmentsRequest { + request: CreateTopicRequest { + stream_id: WireIdentifier::numeric(1), + partitions_count: 2, + compression_algorithm: 1, + message_expiry: 3600, + max_topic_size: 1024, + replication_factor: 1, + name: WireName::new("events").unwrap(), + }, + partitions: vec![ + CreatedPartitionAssignment { + partition_id: 0, + consensus_group_id: 1, + }, + CreatedPartitionAssignment { + partition_id: 1, + consensus_group_id: 2, + }, + ], + }; + let bytes = request.to_bytes(); + let (decoded, consumed) = CreateTopicWithAssignmentsRequest::decode(&bytes).unwrap(); + assert_eq!(consumed, bytes.len()); + assert_eq!(decoded, request); + } +} diff --git a/core/binary_protocol/src/requests/topics/mod.rs b/core/binary_protocol/src/requests/topics/mod.rs index 680105ae3..a3e00a1d4 100644 --- a/core/binary_protocol/src/requests/topics/mod.rs +++ b/core/binary_protocol/src/requests/topics/mod.rs @@ -16,6 +16,7 @@ // under the License. pub mod create_topic; +pub mod create_topic_with_assignments; pub mod delete_topic; pub mod get_topic; pub mod get_topics; @@ -23,6 +24,7 @@ pub mod purge_topic; pub mod update_topic; pub use create_topic::CreateTopicRequest; +pub use create_topic_with_assignments::CreateTopicWithAssignmentsRequest; pub use delete_topic::DeleteTopicRequest; pub use get_topic::GetTopicRequest; pub use get_topics::GetTopicsRequest; diff --git a/core/consensus/src/observability.rs b/core/consensus/src/observability.rs index 0ad1a02a7..5e0e45eff 100644 --- a/core/consensus/src/observability.rs +++ b/core/consensus/src/observability.rs @@ -580,6 +580,8 @@ pub const fn status_as_str(status: Status) -> &'static str { pub const fn operation_as_str(operation: Operation) -> &'static str { match operation { Operation::Reserved => "reserved", + Operation::CreateTopicWithAssignments => "create_topic_with_assignments", + Operation::CreatePartitionsWithAssignments => "create_partitions_with_assignments", Operation::CreateStream => "create_stream", Operation::UpdateStream => "update_stream", Operation::DeleteStream => "delete_stream", diff --git a/core/metadata/Cargo.toml b/core/metadata/Cargo.toml index 2618b6153..ddedcb186 100644 --- a/core/metadata/Cargo.toml +++ b/core/metadata/Cargo.toml @@ -29,6 +29,7 @@ readme = "../../../README.md" [dependencies] ahash = { workspace = true } +bytemuck = { workspace = true } bytes = { workspace = true } consensus = { workspace = true } iggy_binary_protocol = { workspace = true } @@ -43,7 +44,6 @@ slab = { workspace = true } tracing = { workspace = true } [dev-dependencies] -bytemuck = { workspace = true } bytes = { workspace = true } compio = { workspace = true } iobuf = { workspace = true } diff --git a/core/metadata/src/impls/metadata.rs b/core/metadata/src/impls/metadata.rs index 484135f19..e3d792e90 100644 --- a/core/metadata/src/impls/metadata.rs +++ b/core/metadata/src/impls/metadata.rs @@ -14,8 +14,12 @@ // KIND, either express or implied. See the License for the // specific language governing permissions and limitations // under the License. -use crate::stm::StateMachine; +use crate::MuxStateMachine; +use crate::stm::consumer_group::ConsumerGroups; use crate::stm::snapshot::{FillSnapshot, MetadataSnapshot, Snapshot, SnapshotError}; +use crate::stm::stream::Streams; +use crate::stm::user::Users; +use crate::stm::{ConsensusGroupAllocator, StateMachine}; use consensus::{ CommitLogEvent, Consensus, Pipeline, PipelineEntry, Plane, PlaneIdentity, PlaneKind, Project, ReplicaLogContext, RequestLogEvent, Sequencer, SimEventKind, VsrConsensus, ack_preflight, @@ -24,12 +28,22 @@ use consensus::{ pipeline_prepare_common, replicate_preflight, replicate_to_next_in_chain, send_prepare_ok as send_prepare_ok_common, }; +use iggy_binary_protocol::requests::partitions::CreatePartitionsRequest as WireCreatePartitionsRequest; +use iggy_binary_protocol::requests::partitions::{ + CreatePartitionsWithAssignmentsRequest as PersistedCreatePartitionsRequest, + CreatedPartitionAssignment, +}; +use iggy_binary_protocol::requests::topics::CreateTopicRequest as WireCreateTopicRequest; +use iggy_binary_protocol::requests::topics::CreateTopicWithAssignmentsRequest as PersistedCreateTopicRequest; use iggy_binary_protocol::{ - Command2, ConsensusHeader, GenericHeader, Message, PrepareHeader, PrepareOkHeader, - RequestHeader, + Command2, ConsensusHeader, GenericHeader, Message, Operation, PrepareHeader, PrepareOkHeader, + RequestHeader, WireDecode, WireEncode, }; +use iggy_common::IggyError; +use iggy_common::variadic; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; +use std::mem::size_of; use std::path::Path; use tracing::{debug, error, warn}; @@ -37,6 +51,17 @@ const fn freeze_client_reply(message: Message<GenericHeader>) -> Message<Generic message } +pub trait StreamsFrontend { + #[must_use] + fn streams(&self) -> &Streams; +} + +impl StreamsFrontend for MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)> { + fn streams(&self) -> &Streams { + &self.inner().1.0 + } +} + #[derive(Debug, Clone)] #[allow(unused)] pub struct IggySnapshot { @@ -245,13 +270,14 @@ pub struct IggyMetadata<C, J, S, M> { pub snapshot: Option<S>, /// State machine - lives on all shards pub mux_stm: M, + pub allocator: ConsensusGroupAllocator, /// Snapshot coordinator - present when persistent checkpointing is configured. pub coordinator: Option<SnapshotCoordinator<M>>, } impl<C, J, S, M> IggyMetadata<C, J, S, M> where - M: FillSnapshot<MetadataSnapshot>, + M: StreamsFrontend + FillSnapshot<MetadataSnapshot>, { /// Create a new `IggyMetadata` instance. /// @@ -265,13 +291,14 @@ where mux_stm: M, data_dir: Option<std::path::PathBuf>, ) -> Self { - let coordinator = data_dir - .map(|dir| SnapshotCoordinator::new(dir, |stm, seq| IggySnapshot::create(stm, seq))); + let allocator = ConsensusGroupAllocator::new(0); + let coordinator = data_dir.map(|dir| SnapshotCoordinator::new(dir, IggySnapshot::create)); Self { consensus, journal, snapshot, mux_stm, + allocator, coordinator, } } @@ -283,7 +310,8 @@ where B: MessageBus<Replica = u8, Data = Message<GenericHeader>, Client = u128>, J: JournalHandle, J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, - M: StateMachine< + M: StreamsFrontend + + StateMachine< Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, @@ -301,7 +329,19 @@ where operation: message.header().operation, }, ); - let prepare = message.project(consensus); + let prepare = match self.prepare_request(message) { + Ok(prepare) => prepare, + Err(error) => { + warn!( + target: "iggy.metadata.diag", + plane = "metadata", + replica_id = consensus.replica(), + error = %error, + "failed to transform metadata request into prepare" + ); + return; + } + }; pipeline_prepare_common(consensus, PlaneKind::Metadata, prepare, |prepare| { self.on_replicate(prepare) }) @@ -419,6 +459,7 @@ where } } + #[allow(clippy::too_many_lines)] async fn on_ack(&self, message: <VsrConsensus<B> as Consensus>::Message<PrepareOkHeader>) { let consensus = self.consensus.as_ref().unwrap(); let header = message.header(); @@ -493,7 +534,6 @@ where prepare_header.op, prepare_header.checksum ) }); - let response = self.mux_stm.update(prepare).unwrap_or_else(|err| { warn!( target: "iggy.metadata.diag", @@ -559,8 +599,95 @@ where P: Pipeline<Entry = PipelineEntry>, J: JournalHandle, J::Target: Journal<J::Storage, Entry = Message<PrepareHeader>, Header = PrepareHeader>, - M: StateMachine<Input = Message<PrepareHeader>>, + M: StreamsFrontend + + StateMachine< + Input = Message<PrepareHeader>, + Output = bytes::Bytes, + Error = iggy_common::IggyError, + >, { + #[allow(clippy::too_many_lines)] + fn prepare_request( + &self, + message: Message<RequestHeader>, + ) -> Result<Message<PrepareHeader>, iggy_common::IggyError> { + let consensus = self.consensus.as_ref().unwrap(); + let header = *message.header(); + if !header.operation.is_client_allowed() { + return Err(IggyError::InvalidCommand); + } + let body = &message.as_slice()[size_of::<RequestHeader>()..header.size as usize]; + + match header.operation { + Operation::CreateTopic => { + let request = WireCreateTopicRequest::decode_from(body) + .map_err(|_| IggyError::InvalidCommand)?; + let partitions = self + .allocator + .allocate_many(request.partitions_count as usize) + .into_iter() + .enumerate() + .map(|(partition_id, consensus_group_id)| { + Ok(CreatedPartitionAssignment { + partition_id: u32::try_from(partition_id) + .map_err(|_| IggyError::InvalidCommand)?, + consensus_group_id, + }) + }) + .collect::<Result<Vec<_>, _>>()?; + let body = PersistedCreateTopicRequest { + request, + partitions, + } + .to_bytes(); + Ok(build_prepare_message( + consensus, + &header, + Operation::CreateTopicWithAssignments, + &body, + )) + } + Operation::CreatePartitions => { + let request = WireCreatePartitionsRequest::decode_from(body) + .map_err(|_| IggyError::InvalidCommand)?; + let start_partition_id = self + .mux_stm + .streams() + .current_partition_count(&request.stream_id, &request.topic_id) + .ok_or(IggyError::InvalidCommand)?; + let partitions = self + .allocator + .allocate_many(request.partitions_count as usize) + .into_iter() + .enumerate() + .map(|(offset, consensus_group_id)| { + let offset = + u32::try_from(offset).map_err(|_| IggyError::InvalidCommand)?; + let partition_id = start_partition_id + .checked_add(offset) + .ok_or(IggyError::InvalidCommand)?; + Ok(CreatedPartitionAssignment { + partition_id, + consensus_group_id, + }) + }) + .collect::<Result<Vec<_>, _>>()?; + let body = PersistedCreatePartitionsRequest { + request, + partitions, + } + .to_bytes(); + Ok(build_prepare_message( + consensus, + &header, + Operation::CreatePartitionsWithAssignments, + &body, + )) + } + _ => Ok(message.project(consensus)), + } + } + /// Replicate a prepare message to the next replica in the chain. /// /// Chain replication pattern: @@ -603,3 +730,44 @@ where send_prepare_ok_common(consensus, header, Some(persisted)).await; } } + +fn build_prepare_message<B, P>( + consensus: &VsrConsensus<B, P>, + request: &RequestHeader, + operation: Operation, + body: &[u8], +) -> Message<PrepareHeader> +where + B: MessageBus, + P: Pipeline<Entry = PipelineEntry>, +{ + let op = consensus.sequencer().current_sequence() + 1; + let size = size_of::<PrepareHeader>() + body.len(); + let mut prepare = Message::<PrepareHeader>::new(size); + let prepare_bytes = prepare.as_mut_slice(); + prepare_bytes[size_of::<PrepareHeader>()..size].copy_from_slice(body); + + let header_bytes = &mut prepare_bytes[..size_of::<PrepareHeader>()]; + let new_header = bytemuck::checked::try_from_bytes_mut::<PrepareHeader>(header_bytes) + .expect("prepare header bytes should be valid"); + *new_header = PrepareHeader { + cluster: consensus.cluster(), + size: u32::try_from(size).expect("prepare message size exceeds u32"), + view: consensus.view(), + release: request.release, + command: Command2::Prepare, + replica: consensus.replica(), + client: request.client, + parent: consensus.last_prepare_checksum(), + request_checksum: request.request_checksum, + request: request.request, + commit: consensus.commit(), + op, + timestamp: 0, + operation, + namespace: request.namespace, + ..Default::default() + }; + + prepare +} diff --git a/core/metadata/src/stm/mod.rs b/core/metadata/src/stm/mod.rs index e3d8611f4..cdb6f365b 100644 --- a/core/metadata/src/stm/mod.rs +++ b/core/metadata/src/stm/mod.rs @@ -24,6 +24,7 @@ pub mod user; use bytes::Bytes; use iggy_common::Either; use left_right::{Absorb, ReadHandle, WriteHandle}; +use std::cell::Cell; use std::cell::UnsafeCell; use std::sync::Arc; @@ -159,6 +160,40 @@ pub trait StateMachine { fn update(&self, input: Self::Input) -> Result<Self::Output, Self::Error>; } +#[derive(Debug)] +pub struct ConsensusGroupAllocator { + highest: Cell<u64>, +} + +impl ConsensusGroupAllocator { + #[must_use] + pub const fn new(initial_highest: u64) -> Self { + Self { + highest: Cell::new(initial_highest), + } + } + + #[must_use] + pub const fn highest(&self) -> u64 { + self.highest.get() + } + + #[must_use] + /// + /// # Panics + /// Panics if allocating `count` more group IDs would overflow `u64`. + pub fn allocate_many(&self, count: usize) -> Vec<u64> { + let mut allocated = Vec::with_capacity(count); + let mut current = self.highest.get(); + for _ in 0..count { + current = current.checked_add(1).expect("consensus group id overflow"); + allocated.push(current); + } + self.highest.set(current); + allocated + } +} + /// Generates the state's inner struct and wrapper type. /// /// # Generated items diff --git a/core/metadata/src/stm/mux.rs b/core/metadata/src/stm/mux.rs index 89a545297..8ecea2079 100644 --- a/core/metadata/src/stm/mux.rs +++ b/core/metadata/src/stm/mux.rs @@ -39,6 +39,11 @@ where pub const fn new(inner: T) -> Self { Self { inner } } + + #[must_use] + pub const fn inner(&self) -> &T { + &self.inner + } } impl<T> StateMachine for MuxStateMachine<T> diff --git a/core/metadata/src/stm/snapshot.rs b/core/metadata/src/stm/snapshot.rs index b3f2ffe7c..3e9681de0 100644 --- a/core/metadata/src/stm/snapshot.rs +++ b/core/metadata/src/stm/snapshot.rs @@ -296,8 +296,8 @@ macro_rules! impl_fill_restore { #[cfg(test)] mod tests { use super::*; - use crate::stm::stream::{StatsSnapshot, StreamSnapshot}; - use iggy_common::IggyTimestamp; + use crate::stm::stream::{PartitionSnapshot, StatsSnapshot, StreamSnapshot, TopicSnapshot}; + use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; #[test] fn test_metadata_snapshot_roundtrip() { @@ -329,7 +329,29 @@ mod tests { messages_count: 50, segments_count: 2, }, - topics: vec![], + topics: vec![( + 0, + TopicSnapshot { + id: 0, + name: "topic".to_string(), + created_at: ts, + replication_factor: 1, + message_expiry: IggyExpiry::default(), + compression_algorithm: CompressionAlgorithm::default(), + max_topic_size: MaxTopicSize::default(), + stats: StatsSnapshot { + size_bytes: 256, + messages_count: 12, + segments_count: 1, + }, + partitions: vec![PartitionSnapshot { + id: 0, + consensus_group_id: 33, + created_at: ts, + }], + round_robin_counter: 0, + }, + )], }, )], }); @@ -351,7 +373,10 @@ mod tests { assert_eq!(stream.stats.size_bytes, 1024); assert_eq!(stream.stats.messages_count, 50); assert_eq!(stream.stats.segments_count, 2); - assert_eq!(stream.topics.len(), 0); + assert_eq!(stream.topics.len(), 1); + let (_, topic) = &stream.topics[0]; + assert_eq!(topic.partitions.len(), 1); + assert_eq!(topic.partitions[0].consensus_group_id, 33); } #[test] diff --git a/core/metadata/src/stm/stream.rs b/core/metadata/src/stm/stream.rs index dfb7e9e77..8839c0bd3 100644 --- a/core/metadata/src/stm/stream.rs +++ b/core/metadata/src/stm/stream.rs @@ -23,13 +23,13 @@ use ahash::AHashMap; use bytes::Bytes; use iggy_binary_protocol::WireIdentifier; use iggy_binary_protocol::requests::partitions::{ - CreatePartitionsRequest, DeletePartitionsRequest, + CreatePartitionsWithAssignmentsRequest, DeletePartitionsRequest, }; use iggy_binary_protocol::requests::streams::{ CreateStreamRequest, DeleteStreamRequest, PurgeStreamRequest, UpdateStreamRequest, }; use iggy_binary_protocol::requests::topics::{ - CreateTopicRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest, + CreateTopicWithAssignmentsRequest, DeleteTopicRequest, PurgeTopicRequest, UpdateTopicRequest, }; use iggy_common::{CompressionAlgorithm, IggyExpiry, IggyTimestamp, MaxTopicSize}; use serde::{Deserialize, Serialize}; @@ -41,19 +41,25 @@ use std::sync::atomic::{AtomicUsize, Ordering}; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct PartitionSnapshot { pub id: usize, + pub consensus_group_id: u64, pub created_at: IggyTimestamp, } #[derive(Debug, Clone)] pub struct Partition { pub id: usize, + pub consensus_group_id: u64, pub created_at: IggyTimestamp, } impl Partition { #[must_use] - pub const fn new(id: usize, created_at: IggyTimestamp) -> Self { - Self { id, created_at } + pub const fn new(id: usize, consensus_group_id: u64, created_at: IggyTimestamp) -> Self { + Self { + id, + consensus_group_id, + created_at, + } } } @@ -223,11 +229,11 @@ collect_handlers! { UpdateStream, DeleteStream, PurgeStream, - CreateTopic, + CreateTopicWithAssignments, UpdateTopic, DeleteTopic, PurgeTopic, - CreatePartitions, + CreatePartitionsWithAssignments, DeletePartitions, } } @@ -263,6 +269,31 @@ impl StreamsInner { } } +impl Streams { + #[must_use] + pub fn current_partition_count( + &self, + stream_id: &WireIdentifier, + topic_id: &WireIdentifier, + ) -> Option<u32> { + self.inner.read(|inner| { + let stream_id = inner.resolve_stream_id(stream_id)?; + let topic_id = inner.resolve_topic_id(stream_id, topic_id)?; + let stream = inner.items.get(stream_id)?; + let topic = stream.topics.get(topic_id)?; + let next_partition_id = topic + .partitions + .iter() + .map(|partition| partition.id) + .max() + .and_then(|partition_id| partition_id.checked_add(1)) + .and_then(|partition_id| u32::try_from(partition_id).ok()) + .unwrap_or(0); + Some(next_partition_id) + }) + } +} + // TODO(hubcio): Serialize proper reply (e.g. assigned stream ID) instead of empty Bytes. impl StateHandler for CreateStreamRequest { type State = StreamsInner; @@ -340,25 +371,25 @@ impl StateHandler for PurgeStreamRequest { } // TODO(hubcio): Serialize proper reply (e.g. assigned topic ID) instead of empty Bytes. -impl StateHandler for CreateTopicRequest { +impl StateHandler for CreateTopicWithAssignmentsRequest { type State = StreamsInner; fn apply(&self, state: &mut StreamsInner) -> Bytes { - let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) else { return Bytes::new(); }; let Some(stream) = state.items.get_mut(stream_id) else { return Bytes::new(); }; - let name_arc: Arc<str> = Arc::from(self.name.as_str()); + let name_arc: Arc<str> = Arc::from(self.request.name.as_str()); if stream.topic_index.contains_key(&name_arc) { return Bytes::new(); } - let replication_factor = if self.replication_factor == 0 { + let replication_factor = if self.request.replication_factor == 0 { 1 } else { - self.replication_factor + self.request.replication_factor }; let topic = Topic { @@ -366,10 +397,12 @@ impl StateHandler for CreateTopicRequest { name: name_arc.clone(), created_at: IggyTimestamp::now(), replication_factor, - message_expiry: IggyExpiry::from(self.message_expiry), - compression_algorithm: CompressionAlgorithm::from_code(self.compression_algorithm) - .unwrap_or_default(), - max_topic_size: MaxTopicSize::from(self.max_topic_size), + message_expiry: IggyExpiry::from(self.request.message_expiry), + compression_algorithm: CompressionAlgorithm::from_code( + self.request.compression_algorithm, + ) + .unwrap_or_default(), + max_topic_size: MaxTopicSize::from(self.request.max_topic_size), stats: Arc::new(TopicStats::new(stream.stats.clone())), partitions: Vec::new(), round_robin_counter: Arc::new(AtomicUsize::new(0)), @@ -379,9 +412,10 @@ impl StateHandler for CreateTopicRequest { if let Some(topic) = stream.topics.get_mut(topic_id) { topic.id = topic_id; - for partition_id in 0..self.partitions_count as usize { + for partition in &self.partitions { let partition = Partition { - id: partition_id, + id: partition.partition_id as usize, + consensus_group_id: partition.consensus_group_id, created_at: IggyTimestamp::now(), }; topic.partitions.push(partition); @@ -462,13 +496,13 @@ impl StateHandler for PurgeTopicRequest { } // TODO(hubcio): Serialize proper reply (e.g. assigned partition IDs) instead of empty Bytes. -impl StateHandler for CreatePartitionsRequest { +impl StateHandler for CreatePartitionsWithAssignmentsRequest { type State = StreamsInner; fn apply(&self, state: &mut StreamsInner) -> Bytes { - let Some(stream_id) = state.resolve_stream_id(&self.stream_id) else { + let Some(stream_id) = state.resolve_stream_id(&self.request.stream_id) else { return Bytes::new(); }; - let Some(topic_id) = state.resolve_topic_id(stream_id, &self.topic_id) else { + let Some(topic_id) = state.resolve_topic_id(stream_id, &self.request.topic_id) else { return Bytes::new(); }; @@ -479,11 +513,10 @@ impl StateHandler for CreatePartitionsRequest { return Bytes::new(); }; - let current_partition_count = topic.partitions.len(); - for i in 0..self.partitions_count as usize { - let partition_id = current_partition_count + i; + for partition in &self.partitions { let partition = Partition { - id: partition_id, + id: partition.partition_id as usize, + consensus_group_id: partition.consensus_group_id, created_at: IggyTimestamp::now(), }; topic.partitions.push(partition); @@ -561,6 +594,7 @@ impl Snapshotable for Streams { .iter() .map(|p| PartitionSnapshot { id: p.id, + consensus_group_id: p.consensus_group_id, created_at: p.created_at, }) .collect(), @@ -630,6 +664,7 @@ impl Snapshotable for Streams { .into_iter() .map(|p| Partition { id: p.id, + consensus_group_id: p.consensus_group_id, created_at: p.created_at, }) .collect(), @@ -666,3 +701,115 @@ impl Snapshotable for Streams { } impl_fill_restore!(Streams, streams); + +#[cfg(test)] +mod tests { + use super::*; + use iggy_binary_protocol::WireName; + use iggy_binary_protocol::requests::partitions::{ + CreatePartitionsRequest as WireCreatePartitionsRequest, + CreatePartitionsWithAssignmentsRequest, CreatedPartitionAssignment, + }; + use iggy_binary_protocol::requests::topics::{ + CreateTopicRequest as WireCreateTopicRequest, CreateTopicWithAssignmentsRequest, + }; + + fn create_stream(inner: &mut StreamsInner, name: &str) { + let request = CreateStreamRequest { + name: WireName::new(name).unwrap(), + }; + let _ = StateHandler::apply(&request, inner); + } + + fn make_topic_request( + stream_id: u32, + partitions_count: u32, + name: &str, + ) -> WireCreateTopicRequest { + WireCreateTopicRequest { + stream_id: WireIdentifier::numeric(stream_id), + partitions_count, + compression_algorithm: 0, + message_expiry: 0, + max_topic_size: 0, + replication_factor: 1, + name: WireName::new(name).unwrap(), + } + } + + #[test] + fn current_partition_count_scans_existing_topic_state() { + let mut inner = StreamsInner::new(); + create_stream(&mut inner, "stream"); + let create_topic = CreateTopicWithAssignmentsRequest { + request: make_topic_request(0, 2, "topic"), + partitions: vec![ + CreatedPartitionAssignment { + partition_id: 0, + consensus_group_id: 1, + }, + CreatedPartitionAssignment { + partition_id: 1, + consensus_group_id: 2, + }, + ], + }; + let _ = StateHandler::apply(&create_topic, &mut inner); + let streams: Streams = inner.into(); + + assert_eq!( + streams + .current_partition_count(&WireIdentifier::numeric(0), &WireIdentifier::numeric(0)), + Some(2) + ); + } + + #[test] + fn applying_enriched_create_commands_stores_consensus_group_ids() { + let mut inner = StreamsInner::new(); + create_stream(&mut inner, "stream"); + let create_topic = CreateTopicWithAssignmentsRequest { + request: make_topic_request(0, 2, "topic"), + partitions: vec![ + CreatedPartitionAssignment { + partition_id: 0, + consensus_group_id: 10, + }, + CreatedPartitionAssignment { + partition_id: 1, + consensus_group_id: 11, + }, + ], + }; + let _ = StateHandler::apply(&create_topic, &mut inner); + + let create_partitions = CreatePartitionsWithAssignmentsRequest { + request: WireCreatePartitionsRequest { + stream_id: WireIdentifier::numeric(0), + topic_id: WireIdentifier::numeric(0), + partitions_count: 2, + }, + partitions: vec![ + CreatedPartitionAssignment { + partition_id: 2, + consensus_group_id: 12, + }, + CreatedPartitionAssignment { + partition_id: 3, + consensus_group_id: 13, + }, + ], + }; + let _ = StateHandler::apply(&create_partitions, &mut inner); + + assert_eq!(inner.items[0].topics[0].partitions.len(), 4); + assert_eq!( + inner.items[0].topics[0].partitions[0].consensus_group_id, + 10 + ); + assert_eq!( + inner.items[0].topics[0].partitions[3].consensus_group_id, + 13 + ); + } +} diff --git a/core/shard/src/lib.rs b/core/shard/src/lib.rs index acb8655a3..a1ffb21c2 100644 --- a/core/shard/src/lib.rs +++ b/core/shard/src/lib.rs @@ -27,6 +27,7 @@ use iggy_common::{PartitionStats, sharding::IggyNamespace}; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; use metadata::IggyMetadata; +use metadata::impls::metadata::StreamsFrontend; use metadata::stm::StateMachine; use partitions::{IggyPartition, IggyPartitions}; use shards_table::ShardsTable; @@ -236,7 +237,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { match MessageBag::try_from(message) { Ok(MessageBag::Request(request)) => self.on_request(request).await, @@ -262,7 +263,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { self.plane.on_request(request).await; } @@ -281,7 +282,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { self.plane.on_replicate(prepare).await; } @@ -300,7 +301,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { self.plane.on_ack(prepare_ok).await; } @@ -330,7 +331,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { debug_assert!(buf.is_empty(), "buf must be empty on entry"); diff --git a/core/shard/src/router.rs b/core/shard/src/router.rs index 8965a6a05..16dbbe8ff 100644 --- a/core/shard/src/router.rs +++ b/core/shard/src/router.rs @@ -22,6 +22,7 @@ use iggy_binary_protocol::{ConsensusError, GenericHeader, Message, MessageBag, P use iggy_common::sharding::IggyNamespace; use journal::{Journal, JournalHandle}; use message_bus::MessageBus; +use metadata::impls::metadata::StreamsFrontend; use metadata::stm::StateMachine; /// Inter-shard dispatch logic. @@ -154,7 +155,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { loop { futures::select! { @@ -188,7 +189,7 @@ where Input = Message<PrepareHeader>, Output = bytes::Bytes, Error = iggy_common::IggyError, - >, + > + StreamsFrontend, { self.on_message(frame.message).await; // TODO: once on_message returns an R (e.g. ShardResponse), send it
