This is an automated email from the ASF dual-hosted git repository.
hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 3904ef310 feat(cpp): Add functions related to consumer groups (#2988)
3904ef310 is described below
commit 3904ef3104575e26e8553a3456c9a4f6b2047625
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Wed Apr 1 15:31:57 2026 +0900
feat(cpp): Add functions related to consumer groups (#2988)
---
foreign/cpp/BUILD.bazel | 25 +-
foreign/cpp/Cargo.toml | 1 +
foreign/cpp/build.rs | 1 +
foreign/cpp/src/client.rs | 213 +++++++-
foreign/cpp/src/consumer_group.rs | 46 ++
foreign/cpp/src/identifier.rs | 16 +
foreign/cpp/src/lib.rs | 58 ++-
foreign/cpp/src/stream.rs | 2 +-
foreign/cpp/src/topic.rs | 4 +-
foreign/cpp/tests/client/low_level_e2e.cpp | 1 +
foreign/cpp/tests/common/test_helpers.hpp | 16 +-
foreign/cpp/tests/consumer_group/low_level_e2e.cpp | 572 +++++++++++++++++++++
foreign/cpp/tests/identifier/unit_tests.cpp | 152 ++++++
foreign/cpp/tests/partition/low_level_e2e.cpp | 530 +++++++++++++++++++
foreign/cpp/tests/stream/low_level_e2e.cpp | 23 +
foreign/cpp/tests/topic/low_level_e2e.cpp | 2 +
16 files changed, 1632 insertions(+), 30 deletions(-)
diff --git a/foreign/cpp/BUILD.bazel b/foreign/cpp/BUILD.bazel
index 614ac6349..8dbeb65bc 100644
--- a/foreign/cpp/BUILD.bazel
+++ b/foreign/cpp/BUILD.bazel
@@ -100,6 +100,13 @@ cc_library(
"cxxbridge",
"include",
],
+ copts = select({
+ "@platforms//os:windows": ["/utf-8"],
+ "//conditions:default": [
+ "-finput-charset=UTF-8",
+ "-fexec-charset=UTF-8",
+ ],
+ }),
linkopts = [
"-ldl",
"-lpthread",
@@ -117,9 +124,18 @@ cc_library(
cc_test(
name = "unit",
- srcs = [
+ srcs = glob([
+ "tests/*/unit_tests.cpp",
+ ]) + [
"tests/unit_tests.cpp",
],
+ copts = select({
+ "@platforms//os:windows": ["/utf-8"],
+ "//conditions:default": [
+ "-finput-charset=UTF-8",
+ "-fexec-charset=UTF-8",
+ ],
+ }),
deps = [
":iggy-cpp",
"@googletest//:gtest_main",
@@ -133,6 +149,13 @@ cc_test(
]) + [
"tests/common/test_helpers.hpp",
],
+ copts = select({
+ "@platforms//os:windows": ["/utf-8"],
+ "//conditions:default": [
+ "-finput-charset=UTF-8",
+ "-fexec-charset=UTF-8",
+ ],
+ }),
tags = [
"e2e",
],
diff --git a/foreign/cpp/Cargo.toml b/foreign/cpp/Cargo.toml
index 8e1ed032c..477cb0b74 100644
--- a/foreign/cpp/Cargo.toml
+++ b/foreign/cpp/Cargo.toml
@@ -29,6 +29,7 @@ crate-type = ["staticlib"]
[dependencies]
cxx = "1.0.194"
iggy = { path = "../../core/sdk" }
+iggy_common = { path = "../../core/common" }
tokio = { version = "1.49.0", features = ["rt-multi-thread"] }
[build-dependencies]
diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs
index 1f80ed782..476a9d796 100644
--- a/foreign/cpp/build.rs
+++ b/foreign/cpp/build.rs
@@ -21,6 +21,7 @@ fn main() {
.compile("iggy-cpp-bridge");
println!("cargo:rerun-if-changed=src/client.rs");
+ println!("cargo:rerun-if-changed=src/consumer_group.rs");
println!("cargo:rerun-if-changed=src/identifier.rs");
println!("cargo:rerun-if-changed=src/lib.rs");
println!("cargo:rerun-if-changed=src/stream.rs");
diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs
index 894c78060..341e0c1a0 100644
--- a/foreign/cpp/src/client.rs
+++ b/foreign/cpp/src/client.rs
@@ -18,9 +18,9 @@
use crate::{RUNTIME, ffi};
use iggy::prelude::{
Client as IggyConnectionClient, CompressionAlgorithm as
RustCompressionAlgorithm,
- Identifier as RustIdentifier, IggyClient as RustIggyClient,
+ ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as
RustIggyClient,
IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as
RustIggyExpiry,
- MaxTopicSize as RustMaxTopicSize, StreamClient, TopicClient, UserClient,
+ MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient,
TopicClient, UserClient,
};
use std::str::FromStr;
use std::sync::Arc;
@@ -53,22 +53,22 @@ pub fn new_connection(connection_string: String) -> Result<*mut Client, String>
}
impl Client {
- pub fn connect(&self) -> Result<(), String> {
+ pub fn login_user(&self, username: String, password: String) -> Result<(),
String> {
RUNTIME.block_on(async {
self.inner
- .connect()
+ .login_user(&username, &password)
.await
- .map_err(|error| format!("Could not connect: {error}"))?;
+ .map_err(|error| format!("Could not login user '{}': {error}",
username))?;
Ok(())
})
}
- pub fn login_user(&self, username: String, password: String) -> Result<(),
String> {
+ pub fn connect(&self) -> Result<(), String> {
RUNTIME.block_on(async {
self.inner
- .login_user(&username, &password)
+ .connect()
.await
- .map_err(|error| format!("Could not login user '{}': {error}",
username))?;
+ .map_err(|error| format!("Could not connect: {error}"))?;
Ok(())
})
}
@@ -114,6 +114,19 @@ impl Client {
})
}
+ // pub fn purge_stream(&self, stream_id: ffi::Identifier) -> Result<(),
String> {
+ // let rust_stream_id = RustIdentifier::try_from(stream_id)
+ // .map_err(|error| format!("Could not purge stream: {error}"))?;
+
+ // RUNTIME.block_on(async {
+ // self.inner
+ // .purge_stream(&rust_stream_id)
+ // .await
+ // .map_err(|error| format!("Could not purge stream '{}':
{error}", rust_stream_id))?;
+ // Ok(())
+ // })
+ // }
+
#[allow(clippy::too_many_arguments)]
pub fn create_topic(
&self,
@@ -144,7 +157,9 @@ impl Client {
let rust_message_expiry = match message_expiry_kind.as_str() {
"" | "server_default" | "default" => RustIggyExpiry::ServerDefault,
"never_expire" => RustIggyExpiry::NeverExpire,
- "duration" =>
RustIggyExpiry::ExpireDuration(message_expiry_value.into()),
+ "duration" =>
RustIggyExpiry::ExpireDuration(iggy::prelude::IggyDuration::from(
+ message_expiry_value,
+ )),
_ => {
return Err(format!(
"Could not create topic '{}': invalid message expiry kind
'{}'",
@@ -183,6 +198,186 @@ impl Client {
Ok(())
})
}
+
+ // pub fn purge_topic(
+ // &self,
+ // stream_id: ffi::Identifier,
+ // topic_id: ffi::Identifier,
+ // ) -> Result<(), String> {
+ // let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ // format!("Could not purge topic: invalid stream identifier:
{error}")
+ // })?;
+ // let rust_topic_id = RustIdentifier::try_from(topic_id)
+ // .map_err(|error| format!("Could not purge topic: invalid topic
identifier: {error}"))?;
+
+ // RUNTIME.block_on(async {
+ // self.inner
+ // .purge_topic(&rust_stream_id, &rust_topic_id)
+ // .await
+ // .map_err(|error| {
+ // format!(
+ // "Could not purge topic '{}' on stream '{}':
{error}",
+ // rust_topic_id, rust_stream_id
+ // )
+ // })?;
+ // Ok(())
+ // })
+ // }
+
+ pub fn create_partitions(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitions_count: u32,
+ ) -> Result<(), String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!("Could not create partitions: invalid stream identifier:
{error}")
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!("Could not create partitions: invalid topic identifier:
{error}")
+ })?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .create_partitions(&rust_stream_id, &rust_topic_id,
partitions_count)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not create {partitions_count} partitions for
topic '{}' on stream '{}': {error}",
+ rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(())
+ })
+ }
+
+ pub fn delete_partitions(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitions_count: u32,
+ ) -> Result<(), String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!("Could not delete partitions: invalid stream identifier:
{error}")
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!("Could not delete partitions: invalid topic identifier:
{error}")
+ })?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .delete_partitions(&rust_stream_id, &rust_topic_id,
partitions_count)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not delete {partitions_count} partitions for
topic '{}' on stream '{}': {error}",
+ rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(())
+ })
+ }
+
+ pub fn create_consumer_group(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ name: String,
+ ) -> Result<ffi::ConsumerGroupDetails, String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!(
+ "Could not create consumer group '{}': invalid stream
identifier: {error}",
+ name
+ )
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!(
+ "Could not create consumer group '{}': invalid topic
identifier: {error}",
+ name
+ )
+ })?;
+
+ RUNTIME.block_on(async {
+ let group = self
+ .inner
+ .create_consumer_group(&rust_stream_id, &rust_topic_id, &name)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not create consumer group '{}' for topic '{}'
on stream '{}': {error}",
+ name, rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(ffi::ConsumerGroupDetails::from(group))
+ })
+ }
+
+ pub fn get_consumer_group(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ group_id: ffi::Identifier,
+ ) -> Result<ffi::ConsumerGroupDetails, String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!("Could not get consumer group: invalid stream identifier:
{error}")
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!("Could not get consumer group: invalid topic identifier:
{error}")
+ })?;
+ let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error|
{
+ format!("Could not get consumer group: invalid group identifier:
{error}")
+ })?;
+
+ RUNTIME.block_on(async {
+ let group = self
+ .inner
+ .get_consumer_group(&rust_stream_id, &rust_topic_id,
&rust_group_id)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not get consumer group '{}' for topic '{}' on
stream '{}': {error}",
+ rust_group_id, rust_topic_id, rust_stream_id
+ )
+ })?;
+ let group = group.ok_or_else(|| {
+ format!(
+ "Consumer group '{}' was not found for topic '{}' on
stream '{}'",
+ rust_group_id, rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(ffi::ConsumerGroupDetails::from(group))
+ })
+ }
+
+ pub fn delete_consumer_group(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ group_id: ffi::Identifier,
+ ) -> Result<(), String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!("Could not delete consumer group: invalid stream
identifier: {error}")
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!("Could not delete consumer group: invalid topic
identifier: {error}")
+ })?;
+ let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error|
{
+ format!("Could not delete consumer group: invalid group
identifier: {error}")
+ })?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .delete_consumer_group(&rust_stream_id, &rust_topic_id,
&rust_group_id)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not delete consumer group '{}' for topic '{}'
on stream '{}': {error}",
+ rust_group_id, rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(())
+ })
+ }
}
pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
diff --git a/foreign/cpp/src/consumer_group.rs b/foreign/cpp/src/consumer_group.rs
new file mode 100644
index 000000000..28a2d92e2
--- /dev/null
+++ b/foreign/cpp/src/consumer_group.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use iggy::prelude::ConsumerGroupDetails as RustConsumerGroupDetails;
+use iggy_common::ConsumerGroupMember as RustConsumerGroupMember;
+
+impl From<RustConsumerGroupMember> for ffi::ConsumerGroupMember {
+ fn from(member: RustConsumerGroupMember) -> Self {
+ ffi::ConsumerGroupMember {
+ id: member.id,
+ partitions_count: member.partitions_count,
+ partitions: member.partitions,
+ }
+ }
+}
+
+impl From<RustConsumerGroupDetails> for ffi::ConsumerGroupDetails {
+ fn from(group: RustConsumerGroupDetails) -> Self {
+ ffi::ConsumerGroupDetails {
+ id: group.id,
+ name: group.name,
+ partitions_count: group.partitions_count,
+ members_count: group.members_count,
+ members: group
+ .members
+ .into_iter()
+ .map(ffi::ConsumerGroupMember::from)
+ .collect(),
+ }
+ }
+}
diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs
index bbd954168..02c9d8a48 100644
--- a/foreign/cpp/src/identifier.rs
+++ b/foreign/cpp/src/identifier.rs
@@ -61,3 +61,19 @@ impl TryFrom<ffi::Identifier> for RustIdentifier {
Ok(rust_identifier)
}
}
+
+impl ffi::Identifier {
+ pub fn from_string(&mut self, id: String) -> Result<(), String> {
+ *self = RustIdentifier::named(&id)
+ .map(ffi::Identifier::from)
+ .map_err(|error| format!("Could not create string identifier:
{error}"))?;
+ Ok(())
+ }
+
+ pub fn from_numeric(&mut self, id: u32) -> Result<(), String> {
+ *self = RustIdentifier::numeric(id)
+ .map(ffi::Identifier::from)
+ .map_err(|error| format!("Could not create numeric identifier:
{error}"))?;
+ Ok(())
+ }
+}
diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs
index 021a7d944..4d47a5b94 100644
--- a/foreign/cpp/src/lib.rs
+++ b/foreign/cpp/src/lib.rs
@@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.
mod client;
+mod consumer_group;
mod identifier;
mod stream;
mod topic;
@@ -60,18 +61,34 @@ mod ffi {
topics: Vec<Topic>,
}
+ struct ConsumerGroupMember {
+ id: u32,
+ partitions_count: u32,
+ partitions: Vec<u32>,
+ }
+
+ struct ConsumerGroupDetails {
+ id: u32,
+ name: String,
+ partitions_count: u32,
+ members_count: u32,
+ members: Vec<ConsumerGroupMember>,
+ }
+
extern "Rust" {
type Client;
+ // Client functions
fn new_connection(connection_string: String) -> Result<*mut Client>;
fn login_user(self: &Client, username: String, password: String) ->
Result<()>;
fn connect(self: &Client) -> Result<()>;
- fn create_stream(&self, stream_name: String) -> Result<()>;
+ fn create_stream(self: &Client, stream_name: String) -> Result<()>;
fn get_stream(self: &Client, stream_id: Identifier) ->
Result<StreamDetails>;
- fn delete_stream(&self, stream_id: Identifier) -> Result<()>;
+ fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>;
+ // fn purge_stream(&self, stream_id: Identifier) -> Result<()>;
#[allow(clippy::too_many_arguments)]
fn create_topic(
- &self,
+ self: &Client,
stream_id: Identifier,
topic_name: String,
partitions_count: u32,
@@ -81,7 +98,42 @@ mod ffi {
message_expiry_value: u64,
max_topic_size: String,
) -> Result<()>;
+ // fn purge_topic(&self, stream_id: Identifier, topic_id: Identifier)
-> Result<()>;
+ fn create_partitions(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partitions_count: u32,
+ ) -> Result<()>;
+ fn delete_partitions(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partitions_count: u32,
+ ) -> Result<()>;
+ fn create_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ name: String,
+ ) -> Result<ConsumerGroupDetails>;
+ fn get_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ ) -> Result<ConsumerGroupDetails>;
+ fn delete_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ ) -> Result<()>;
unsafe fn delete_connection(client: *mut Client) -> Result<()>;
+
+ // Identifier functions
+ fn from_string(self: &mut Identifier, id: String) -> Result<()>;
+ fn from_numeric(self: &mut Identifier, id: u32) -> Result<()>;
}
}
diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs
index 7cb26c249..5c1f6e9d7 100644
--- a/foreign/cpp/src/stream.rs
+++ b/foreign/cpp/src/stream.rs
@@ -27,7 +27,7 @@ impl From<RustStreamDetails> for ffi::StreamDetails {
size_bytes: stream.size.as_bytes_u64(),
messages_count: stream.messages_count,
topics_count: stream.topics_count,
- topics: stream.topics.into_iter().map(Into::into).collect(),
+ topics: stream.topics.into_iter().map(ffi::Topic::from).collect(),
}
}
}
diff --git a/foreign/cpp/src/topic.rs b/foreign/cpp/src/topic.rs
index 6bc1d3f86..ac1bae30b 100644
--- a/foreign/cpp/src/topic.rs
+++ b/foreign/cpp/src/topic.rs
@@ -25,9 +25,9 @@ impl From<RustTopic> for ffi::Topic {
created_at: topic.created_at.as_micros(),
name: topic.name,
size_bytes: topic.size.as_bytes_u64(),
- message_expiry: topic.message_expiry.into(),
+ message_expiry: u64::from(topic.message_expiry),
compression_algorithm: topic.compression_algorithm.to_string(),
- max_topic_size: topic.max_topic_size.into(),
+ max_topic_size: u64::from(topic.max_topic_size),
replication_factor: topic.replication_factor,
messages_count: topic.messages_count,
partitions_count: topic.partitions_count,
diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp
index 6eaeacecd..26b82b82f 100644
--- a/foreign/cpp/tests/client/low_level_e2e.cpp
+++ b/foreign/cpp/tests/client/low_level_e2e.cpp
@@ -22,6 +22,7 @@
#include <gtest/gtest.h>
#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
TEST(LowLevelE2E_Client, ConnectAndLogin) {
RecordProperty("description", "Connects and logs in successfully using
each supported connection string format.");
diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp
index 5d76aa8da..5457c09d7 100644
--- a/foreign/cpp/tests/common/test_helpers.hpp
+++ b/foreign/cpp/tests/common/test_helpers.hpp
@@ -19,7 +19,6 @@
#pragma once
-#include <cassert>
#include <cstdint>
#include <string>
@@ -27,24 +26,13 @@
inline iggy::ffi::Identifier make_string_identifier(const std::string &value) {
iggy::ffi::Identifier identifier;
- identifier.kind = "string";
- identifier.length = static_cast<std::uint8_t>(value.size());
- for (const char c : value) {
- identifier.value.push_back(static_cast<std::uint8_t>(c));
- }
- assert(identifier.length == identifier.value.size());
+ identifier.from_string(value);
return identifier;
}
inline iggy::ffi::Identifier make_numeric_identifier(const std::uint32_t
value) {
iggy::ffi::Identifier identifier;
- identifier.kind = "numeric";
- identifier.length = 4;
- identifier.value.push_back(static_cast<std::uint8_t>(value & 0xFF));
- identifier.value.push_back(static_cast<std::uint8_t>((value >> 8) & 0xFF));
- identifier.value.push_back(static_cast<std::uint8_t>((value >> 16) &
0xFF));
- identifier.value.push_back(static_cast<std::uint8_t>((value >> 24) &
0xFF));
- assert(identifier.length == identifier.value.size());
+ identifier.from_numeric(value);
return identifier;
}
diff --git a/foreign/cpp/tests/consumer_group/low_level_e2e.cpp b/foreign/cpp/tests/consumer_group/low_level_e2e.cpp
new file mode 100644
index 000000000..1391eaf70
--- /dev/null
+++ b/foreign/cpp/tests/consumer_group/low_level_e2e.cpp
@@ -0,0 +1,572 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// TODO(slbotbm): create fixture for setup/teardown.
+
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupSucceeds) {
+ RecordProperty("description", "Creates a consumer group successfully for
an existing stream and topic.");
+ const std::string stream_name = "client-create-group-happy-stream";
+ const std::string topic_name = "client-create-group-happy-topic";
+ const std::string group_name = "client-create-group-happy";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
+ ASSERT_NO_THROW({
+ const auto group =
client->create_consumer_group(make_string_identifier(stream_name),
+
make_string_identifier(topic_name), group_name);
+ ASSERT_EQ(group.name, group_name);
+ ASSERT_EQ(group.members_count, 0);
+ ASSERT_TRUE(group.members.empty());
+ });
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup,
CreateConsumerGroupWithInvalidIdentifiersThrows) {
+ RecordProperty("description", "Rejects malformed stream and topic
identifiers before creating a consumer group.");
+ const std::string stream_name = "client-create-group-invalid-id-stream";
+ const std::string topic_name = "client-create-group-invalid-id-topic";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
+ iggy::ffi::Identifier invalid_stream_id;
+ invalid_stream_id.kind = "invalid";
+ invalid_stream_id.length = 4;
+ invalid_stream_id.value = {1, 2, 3, 4};
+
+ iggy::ffi::Identifier invalid_topic_id;
+ invalid_topic_id.kind = "numeric";
+ invalid_topic_id.length = 3;
+ invalid_topic_id.value = {1, 2, 3};
+
+ ASSERT_THROW(client->create_consumer_group(std::move(invalid_stream_id),
make_string_identifier(topic_name),
+
"client-create-group-invalid-stream-id"),
+ std::exception);
+
ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name),
std::move(invalid_topic_id),
+
"client-create-group-invalid-topic-id"),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup,
CreateConsumerGroupOnNonExistentResourcesThrows) {
+ RecordProperty("description", "Rejects creating a consumer group on
streams or topics that do not exist.");
+ const std::string stream_name =
"client-create-group-missing-resource-stream";
+ const std::string topic_name =
"client-create-group-missing-resource-topic";
+ const std::string missing_stream_name =
"client-create-group-missing-stream";
+ const std::string missing_topic_name =
"client-create-group-missing-topic";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
+ ASSERT_THROW(
+
client->create_consumer_group(make_string_identifier(missing_stream_name),
make_string_identifier(topic_name),
+
"client-create-group-missing-stream-group"),
+ std::exception);
+ ASSERT_THROW(
+ client->create_consumer_group(make_string_identifier(stream_name),
make_string_identifier(missing_topic_name),
+
"client-create-group-missing-topic-group"),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupTwiceOnSameInputThrows) {
+ RecordProperty("description", "Rejects creating the same consumer group
twice for the same stream and topic.");
+ const std::string stream_name = "client-create-group-duplicate-stream";
+ const std::string topic_name = "client-create-group-duplicate-topic";
+ const std::string group_name = "client-create-group-duplicate";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+
make_string_identifier(topic_name), group_name));
+
+
ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name),
make_string_identifier(topic_name),
+ group_name),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup,
CreateConsumerGroupWithNumericIdentifiersSucceeds) {
+ RecordProperty("description",
+ "Creates a consumer group successfully when stream and
topic are addressed by numeric identifiers.");
+ const std::string stream_name = "client-create-group-numeric-stream";
+ const std::string topic_name = "client-create-group-numeric-topic";
+ const std::string group_name = "client-create-group-numeric";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), 1);
+
+ ASSERT_NO_THROW({
+ const auto group =
+
client->create_consumer_group(make_numeric_identifier(stream_details.id),
+
make_numeric_identifier(stream_details.topics[0].id), group_name);
+ ASSERT_EQ(group.name, group_name);
+ ASSERT_EQ(group.members_count, 0);
+ ASSERT_TRUE(group.members.empty());
+ });
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithInvalidNamesThrows) {
+ RecordProperty("description", "Rejects empty and overlong consumer group
names.");
+ const std::string stream_name = "client-create-group-invalid-name-stream";
+ const std::string topic_name = "client-create-group-invalid-name-topic";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
+ const std::string invalid_names[] = {"", std::string(256, 'a')};
+ for (const std::string &invalid_name : invalid_names) {
+ SCOPED_TRACE(invalid_name.size());
+
ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+
make_string_identifier(topic_name), invalid_name),
+ std::exception);
+ }
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupAfterStreamDeletionThrows) {
+ RecordProperty("description", "Rejects creating a consumer group after
deleting the stream that owned the topic.");
+ const std::string stream_name =
"client-create-group-after-stream-delete-stream";
+ const std::string topic_name =
"client-create-group-after-stream-delete-topic";
+ const std::string group_name = "client-create-group-after-stream-delete";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+
+
ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name),
make_string_identifier(topic_name),
+ group_name),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupBeforeLoginThrows) {
+ RecordProperty("description",
+ "Rejects creating a consumer group before connect, and
after connect but before login.");
+ const std::string stream_name = "client-create-group-before-login-stream";
+ const std::string topic_name = "client-create-group-before-login-topic";
+ const std::string group_name = "client-create-group-before-login";
+
+ iggy::ffi::Client *setup_client = login_to_server();
+ ASSERT_NE(setup_client, nullptr);
+ ASSERT_NO_THROW(setup_client->create_stream(stream_name));
+
ASSERT_NO_THROW(setup_client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(setup_client));
+ setup_client = nullptr;
+
+ iggy::ffi::Client *unauthenticated_client = nullptr;
+ ASSERT_NO_THROW({ unauthenticated_client = iggy::ffi::new_connection("");
});
+ ASSERT_NE(unauthenticated_client, nullptr);
+
+
ASSERT_THROW(unauthenticated_client->create_consumer_group(make_string_identifier(stream_name),
+
make_string_identifier(topic_name), group_name),
+ std::exception);
+ ASSERT_NO_THROW(unauthenticated_client->connect());
+
ASSERT_THROW(unauthenticated_client->create_consumer_group(make_string_identifier(stream_name),
+
make_string_identifier(topic_name), group_name),
+ std::exception);
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(unauthenticated_client));
+ unauthenticated_client = nullptr;
+}
+
+TEST(LowLevelE2E_ConsumerGroup,
GetConsumerGroupReturnsSameInfoAsCreateConsumerGroup) {
+ RecordProperty("description",
+ "Returns the same consumer group details from
get_consumer_group as create_consumer_group.");
+ const std::string stream_name = "client-get-group-happy-stream";
+ const std::string topic_name = "client-get-group-happy-topic";
+ const std::string group_name = "client-get-group-happy";
+ iggy::ffi::Client *client = login_to_server();
+
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 1, "none", 0,
+ "server_default", 0,
"server_default"));
+
+ const auto created_group =
client->create_consumer_group(make_string_identifier(stream_name),
+
make_string_identifier(topic_name), group_name);
+ const auto fetched_group = client->get_consumer_group(
+ make_string_identifier(stream_name),
make_string_identifier(topic_name), make_string_identifier(group_name));
+
+ ASSERT_EQ(fetched_group.id, created_group.id);
+ ASSERT_EQ(fetched_group.name, created_group.name);
+ ASSERT_EQ(fetched_group.partitions_count, created_group.partitions_count);
+ ASSERT_EQ(fetched_group.members_count, created_group.members_count);
+ ASSERT_EQ(fetched_group.members.size(), created_group.members.size());
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// get_consumer_group must throw on a client that has not called connect(), and
+// must still throw after connect() when no login has been performed.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupBeforeLoginThrows) {
+    RecordProperty("description", "Rejects get_consumer_group before connect, and after connect but before login.");
+    const std::string stream_name = "client-get-group-before-login-stream";
+    const std::string topic_name = "client-get-group-before-login-topic";
+    const std::string group_name = "client-get-group-before-login";
+
+    // Create the stream, topic and group with a logged-in client so that the
+    // resources addressed below really exist on the server.
+    iggy::ffi::Client *setup_client = login_to_server();
+    ASSERT_NE(setup_client, nullptr);
+    ASSERT_NO_THROW(setup_client->create_stream(stream_name));
+    ASSERT_NO_THROW(setup_client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                               "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(setup_client->create_consumer_group(make_string_identifier(stream_name),
+                                                        make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(setup_client));
+    setup_client = nullptr;
+
+    iggy::ffi::Client *unauthenticated_client = nullptr;
+    ASSERT_NO_THROW({ unauthenticated_client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(unauthenticated_client, nullptr);
+
+    // Phase 1: connect() has not been called yet.
+    ASSERT_THROW(unauthenticated_client->get_consumer_group(make_string_identifier(stream_name),
+                                                            make_string_identifier(topic_name),
+                                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    // Phase 2: connected, but still not logged in.
+    ASSERT_NO_THROW(unauthenticated_client->connect());
+    ASSERT_THROW(unauthenticated_client->get_consumer_group(make_string_identifier(stream_name),
+                                                            make_string_identifier(topic_name),
+                                                            make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(unauthenticated_client));
+    unauthenticated_client = nullptr;
+
+    // Cleanup: the setup connection is already closed, so log in again to remove
+    // the stream instead of leaving it behind on the server.
+    iggy::ffi::Client *cleanup_client = login_to_server();
+    ASSERT_NE(cleanup_client, nullptr);
+    ASSERT_NO_THROW(cleanup_client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(cleanup_client));
+    cleanup_client = nullptr;
+}
+
+// get_consumer_group must throw when the stream identifier or the topic
+// identifier is built from an empty string.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupWithInvalidIdentifiersThrows) {
+    RecordProperty("description", "Rejects get_consumer_group when the stream or topic identifier is invalid.");
+    const std::string stream_name = "client-get-group-invalid-id-stream";
+    const std::string topic_name = "client-get-group-invalid-id-topic";
+    const std::string group_name = "client-get-group-invalid-id-group";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // NOTE(review): if make_string_identifier("") itself throws (Identifier::from_string
+    // rejects empty input), these assertions pass without ever reaching
+    // get_consumer_group - confirm against tests/common/test_helpers.hpp.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(""), make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(""),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// get_consumer_group must throw when exactly one of stream, topic or group
+// names a resource that was never created; the other two always exist.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Rejects get_consumer_group for streams, topics, or groups that do not exist.");
+    const std::string stream_name = "client-get-group-missing-resource-stream";
+    const std::string topic_name = "client-get-group-missing-resource-topic";
+    const std::string created_group_name = "client-get-group-existing-group";
+    const std::string missing_stream_name = "client-get-group-missing-stream";
+    const std::string missing_topic_name = "client-get-group-missing-topic";
+    const std::string missing_group_name = "client-get-group-missing-group";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), created_group_name));
+
+    // Unknown stream.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(missing_stream_name),
+                                            make_string_identifier(topic_name),
+                                            make_string_identifier(created_group_name)),
+                 std::exception);
+    // Unknown topic.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name),
+                                            make_string_identifier(missing_topic_name),
+                                            make_string_identifier(created_group_name)),
+                 std::exception);
+    // Unknown group.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name),
+                                            make_string_identifier(topic_name),
+                                            make_string_identifier(missing_group_name)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// After the owning stream has been deleted, get_consumer_group for a group
+// that used to live in it must throw.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupAfterStreamDeletionThrows) {
+    RecordProperty("description", "Rejects get_consumer_group after deleting the stream that owned the group.");
+    const std::string stream_name = "client-get-group-after-stream-delete-stream";
+    const std::string topic_name = "client-get-group-after-stream-delete-topic";
+    const std::string group_name = "client-get-group-after-stream-delete";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // Deleting the stream is the scenario under test; it also means there is
+    // nothing left for this test to clean up afterwards.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name),
+                                            make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// Happy path: delete_consumer_group succeeds for an existing group, and a
+// following get_consumer_group for that group throws.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupSucceeds) {
+    RecordProperty("description", "Deletes an existing consumer group successfully.");
+    const std::string stream_name = "client-delete-group-happy-stream";
+    const std::string topic_name = "client-delete-group-happy-topic";
+    const std::string group_name = "client-delete-group-happy";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name),
+                                                  make_string_identifier(group_name)));
+
+    // The group must no longer be retrievable.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name),
+                                            make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// delete_consumer_group must throw on a client that has not called connect(),
+// and must still throw after connect() when no login has been performed.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupBeforeLoginThrows) {
+    RecordProperty("description", "Rejects delete_consumer_group before connect, and after connect but before login.");
+    const std::string stream_name = "client-delete-group-before-login-stream";
+    const std::string topic_name = "client-delete-group-before-login-topic";
+    const std::string group_name = "client-delete-group-before-login";
+
+    // Create the stream, topic and group with a logged-in client so that the
+    // resources addressed below really exist on the server.
+    iggy::ffi::Client *setup_client = login_to_server();
+    ASSERT_NE(setup_client, nullptr);
+    ASSERT_NO_THROW(setup_client->create_stream(stream_name));
+    ASSERT_NO_THROW(setup_client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                               "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(setup_client->create_consumer_group(make_string_identifier(stream_name),
+                                                        make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(setup_client));
+    setup_client = nullptr;
+
+    iggy::ffi::Client *unauthenticated_client = nullptr;
+    ASSERT_NO_THROW({ unauthenticated_client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(unauthenticated_client, nullptr);
+
+    // Phase 1: connect() has not been called yet.
+    ASSERT_THROW(unauthenticated_client->delete_consumer_group(make_string_identifier(stream_name),
+                                                               make_string_identifier(topic_name),
+                                                               make_string_identifier(group_name)),
+                 std::exception);
+
+    // Phase 2: connected, but still not logged in.
+    ASSERT_NO_THROW(unauthenticated_client->connect());
+    ASSERT_THROW(unauthenticated_client->delete_consumer_group(make_string_identifier(stream_name),
+                                                               make_string_identifier(topic_name),
+                                                               make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(unauthenticated_client));
+    unauthenticated_client = nullptr;
+
+    // Cleanup: the setup connection is already closed, so log in again to remove
+    // the stream instead of leaving it behind on the server.
+    iggy::ffi::Client *cleanup_client = login_to_server();
+    ASSERT_NE(cleanup_client, nullptr);
+    ASSERT_NO_THROW(cleanup_client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(cleanup_client));
+    cleanup_client = nullptr;
+}
+
+// delete_consumer_group must throw for an empty-string stream identifier, an
+// empty-string topic identifier, and a hand-built "numeric" group identifier
+// that carries three bytes.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupWithInvalidIdentifiersThrows) {
+    RecordProperty("description",
+                   "Rejects delete_consumer_group when the stream, topic, or group identifier is invalid.");
+    const std::string stream_name = "client-delete-group-invalid-id-stream";
+    const std::string topic_name = "client-delete-group-invalid-id-topic";
+    const std::string group_name = "client-delete-group-invalid-id-group";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // Built field by field so the malformed payload is passed to the client as is.
+    iggy::ffi::Identifier invalid_group_id;
+    invalid_group_id.kind = "numeric";
+    invalid_group_id.length = 3;
+    invalid_group_id.value = {1, 2, 3};
+
+    // NOTE(review): if make_string_identifier("") itself throws (Identifier::from_string
+    // rejects empty input), the first two assertions pass without ever reaching
+    // delete_consumer_group - confirm against tests/common/test_helpers.hpp.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(""), make_string_identifier(topic_name),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(""),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                               make_string_identifier(topic_name), std::move(invalid_group_id)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// delete_consumer_group must throw when exactly one of stream, topic or group
+// names a resource that was never created; the other two always exist.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Rejects delete_consumer_group for streams, topics, or groups that do not exist.");
+    const std::string stream_name = "client-delete-group-missing-resource-stream";
+    const std::string topic_name = "client-delete-group-missing-resource-topic";
+    const std::string created_group_name = "client-delete-group-existing-group";
+    const std::string missing_stream_name = "client-delete-group-missing-stream";
+    const std::string missing_topic_name = "client-delete-group-missing-topic";
+    const std::string missing_group_name = "client-delete-group-missing-group";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), created_group_name));
+
+    // Unknown stream.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(missing_stream_name),
+                                               make_string_identifier(topic_name),
+                                               make_string_identifier(created_group_name)),
+                 std::exception);
+    // Unknown topic.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                               make_string_identifier(missing_topic_name),
+                                               make_string_identifier(created_group_name)),
+                 std::exception);
+    // Unknown group.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                               make_string_identifier(topic_name),
+                                               make_string_identifier(missing_group_name)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// The first delete_consumer_group succeeds; repeating it with the same
+// identifiers must throw.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupTwiceThrows) {
+    RecordProperty("description", "Rejects deleting the same consumer group twice.");
+    const std::string stream_name = "client-delete-group-twice-stream";
+    const std::string topic_name = "client-delete-group-twice-topic";
+    const std::string group_name = "client-delete-group-twice";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // First deletion: expected to succeed.
+    ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name),
+                                                  make_string_identifier(group_name)));
+    // Second deletion of the same group: expected to be rejected.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                               make_string_identifier(topic_name),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// Creates stream, topic and group by name, then deletes the group addressing
+// all three by the numeric ids the server reported.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupWithNumericIdentifiersSucceeds) {
+    RecordProperty(
+        "description",
+        "Deletes a consumer group successfully when stream, topic, and group are addressed by numeric identifiers.");
+    const std::string stream_name = "client-delete-group-numeric-stream";
+    const std::string topic_name = "client-delete-group-numeric-topic";
+    const std::string group_name = "client-delete-group-numeric";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    const auto created_group = client->create_consumer_group(make_string_identifier(stream_name),
+                                                             make_string_identifier(topic_name), group_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    // topics[0] is read below; fail cleanly instead of indexing an unexpected list.
+    ASSERT_EQ(stream_details.topics.size(), 1u);
+
+    ASSERT_NO_THROW(client->delete_consumer_group(make_numeric_identifier(stream_details.id),
+                                                  make_numeric_identifier(stream_details.topics[0].id),
+                                                  make_numeric_identifier(created_group.id)));
+
+    // Looked up by name, the group must be gone.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name),
+                                            make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// After the owning stream has been deleted, delete_consumer_group for a group
+// that used to live in it must throw.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAfterStreamDeletionThrows) {
+    RecordProperty("description",
+                   "Rejects delete_consumer_group after deleting the stream that owned the consumer group.");
+    const std::string stream_name = "client-delete-group-after-stream-delete-stream";
+    const std::string topic_name = "client-delete-group-after-stream-delete-topic";
+    const std::string group_name = "client-delete-group-after-stream-delete";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // Deleting the stream is the scenario under test; it also means there is
+    // nothing left for this test to clean up afterwards.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                               make_string_identifier(topic_name),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// Create -> delete -> create again with the same name. The second creation
+// must succeed and return a group with the expected id, name and no members.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAndRecreateWithSameNameSucceeds) {
+    RecordProperty("description",
+                   "Allows recreating a consumer group with the same name after the previous group is deleted.");
+    const std::string stream_name = "client-delete-group-recreate-stream";
+    const std::string topic_name = "client-delete-group-recreate-topic";
+    const std::string group_name = "client-delete-group-recreate";
+    iggy::ffi::Client *client = login_to_server();
+    // Guard the dereferences below, as the other tests in this suite do.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(client->delete_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name),
+                                                  make_string_identifier(group_name)));
+
+    ASSERT_NO_THROW({
+        const auto recreated_group = client->create_consumer_group(make_string_identifier(stream_name),
+                                                                   make_string_identifier(topic_name), group_name);
+        ASSERT_EQ(recreated_group.id, 0u);
+        ASSERT_EQ(recreated_group.name, group_name);
+        // Unsigned literal, matching the id comparison above, to avoid a
+        // signed/unsigned comparison inside the assertion.
+        ASSERT_EQ(recreated_group.members_count, 0u);
+        ASSERT_TRUE(recreated_group.members.empty());
+    });
+
+    // Cleanup: remove the stream created by this test before disconnecting.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
diff --git a/foreign/cpp/tests/identifier/unit_tests.cpp
b/foreign/cpp/tests/identifier/unit_tests.cpp
new file mode 100644
index 000000000..bed56c273
--- /dev/null
+++ b/foreign/cpp/tests/identifier/unit_tests.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <limits>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+
+// from_string on a short ASCII name: kind becomes "string", and both length
+// and the byte payload mirror the input.
+TEST(LowLevelE2E_Identifier, FromStringCreatesStringIdentifier) {
+    RecordProperty("description", "Creates a string identifier and preserves its UTF-8 byte payload.");
+    iggy::ffi::Identifier id;
+    const std::string text = "stream-identifier";
+
+    ASSERT_NO_THROW(id.from_string(text));
+
+    ASSERT_EQ(id.kind, "string");
+    ASSERT_EQ(id.length, text.size());
+    ASSERT_EQ(id.value.size(), text.size());
+
+    // Walk the input and compare it byte by byte with the stored payload.
+    size_t pos = 0;
+    for (const char ch : text) {
+        EXPECT_EQ(id.value[pos], static_cast<std::uint8_t>(ch));
+        ++pos;
+    }
+}
+
+// Upper boundary: 127 two-byte sequences plus one ASCII byte give exactly 255
+// bytes, which from_string must accept and store unchanged.
+TEST(LowLevelE2E_Identifier, FromStringAcceptsExact255ByteUtf8Value) {
+    RecordProperty("description", "Accepts a UTF-8 string identifier whose encoded byte length is exactly 255.");
+    std::string text;
+    for (size_t repeat = 127; repeat > 0; --repeat) {
+        text.append("\xC2\xA2");
+    }
+    text.push_back('a');
+
+    // Sanity check on the fixture itself before exercising the identifier.
+    ASSERT_EQ(text.size(), 255u);
+
+    iggy::ffi::Identifier id;
+    ASSERT_NO_THROW(id.from_string(text));
+
+    ASSERT_EQ(id.kind, "string");
+    ASSERT_EQ(id.length, text.size());
+    ASSERT_EQ(id.value.size(), text.size());
+
+    size_t pos = 0;
+    for (const char ch : text) {
+        EXPECT_EQ(id.value[pos], static_cast<std::uint8_t>(ch));
+        ++pos;
+    }
+}
+
+// Lower boundary: an empty string is not a valid identifier, so from_string throws.
+TEST(LowLevelE2E_Identifier, FromStringRejectsEmptyValue) {
+    RecordProperty("description", "Rejects creating a string identifier from an empty string.");
+
+    iggy::ffi::Identifier id;
+    ASSERT_THROW(id.from_string(""), std::exception);
+}
+
+// 128 two-byte sequences give 256 bytes, one past the accepted maximum, so
+// from_string must throw.
+TEST(LowLevelE2E_Identifier, FromStringRejectsUtf8ValueLongerThan255Bytes) {
+    RecordProperty("description", "Rejects creating a UTF-8 string identifier longer than 255 encoded bytes.");
+    std::string oversized;
+    for (size_t repeat = 128; repeat > 0; --repeat) {
+        oversized.append("\xC2\xA2");
+    }
+    iggy::ffi::Identifier id;
+
+    // Sanity check on the fixture itself before exercising the identifier.
+    ASSERT_EQ(oversized.size(), 256u);
+
+    ASSERT_THROW(id.from_string(oversized), std::exception);
+}
+
+// Same limit as the UTF-8 case, with single-byte characters: 256 'a' bytes
+// must be rejected.
+TEST(LowLevelE2E_Identifier, FromStringRejectsAsciiValueLongerThan255Bytes) {
+    RecordProperty("description", "Rejects creating an ASCII string identifier longer than 255 bytes.");
+    const std::string oversized(256, 'a');
+
+    iggy::ffi::Identifier id;
+    ASSERT_THROW(id.from_string(oversized), std::exception);
+}
+
+// from_numeric(0x12345678) must yield kind "numeric", length 4 and the bytes
+// 78 56 34 12, i.e. least-significant byte first.
+TEST(LowLevelE2E_Identifier, FromNumericCreatesNumericIdentifier) {
+    RecordProperty("description", "Creates a numeric identifier encoded as four little-endian bytes.");
+    constexpr std::uint32_t number = 0x12345678;
+    constexpr std::array<std::uint8_t, 4> little_endian = {0x78, 0x56, 0x34, 0x12};
+    iggy::ffi::Identifier id;
+
+    ASSERT_NO_THROW(id.from_numeric(number));
+
+    ASSERT_EQ(id.kind, "numeric");
+    ASSERT_EQ(id.length, 4u);
+    ASSERT_EQ(id.value.size(), little_endian.size());
+
+    size_t pos = 0;
+    for (const std::uint8_t expected_byte : little_endian) {
+        EXPECT_EQ(id.value[pos], expected_byte);
+        ++pos;
+    }
+}
+
+// Largest 32-bit value: from_numeric must accept it and store four 0xFF bytes.
+TEST(LowLevelE2E_Identifier, FromNumericCreatesUint32MaxIdentifier) {
+    RecordProperty("description", "Creates a numeric identifier for UINT32_MAX using four 0xFF bytes.");
+    constexpr std::array<std::uint8_t, 4> all_ones = {0xFF, 0xFF, 0xFF, 0xFF};
+    iggy::ffi::Identifier id;
+
+    ASSERT_NO_THROW(id.from_numeric(std::numeric_limits<std::uint32_t>::max()));
+
+    ASSERT_EQ(id.kind, "numeric");
+    ASSERT_EQ(id.length, 4u);
+    ASSERT_EQ(id.value.size(), all_ones.size());
+
+    size_t pos = 0;
+    for (const std::uint8_t expected_byte : all_ones) {
+        EXPECT_EQ(id.value[pos], expected_byte);
+        ++pos;
+    }
+}
+
+// Reusing one Identifier object: after from_string then from_numeric(7), only
+// the numeric state (kind, length 4, bytes 07 00 00 00) may remain.
+TEST(LowLevelE2E_Identifier, FromNumericOverwritesExistingStringIdentifier) {
+    RecordProperty("description", "Replaces a previously created string identifier with numeric identifier data.");
+    const std::array<std::uint8_t, 4> expected_bytes = {7, 0, 0, 0};
+    iggy::ffi::Identifier id;
+
+    ASSERT_NO_THROW(id.from_string("temporary-name"));
+    ASSERT_NO_THROW(id.from_numeric(7));
+
+    ASSERT_EQ(id.kind, "numeric");
+    ASSERT_EQ(id.length, 4u);
+    ASSERT_EQ(id.value.size(), 4u);
+
+    for (size_t pos = 0; pos < expected_bytes.size(); ++pos) {
+        EXPECT_EQ(id.value[pos], expected_bytes[pos]);
+    }
+}
+
+// Reusing one Identifier object the other way round: after from_numeric then
+// from_string, kind, length and payload must all describe the string.
+TEST(LowLevelE2E_Identifier, FromStringOverwritesExistingNumericIdentifier) {
+    RecordProperty("description", "Replaces a previously created numeric identifier with string identifier data.");
+    iggy::ffi::Identifier id;
+    const std::string text = "replacement-name";
+
+    ASSERT_NO_THROW(id.from_numeric(42));
+    ASSERT_NO_THROW(id.from_string(text));
+
+    ASSERT_EQ(id.kind, "string");
+    ASSERT_EQ(id.length, text.size());
+    ASSERT_EQ(id.value.size(), text.size());
+
+    size_t pos = 0;
+    for (const char ch : text) {
+        EXPECT_EQ(id.value[pos], static_cast<std::uint8_t>(ch));
+        ++pos;
+    }
+}
diff --git a/foreign/cpp/tests/partition/low_level_e2e.cpp
b/foreign/cpp/tests/partition/low_level_e2e.cpp
new file mode 100644
index 000000000..ef06f483d
--- /dev/null
+++ b/foreign/cpp/tests/partition/low_level_e2e.cpp
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// TODO(slbotbm): create fixture for setup/teardown.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+// Happy path: create_partitions(43) on a fresh topic, after which get_stream
+// must report a partitions_count of 44 for that topic.
+TEST(LowLevelE2E_Partition, CreatePartitionsSucceeds) {
+    RecordProperty("description", "Creates partitions for an existing topic and verifies the resulting count.");
+    const std::string topic_name = "topic-create-partitions-happy-path";
+    const std::string stream_name = "cpp-create-partitions-happy-path";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Arrange: stream with a single topic.
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    // Act.
+    ASSERT_NO_THROW(client->create_partitions(make_string_identifier(stream_name),
+                                              make_string_identifier(topic_name), 43));
+
+    // Assert on what the server now reports.
+    ASSERT_NO_THROW({
+        const auto details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(details.topics.size(), 1u);
+        const auto &only_topic = details.topics[0];
+        EXPECT_EQ(only_topic.name, topic_name);
+        EXPECT_EQ(only_topic.partitions_count, 44u);
+    });
+
+    // Teardown.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// create_partitions must throw on a client that has not called connect(), and
+// again after connect() when no login has been performed.
+TEST(LowLevelE2E_Partition, CreatePartitionsBeforeLoginThrows) {
+    RecordProperty("description",
+                   "Throws when create_partitions is called before connect, and after connect but before login.");
+    const std::string topic_name = "topic-create-partitions-before-login";
+    const std::string stream_name = "cpp-create-partitions-before-login";
+
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+
+    // Phase 1: connect() has not been called yet.
+    ASSERT_THROW(
+        client->create_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+        std::exception);
+
+    // Phase 2: connected, but still not logged in.
+    ASSERT_NO_THROW(client->connect());
+    ASSERT_THROW(
+        client->create_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+        std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// create_partitions must throw when either the stream name or the topic name
+// refers to something that was never created; the other one always exists.
+TEST(LowLevelE2E_Partition, CreatePartitionsOnNonExistentResourcesThrows) {
+    RecordProperty("description",
+                   "Throws when create_partitions is called for a stream or topic that does not exist.");
+    // Resources that are actually created ...
+    const std::string stream_name = "cpp-create-partitions-missing-resource-stream";
+    const std::string topic_name = "topic-create-partitions-missing-resource-topic";
+    // ... and names that are never created.
+    const std::string missing_stream_name = "cpp-create-partitions-missing-stream";
+    const std::string missing_topic_name = "topic-create-partitions-missing-topic";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    // Unknown stream, existing topic name.
+    ASSERT_THROW(client->create_partitions(make_string_identifier(missing_stream_name),
+                                           make_string_identifier(topic_name), 1),
+                 std::exception);
+    // Existing stream, unknown topic.
+    ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name),
+                                           make_string_identifier(missing_topic_name), 1),
+                 std::exception);
+
+    // Teardown.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// Feeds create_partitions four hand-built identifiers and expects each call to
+// throw: an unknown kind ("invalid") and a one-byte "numeric" value, first in
+// the stream position and then in the topic position.
+TEST(LowLevelE2E_Partition, CreatePartitionsWithInvalidIdentifiersThrows) {
+    RecordProperty("description", "Rejects create_partitions requests that use invalid stream or topic identifiers.");
+    const std::string topic_name = "topic-create-partitions-invalid-identifier";
+    const std::string stream_name = "cpp-create-partitions-invalid-identifier";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    // --- Stream position -------------------------------------------------
+    {
+        // Kind is neither of the kinds used elsewhere in these tests.
+        iggy::ffi::Identifier bad_kind;
+        bad_kind.kind = "invalid";
+        bad_kind.length = 4;
+        bad_kind.value = {1, 0, 0, 0};
+        ASSERT_THROW(client->create_partitions(std::move(bad_kind), make_string_identifier(topic_name), 1),
+                     std::exception);
+    }
+    {
+        // "numeric" kind carrying a single byte.
+        iggy::ffi::Identifier short_numeric;
+        short_numeric.kind = "numeric";
+        short_numeric.length = 1;
+        short_numeric.value.push_back(1);
+        ASSERT_THROW(client->create_partitions(std::move(short_numeric), make_string_identifier(topic_name), 1),
+                     std::exception);
+    }
+
+    // --- Topic position --------------------------------------------------
+    {
+        iggy::ffi::Identifier bad_kind;
+        bad_kind.kind = "invalid";
+        bad_kind.length = 4;
+        bad_kind.value = {1, 0, 0, 0};
+        ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name), std::move(bad_kind), 1),
+                     std::exception);
+    }
+    {
+        iggy::ffi::Identifier short_numeric;
+        short_numeric.kind = "numeric";
+        short_numeric.length = 1;
+        short_numeric.value.push_back(1);
+        ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name), std::move(short_numeric), 1),
+                     std::exception);
+    }
+
+    // Teardown.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// Table-driven boundary check for the partitions_count argument of
+// create_partitions. Each row gets its own topic; afterwards get_stream is
+// used to confirm the per-topic partition totals listed in the table.
+TEST(LowLevelE2E_Partition, CreatePartitionsWithBoundaryPartitionsCountValues) {
+    RecordProperty("description",
+                   "Accepts supported create_partitions counts and rejects values outside the allowed range.");
+    const std::string stream_name = "cpp-create-partitions-boundary-values";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+
+    // One row per scenario: the topic to create, the count passed to
+    // create_partitions, whether that call should succeed, and the
+    // partitions_count expected on the topic afterwards.
+    struct TestCase {
+        std::string topic_name;
+        std::uint32_t partitions_count;
+        bool should_succeed;
+        std::uint32_t expected_total_partitions;
+    };
+
+    const std::vector<TestCase> scenarios = {
+        // -1 converted to uint32_t is the largest representable count.
+        {"topic-partitions-minus-1", static_cast<std::uint32_t>(-1), false, 1},
+        {"topic-partitions-0", 0, false, 1},
+        {"topic-partitions-1", 1, true, 2},
+        {"topic-partitions-43", 43, true, 44},
+        {"topic-partitions-1000", 1000, true, 1001},
+        {"topic-partitions-1001", 1001, false, 1},
+    };
+
+    for (const auto &scenario : scenarios) {
+        SCOPED_TRACE(scenario.topic_name);
+        ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), scenario.topic_name, 1, "none", 0,
+                                             "server_default", 0, "server_default"));
+
+        if (!scenario.should_succeed) {
+            ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name),
+                                                   make_string_identifier(scenario.topic_name),
+                                                   scenario.partitions_count),
+                         std::exception);
+        } else {
+            ASSERT_NO_THROW(client->create_partitions(make_string_identifier(stream_name),
+                                                      make_string_identifier(scenario.topic_name),
+                                                      scenario.partitions_count));
+        }
+    }
+
+    ASSERT_NO_THROW({
+        const auto details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(details.topics.size(), scenarios.size());
+        // Topics are matched by name, so the order returned by the server does not matter.
+        for (const auto &scenario : scenarios) {
+            bool located = false;
+            for (const auto &topic : details.topics) {
+                if (topic.name == scenario.topic_name) {
+                    EXPECT_EQ(topic.partitions_count, scenario.expected_total_partitions);
+                    located = true;
+                    break;
+                }
+            }
+            EXPECT_TRUE(located) << "Missing topic " << scenario.topic_name;
+        }
+    });
+
+    // Teardown.
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// Same scenario as the happy path, but stream and topic are addressed by the
+// numeric ids reported by get_stream instead of by name.
+TEST(LowLevelE2E_Partition, CreatePartitionsWithNumericIdentifiersSucceeds) {
+    RecordProperty("description",
+                   "Creates partitions successfully when valid numeric stream and topic identifiers are used.");
+    const std::string topic_name = "topic-create-partitions-numeric-identifiers";
+    const std::string stream_name = "cpp-create-partitions-numeric-identifiers";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    // Look the stream up by name once to learn the numeric ids.
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    ASSERT_EQ(stream_details.topics.size(), 1u);
+
+    ASSERT_NO_THROW(client->create_partitions(make_numeric_identifier(stream_details.id),
+                                              make_numeric_identifier(stream_details.topics[0].id), 43));
+
+    ASSERT_NO_THROW({
+        const auto refreshed = client->get_stream(make_numeric_identifier(stream_details.id));
+        ASSERT_EQ(refreshed.topics.size(), 1u);
+        const auto &only_topic = refreshed.topics[0];
+        EXPECT_EQ(only_topic.id, stream_details.topics[0].id);
+        EXPECT_EQ(only_topic.name, topic_name);
+        EXPECT_EQ(only_topic.partitions_count, 44u);
+    });
+
+    // Teardown, also through the numeric stream id.
+    ASSERT_NO_THROW(client->delete_stream(make_numeric_identifier(stream_details.id)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsSucceeds) {
+ RecordProperty("description", "Deletes partitions from an existing topic
and verifies the resulting count.");
+ const std::string stream_name = "cpp-delete-partitions-happy-path";
+ const std::string topic_name = "topic-delete-partitions-happy-path";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// Create a topic with 44 partitions, then remove 43 of them in a single call.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 44, "none", 0,
+ "server_default", 0,
"server_default"));
+ ASSERT_NO_THROW(
+ client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 43));
+// 44 - 43 leaves exactly one partition on the only topic of the stream.
+ ASSERT_NO_THROW({
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), 1u);
+ EXPECT_EQ(stream_details.topics[0].name, topic_name);
+ EXPECT_EQ(stream_details.topics[0].partitions_count, 1u);
+ });
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeleteMorePartitionsThanExistingThrows) {
+ RecordProperty("description",
+ "Rejects delete_partitions counts outside the allowed range
and counts greater than existing.");
+ const std::string stream_name = "cpp-delete-partitions-boundary-values";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// A single stream hosts one topic per scenario in the table below.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+// Scenario row: partitions_count is passed to delete_partitions, initial_partitions is the topic creation count, expected_total_partitions is asserted afterwards.
+ struct TestCase {
+ std::string topic_name;
+ std::uint32_t partitions_count;
+ bool should_succeed;
+ std::uint32_t initial_partitions;
+ std::uint32_t expected_total_partitions;
+ };
+// static_cast<std::uint32_t>(-1) yields the largest uint32 value; with 3 existing partitions only the request to delete 1 is expected to succeed.
+ const std::vector<TestCase> test_cases = {
+ {"topic-delete-partitions-minus-1", static_cast<std::uint32_t>(-1),
false, 3, 3},
+ {"topic-delete-partitions-0", 0, false, 3, 3},
+ {"topic-delete-partitions-1", 1, true, 3, 2},
+ {"topic-delete-partitions-4", 4, false, 3, 3},
+ };
+// Create one topic per case, then expect either a clean call or an exception depending on should_succeed.
+ for (const auto &test_case : test_cases) {
+ SCOPED_TRACE(test_case.topic_name);
+
ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
test_case.topic_name,
+ test_case.initial_partitions,
"none", 0, "server_default", 0,
+ "server_default"));
+
+ if (test_case.should_succeed) {
+
ASSERT_NO_THROW(client->delete_partitions(make_string_identifier(stream_name),
+
make_string_identifier(test_case.topic_name),
+
test_case.partitions_count));
+ } else {
+ ASSERT_THROW(
+ client->delete_partitions(make_string_identifier(stream_name),
+
make_string_identifier(test_case.topic_name), test_case.partitions_count),
+ std::exception);
+ }
+ }
+// Every topic must still be listed; rejected calls must leave its count at 3 and the accepted call must lower it to 2.
+ ASSERT_NO_THROW({
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), test_cases.size());
+ for (const auto &test_case : test_cases) {
+ bool found = false;
+ for (const auto &topic : stream_details.topics) {
+ if (topic.name == test_case.topic_name) {
+ EXPECT_EQ(topic.partitions_count,
test_case.expected_total_partitions);
+ found = true;
+ break;
+ }
+ }
+ EXPECT_TRUE(found) << "Missing topic " << test_case.topic_name;
+ }
+ });
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition,
DeletePartitionsBeforeCreatingAdditionalPartitionsSucceeds) {
+ RecordProperty("description",
+ "Deletes partitions from the initial topic allocation
without calling create_partitions first.");
+ const std::string stream_name =
"cpp-delete-partitions-before-create-partitions";
+ const std::string topic_name =
"topic-delete-partitions-before-create-partitions";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// The topic is created with 3 partitions and create_partitions is never called before deleting 1.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 3, "none", 0,
+ "server_default", 0,
"server_default"));
+ ASSERT_NO_THROW(
+ client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 1));
+// 3 - 1 leaves two partitions on the only topic of the stream.
+ ASSERT_NO_THROW({
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), 1u);
+ EXPECT_EQ(stream_details.topics[0].partitions_count, 2u);
+ });
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsFromTopicWithZeroPartitionsThrows)
{
+ RecordProperty("description",
+ "Throws when delete_partitions is called with count 1 for a
topic that currently has 0 partitions.");
+ const std::string stream_name =
"cpp-delete-partitions-zero-existing-partitions";
+ const std::string topic_name =
"topic-delete-partitions-zero-existing-partitions";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// Creating a topic with a partition count of 0 is itself expected to succeed here.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 0, "none", 0,
+ "server_default", 0,
"server_default"));
+// With no partitions present, asking to delete one must raise an exception.
+
ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 1),
+ std::exception);
+// The failed call must leave the topic in place with its count still at 0.
+ ASSERT_NO_THROW({
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), 1u);
+ EXPECT_EQ(stream_details.topics[0].name, topic_name);
+ EXPECT_EQ(stream_details.topics[0].partitions_count, 0u);
+ });
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsBeforeLoginThrows) {
+ RecordProperty("description",
+ "Throws when delete_partitions is called before connect,
and after connect but before login.");
+ const std::string stream_name = "cpp-delete-partitions-before-login";
+ const std::string topic_name = "topic-delete-partitions-before-login";
+// Build the client directly with new_connection and an empty string argument instead of login_to_server(), so no connect or login has happened (presumably the empty string selects defaults; confirm).
+ iggy::ffi::Client *client = nullptr;
+ ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+ ASSERT_NE(client, nullptr);
+// Two attempts follow: one before connect(), and one after connect() but with no login performed; both must throw.
+
ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 1),
+ std::exception);
+ ASSERT_NO_THROW(client->connect());
+
ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 1),
+ std::exception);
+// No stream or topic was created in this test, so only the client is handed to delete_connection.
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsOnNonExistentResourcesThrows) {
+ RecordProperty("description", "Throws when delete_partitions is called for
a stream or topic that does not exist.");
+ const std::string stream_name =
"cpp-delete-partitions-missing-resource-stream";
+ const std::string topic_name =
"topic-delete-partitions-missing-resource-topic";
+ const std::string missing_stream_name =
"cpp-delete-partitions-missing-stream";
+ const std::string missing_topic_name =
"topic-delete-partitions-missing-topic";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// Only stream_name and topic_name are created; the two missing_* names are never created in this test.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 3, "none", 0,
+ "server_default", 0,
"server_default"));
+// First call pairs a missing stream with the real topic name; second pairs the real stream with a missing topic.
+ ASSERT_THROW(
+ client->delete_partitions(make_string_identifier(missing_stream_name),
make_string_identifier(topic_name), 1),
+ std::exception);
+ ASSERT_THROW(
+ client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(missing_topic_name), 1),
+ std::exception);
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsWithInvalidIdentifiersThrows) {
+ RecordProperty("description", "Rejects delete_partitions requests that use
invalid stream or topic identifiers.");
+ const std::string stream_name = "cpp-delete-partitions-invalid-identifier";
+ const std::string topic_name =
"topic-delete-partitions-invalid-identifier";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// A valid stream and a 3-partition topic exist, so each failure below stems from the hand-built identifier.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 3, "none", 0,
+ "server_default", 0,
"server_default"));
+// Case 1: stream identifier whose kind field holds the text invalid, paired with a 4-byte value.
+ iggy::ffi::Identifier invalid_stream_kind_id;
+ invalid_stream_kind_id.kind = "invalid";
+ invalid_stream_kind_id.length = 4;
+ invalid_stream_kind_id.value = {1, 0, 0, 0};
+ ASSERT_THROW(client->delete_partitions(std::move(invalid_stream_kind_id),
make_string_identifier(topic_name), 1),
+ std::exception);
+// Case 2: stream identifier marked numeric but carrying one byte. NOTE(review): presumably numeric ids must be 4 bytes wide - confirm against identifier.rs.
+ iggy::ffi::Identifier invalid_stream_numeric_id;
+ invalid_stream_numeric_id.kind = "numeric";
+ invalid_stream_numeric_id.length = 1;
+ invalid_stream_numeric_id.value.push_back(1);
+
ASSERT_THROW(client->delete_partitions(std::move(invalid_stream_numeric_id),
make_string_identifier(topic_name), 1),
+ std::exception);
+// Case 3: the same invalid kind as case 1, this time supplied in the topic position.
+ iggy::ffi::Identifier invalid_topic_kind_id;
+ invalid_topic_kind_id.kind = "invalid";
+ invalid_topic_kind_id.length = 4;
+ invalid_topic_kind_id.value = {1, 0, 0, 0};
+
ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name),
std::move(invalid_topic_kind_id), 1),
+ std::exception);
+// Case 4: the same one-byte numeric payload as case 2, this time supplied in the topic position.
+ iggy::ffi::Identifier invalid_topic_numeric_id;
+ invalid_topic_numeric_id.kind = "numeric";
+ invalid_topic_numeric_id.length = 1;
+ invalid_topic_numeric_id.value.push_back(1);
+
ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name),
std::move(invalid_topic_numeric_id), 1),
+ std::exception);
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsTwiceForSameTopicSucceeds) {
+ RecordProperty("description", "Allows delete_partitions to be called twice
for the same stream and topic.");
+ const std::string stream_name = "cpp-delete-partitions-twice-same-topic";
+ const std::string topic_name = "topic-delete-partitions-twice-same-topic";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// Start from 45 partitions and issue two consecutive deletions of 20 against the same topic.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 45, "none", 0,
+ "server_default", 0,
"server_default"));
+ ASSERT_NO_THROW(
+ client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 20));
+ ASSERT_NO_THROW(
+ client->delete_partitions(make_string_identifier(stream_name),
make_string_identifier(topic_name), 20));
+// 45 - 20 - 20 leaves five partitions, showing that both deletions were applied.
+ ASSERT_NO_THROW({
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), 1u);
+ EXPECT_EQ(stream_details.topics[0].name, topic_name);
+ EXPECT_EQ(stream_details.topics[0].partitions_count, 5u);
+ });
+// Clean up: delete the stream by name, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsAfterStreamDeletionThrows) {
+ RecordProperty("description", "Throws when delete_partitions is called
after the stream has been deleted.");
+ const std::string stream_name =
"cpp-delete-partitions-after-stream-deletion";
+ const std::string topic_name =
"topic-delete-partitions-after-stream-deletion";
+// Obtain a client from the login_to_server() helper (presumably tests/common/test_helpers.hpp; confirm).
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// Create a stream with one 3-partition topic that will be deleted before the call under test.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name),
topic_name, 3, "none", 0,
+ "server_default", 0,
"server_default"));
+// Capture the numeric stream and topic ids while the stream still exists; they are reused after deletion.
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ ASSERT_EQ(stream_details.topics.size(), 1u);
+// Delete the stream first, addressing it by the numeric id captured above.
+
ASSERT_NO_THROW(client->delete_stream(make_numeric_identifier(stream_details.id)));
+// The saved ids now point at a deleted stream, so delete_partitions must throw.
+
ASSERT_THROW(client->delete_partitions(make_numeric_identifier(stream_details.id),
+
make_numeric_identifier(stream_details.topics[0].id), 1),
+ std::exception);
+// The stream was already deleted above, so only the client is handed to delete_connection.
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+// TODO(slbotbm): Add CreatePartitionsAfterTopicDeletionThrows test case after
delete_topic function is added.
diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp
b/foreign/cpp/tests/stream/low_level_e2e.cpp
index ac3e01b9e..43d0496de 100644
--- a/foreign/cpp/tests/stream/low_level_e2e.cpp
+++ b/foreign/cpp/tests/stream/low_level_e2e.cpp
@@ -25,6 +25,8 @@
#include "lib.rs.h"
#include "tests/common/test_helpers.hpp"
+// TODO(slbotbm): Add tests for purge_stream after implementing
send_messages(...).
+
TEST(LowLevelE2E_Stream, CreateStreamAfterLogin) {
RecordProperty("description", "Creates a stream successfully after
authenticating.");
const std::string stream_name = "cpp-create-stream-after-login";
@@ -83,6 +85,27 @@ TEST(LowLevelE2E_Stream,
CreateStreamValidatesNameConstraintsAndUniqueness) {
client = nullptr;
}
+TEST(LowLevelE2E_Stream, CreateStreamWithEmojiName) {
+ RecordProperty("description", "Creates a stream with a UTF-8 emoji name.");
+ const std::string stream_name = "ππππApache Iggyππππ";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+// NOTE(review): the name above shows Greek pi characters in this copy although the test targets emoji; the original bytes were probably altered in transit - confirm against the repository.
+ ASSERT_NO_THROW(client->create_stream(stream_name));
+ ASSERT_NO_THROW({
+ const auto stream_details =
client->get_stream(make_string_identifier(stream_name));
+ const std::string returned_stream_name =
static_cast<std::string>(stream_details.name);
+ EXPECT_NE(stream_details.id, 0u);
+ EXPECT_EQ(returned_stream_name, stream_name);
+ EXPECT_EQ(stream_details.topics_count, 0u);
+ EXPECT_EQ(stream_details.topics.size(), 0u);
+ });
+// Clean up: delete the stream using the same non-ASCII name as a string identifier, then hand the client to delete_connection.
+
ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
TEST(LowLevelE2E_Stream, StreamCreatedAndDeletedSuccessfully) {
RecordProperty("description", "Creates a stream and deletes it
successfully by string identifier.");
const std::string stream_name = "cpp-delete-stream-created-and-deleted";
diff --git a/foreign/cpp/tests/topic/low_level_e2e.cpp
b/foreign/cpp/tests/topic/low_level_e2e.cpp
index 79675d2ee..98af86753 100644
--- a/foreign/cpp/tests/topic/low_level_e2e.cpp
+++ b/foreign/cpp/tests/topic/low_level_e2e.cpp
@@ -29,6 +29,8 @@
#include "lib.rs.h"
#include "tests/common/test_helpers.hpp"
+// TODO(slbotbm): Add tests for purge_topic after implementing
send_messages(...).
+
TEST(LowLevelE2E_Topic, CreateTopicWithAllOptionCombinations) {
RecordProperty("description",
"Creates topics across supported option combinations and
verifies they are all returned.");