This is an automated email from the ASF dual-hosted git repository.

hgruszecki pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git


The following commit(s) were added to refs/heads/master by this push:
     new 3904ef310 feat(cpp): Add functions related to consumer groups (#2988)
3904ef310 is described below

commit 3904ef3104575e26e8553a3456c9a4f6b2047625
Author: Rimuksh Kansal <[email protected]>
AuthorDate: Wed Apr 1 15:31:57 2026 +0900

    feat(cpp): Add functions related to consumer groups (#2988)
---
 foreign/cpp/BUILD.bazel                            |  25 +-
 foreign/cpp/Cargo.toml                             |   1 +
 foreign/cpp/build.rs                               |   1 +
 foreign/cpp/src/client.rs                          | 213 +++++++-
 foreign/cpp/src/consumer_group.rs                  |  46 ++
 foreign/cpp/src/identifier.rs                      |  16 +
 foreign/cpp/src/lib.rs                             |  58 ++-
 foreign/cpp/src/stream.rs                          |   2 +-
 foreign/cpp/src/topic.rs                           |   4 +-
 foreign/cpp/tests/client/low_level_e2e.cpp         |   1 +
 foreign/cpp/tests/common/test_helpers.hpp          |  16 +-
 foreign/cpp/tests/consumer_group/low_level_e2e.cpp | 572 +++++++++++++++++++++
 foreign/cpp/tests/identifier/unit_tests.cpp        | 152 ++++++
 foreign/cpp/tests/partition/low_level_e2e.cpp      | 530 +++++++++++++++++++
 foreign/cpp/tests/stream/low_level_e2e.cpp         |  23 +
 foreign/cpp/tests/topic/low_level_e2e.cpp          |   2 +
 16 files changed, 1632 insertions(+), 30 deletions(-)

diff --git a/foreign/cpp/BUILD.bazel b/foreign/cpp/BUILD.bazel
index 614ac6349..8dbeb65bc 100644
--- a/foreign/cpp/BUILD.bazel
+++ b/foreign/cpp/BUILD.bazel
@@ -100,6 +100,13 @@ cc_library(
         "cxxbridge",
         "include",
     ],
+    copts = select({
+        "@platforms//os:windows": ["/utf-8"],
+        "//conditions:default": [
+            "-finput-charset=UTF-8",
+            "-fexec-charset=UTF-8",
+        ],
+    }),
     linkopts = [
         "-ldl",
         "-lpthread",
@@ -117,9 +124,18 @@ cc_library(
 
 cc_test(
     name = "unit",
-    srcs = [
+    srcs = glob([
+        "tests/*/unit_tests.cpp",
+    ]) + [
         "tests/unit_tests.cpp",
     ],
+    copts = select({
+        "@platforms//os:windows": ["/utf-8"],
+        "//conditions:default": [
+            "-finput-charset=UTF-8",
+            "-fexec-charset=UTF-8",
+        ],
+    }),
     deps = [
         ":iggy-cpp",
         "@googletest//:gtest_main",
@@ -133,6 +149,13 @@ cc_test(
     ]) + [
         "tests/common/test_helpers.hpp",
     ],
+    copts = select({
+        "@platforms//os:windows": ["/utf-8"],
+        "//conditions:default": [
+            "-finput-charset=UTF-8",
+            "-fexec-charset=UTF-8",
+        ],
+    }),
     tags = [
         "e2e",
     ],
diff --git a/foreign/cpp/Cargo.toml b/foreign/cpp/Cargo.toml
index 8e1ed032c..477cb0b74 100644
--- a/foreign/cpp/Cargo.toml
+++ b/foreign/cpp/Cargo.toml
@@ -29,6 +29,7 @@ crate-type = ["staticlib"]
 [dependencies]
 cxx = "1.0.194"
 iggy = { path = "../../core/sdk" }
+iggy_common = { path = "../../core/common" }
 tokio = { version = "1.49.0", features = ["rt-multi-thread"] }
 
 [build-dependencies]
diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs
index 1f80ed782..476a9d796 100644
--- a/foreign/cpp/build.rs
+++ b/foreign/cpp/build.rs
@@ -21,6 +21,7 @@ fn main() {
         .compile("iggy-cpp-bridge");
 
     println!("cargo:rerun-if-changed=src/client.rs");
+    println!("cargo:rerun-if-changed=src/consumer_group.rs");
     println!("cargo:rerun-if-changed=src/identifier.rs");
     println!("cargo:rerun-if-changed=src/lib.rs");
     println!("cargo:rerun-if-changed=src/stream.rs");
diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs
index 894c78060..341e0c1a0 100644
--- a/foreign/cpp/src/client.rs
+++ b/foreign/cpp/src/client.rs
@@ -18,9 +18,9 @@
 use crate::{RUNTIME, ffi};
 use iggy::prelude::{
     Client as IggyConnectionClient, CompressionAlgorithm as RustCompressionAlgorithm,
-    Identifier as RustIdentifier, IggyClient as RustIggyClient,
+    ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as RustIggyClient,
     IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as RustIggyExpiry,
-    MaxTopicSize as RustMaxTopicSize, StreamClient, TopicClient, UserClient,
+    MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient, TopicClient, UserClient,
 };
 use std::str::FromStr;
 use std::sync::Arc;
@@ -53,22 +53,22 @@ pub fn new_connection(connection_string: String) -> Result<*mut Client, String>
 }
 
 impl Client {
-    pub fn connect(&self) -> Result<(), String> {
+    pub fn login_user(&self, username: String, password: String) -> Result<(), String> {
         RUNTIME.block_on(async {
             self.inner
-                .connect()
+                .login_user(&username, &password)
                 .await
-                .map_err(|error| format!("Could not connect: {error}"))?;
+                .map_err(|error| format!("Could not login user '{}': {error}", username))?;
             Ok(())
         })
     }
 
-    pub fn login_user(&self, username: String, password: String) -> Result<(), String> {
+    pub fn connect(&self) -> Result<(), String> {
         RUNTIME.block_on(async {
             self.inner
-                .login_user(&username, &password)
+                .connect()
                 .await
-                .map_err(|error| format!("Could not login user '{}': {error}", username))?;
+                .map_err(|error| format!("Could not connect: {error}"))?;
             Ok(())
         })
     }
@@ -114,6 +114,19 @@ impl Client {
         })
     }
 
+    // pub fn purge_stream(&self, stream_id: ffi::Identifier) -> Result<(), String> {
+    //     let rust_stream_id = RustIdentifier::try_from(stream_id)
+    //         .map_err(|error| format!("Could not purge stream: {error}"))?;
+
+    //     RUNTIME.block_on(async {
+    //         self.inner
+    //             .purge_stream(&rust_stream_id)
+    //             .await
+    //             .map_err(|error| format!("Could not purge stream '{}': {error}", rust_stream_id))?;
+    //         Ok(())
+    //     })
+    // }
+
     #[allow(clippy::too_many_arguments)]
     pub fn create_topic(
         &self,
@@ -144,7 +157,9 @@ impl Client {
         let rust_message_expiry = match message_expiry_kind.as_str() {
             "" | "server_default" | "default" => RustIggyExpiry::ServerDefault,
             "never_expire" => RustIggyExpiry::NeverExpire,
-            "duration" => RustIggyExpiry::ExpireDuration(message_expiry_value.into()),
+            "duration" => RustIggyExpiry::ExpireDuration(iggy::prelude::IggyDuration::from(
+                message_expiry_value,
+            )),
             _ => {
                 return Err(format!(
                     "Could not create topic '{}': invalid message expiry kind '{}'",
@@ -183,6 +198,186 @@ impl Client {
             Ok(())
         })
     }
+
+    // pub fn purge_topic(
+    //     &self,
+    //     stream_id: ffi::Identifier,
+    //     topic_id: ffi::Identifier,
+    // ) -> Result<(), String> {
+    //     let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
+    //         format!("Could not purge topic: invalid stream identifier: {error}")
+    //     })?;
+    //     let rust_topic_id = RustIdentifier::try_from(topic_id)
+    //         .map_err(|error| format!("Could not purge topic: invalid topic identifier: {error}"))?;
+
+    //     RUNTIME.block_on(async {
+    //         self.inner
+    //             .purge_topic(&rust_stream_id, &rust_topic_id)
+    //             .await
+    //             .map_err(|error| {
+    //                 format!(
+    //                     "Could not purge topic '{}' on stream '{}': {error}",
+    //                     rust_topic_id, rust_stream_id
+    //                 )
+    //             })?;
+    //         Ok(())
+    //     })
+    // }
+
+    pub fn create_partitions(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        partitions_count: u32,
+    ) -> Result<(), String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!("Could not create partitions: invalid stream identifier: {error}")
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
+            format!("Could not create partitions: invalid topic identifier: {error}")
+        })?;
+
+        RUNTIME.block_on(async {
+            self.inner
+                .create_partitions(&rust_stream_id, &rust_topic_id, partitions_count)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not create {partitions_count} partitions for topic '{}' on stream '{}': {error}",
+                        rust_topic_id, rust_stream_id
+                    )
+                })?;
+            Ok(())
+        })
+    }
+
+    pub fn delete_partitions(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        partitions_count: u32,
+    ) -> Result<(), String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!("Could not delete partitions: invalid stream identifier: {error}")
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
+            format!("Could not delete partitions: invalid topic identifier: {error}")
+        })?;
+
+        RUNTIME.block_on(async {
+            self.inner
+                .delete_partitions(&rust_stream_id, &rust_topic_id, partitions_count)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not delete {partitions_count} partitions for topic '{}' on stream '{}': {error}",
+                        rust_topic_id, rust_stream_id
+                    )
+                })?;
+            Ok(())
+        })
+    }
+
+    pub fn create_consumer_group(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        name: String,
+    ) -> Result<ffi::ConsumerGroupDetails, String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!(
+                "Could not create consumer group '{}': invalid stream identifier: {error}",
+                name
+            )
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
+            format!(
+                "Could not create consumer group '{}': invalid topic identifier: {error}",
+                name
+            )
+        })?;
+
+        RUNTIME.block_on(async {
+            let group = self
+                .inner
+                .create_consumer_group(&rust_stream_id, &rust_topic_id, &name)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not create consumer group '{}' for topic '{}' on stream '{}': {error}",
+                        name, rust_topic_id, rust_stream_id
+                    )
+                })?;
+            Ok(ffi::ConsumerGroupDetails::from(group))
+        })
+    }
+
+    pub fn get_consumer_group(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        group_id: ffi::Identifier,
+    ) -> Result<ffi::ConsumerGroupDetails, String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!("Could not get consumer group: invalid stream identifier: {error}")
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
+            format!("Could not get consumer group: invalid topic identifier: {error}")
+        })?;
+        let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| {
+            format!("Could not get consumer group: invalid group identifier: {error}")
+        })?;
+
+        RUNTIME.block_on(async {
+            let group = self
+                .inner
+                .get_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not get consumer group '{}' for topic '{}' on stream '{}': {error}",
+                        rust_group_id, rust_topic_id, rust_stream_id
+                    )
+                })?;
+            let group = group.ok_or_else(|| {
+                format!(
+                    "Consumer group '{}' was not found for topic '{}' on stream '{}'",
+                    rust_group_id, rust_topic_id, rust_stream_id
+                )
+            })?;
+            Ok(ffi::ConsumerGroupDetails::from(group))
+        })
+    }
+
+    pub fn delete_consumer_group(
+        &self,
+        stream_id: ffi::Identifier,
+        topic_id: ffi::Identifier,
+        group_id: ffi::Identifier,
+    ) -> Result<(), String> {
+        let rust_stream_id = RustIdentifier::try_from(stream_id).map_err(|error| {
+            format!("Could not delete consumer group: invalid stream identifier: {error}")
+        })?;
+        let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error| {
+            format!("Could not delete consumer group: invalid topic identifier: {error}")
+        })?;
+        let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error| {
+            format!("Could not delete consumer group: invalid group identifier: {error}")
+        })?;
+
+        RUNTIME.block_on(async {
+            self.inner
+                .delete_consumer_group(&rust_stream_id, &rust_topic_id, &rust_group_id)
+                .await
+                .map_err(|error| {
+                    format!(
+                        "Could not delete consumer group '{}' for topic '{}' on stream '{}': {error}",
+                        rust_group_id, rust_topic_id, rust_stream_id
+                    )
+                })?;
+            Ok(())
+        })
+    }
 }
 
 pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
diff --git a/foreign/cpp/src/consumer_group.rs b/foreign/cpp/src/consumer_group.rs
new file mode 100644
index 000000000..28a2d92e2
--- /dev/null
+++ b/foreign/cpp/src/consumer_group.rs
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use iggy::prelude::ConsumerGroupDetails as RustConsumerGroupDetails;
+use iggy_common::ConsumerGroupMember as RustConsumerGroupMember;
+
+impl From<RustConsumerGroupMember> for ffi::ConsumerGroupMember {
+    fn from(member: RustConsumerGroupMember) -> Self {
+        ffi::ConsumerGroupMember {
+            id: member.id,
+            partitions_count: member.partitions_count,
+            partitions: member.partitions,
+        }
+    }
+}
+
+impl From<RustConsumerGroupDetails> for ffi::ConsumerGroupDetails {
+    fn from(group: RustConsumerGroupDetails) -> Self {
+        ffi::ConsumerGroupDetails {
+            id: group.id,
+            name: group.name,
+            partitions_count: group.partitions_count,
+            members_count: group.members_count,
+            members: group
+                .members
+                .into_iter()
+                .map(ffi::ConsumerGroupMember::from)
+                .collect(),
+        }
+    }
+}
diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs
index bbd954168..02c9d8a48 100644
--- a/foreign/cpp/src/identifier.rs
+++ b/foreign/cpp/src/identifier.rs
@@ -61,3 +61,19 @@ impl TryFrom<ffi::Identifier> for RustIdentifier {
         Ok(rust_identifier)
     }
 }
+
+impl ffi::Identifier {
+    pub fn from_string(&mut self, id: String) -> Result<(), String> {
+        *self = RustIdentifier::named(&id)
+            .map(ffi::Identifier::from)
+            .map_err(|error| format!("Could not create string identifier: {error}"))?;
+        Ok(())
+    }
+
+    pub fn from_numeric(&mut self, id: u32) -> Result<(), String> {
+        *self = RustIdentifier::numeric(id)
+            .map(ffi::Identifier::from)
+            .map_err(|error| format!("Could not create numeric identifier: {error}"))?;
+        Ok(())
+    }
+}
diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs
index 021a7d944..4d47a5b94 100644
--- a/foreign/cpp/src/lib.rs
+++ b/foreign/cpp/src/lib.rs
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 mod client;
+mod consumer_group;
 mod identifier;
 mod stream;
 mod topic;
@@ -60,18 +61,34 @@ mod ffi {
         topics: Vec<Topic>,
     }
 
+    struct ConsumerGroupMember {
+        id: u32,
+        partitions_count: u32,
+        partitions: Vec<u32>,
+    }
+
+    struct ConsumerGroupDetails {
+        id: u32,
+        name: String,
+        partitions_count: u32,
+        members_count: u32,
+        members: Vec<ConsumerGroupMember>,
+    }
+
     extern "Rust" {
         type Client;
 
+        // Client functions
         fn new_connection(connection_string: String) -> Result<*mut Client>;
         fn login_user(self: &Client, username: String, password: String) -> Result<()>;
         fn connect(self: &Client) -> Result<()>;
-        fn create_stream(&self, stream_name: String) -> Result<()>;
+        fn create_stream(self: &Client, stream_name: String) -> Result<()>;
         fn get_stream(self: &Client, stream_id: Identifier) -> Result<StreamDetails>;
-        fn delete_stream(&self, stream_id: Identifier) -> Result<()>;
+        fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>;
+        // fn purge_stream(&self, stream_id: Identifier) -> Result<()>;
         #[allow(clippy::too_many_arguments)]
         fn create_topic(
-            &self,
+            self: &Client,
             stream_id: Identifier,
             topic_name: String,
             partitions_count: u32,
@@ -81,7 +98,42 @@ mod ffi {
             message_expiry_value: u64,
             max_topic_size: String,
         ) -> Result<()>;
+        // fn purge_topic(&self, stream_id: Identifier, topic_id: Identifier) -> Result<()>;
+        fn create_partitions(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            partitions_count: u32,
+        ) -> Result<()>;
+        fn delete_partitions(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            partitions_count: u32,
+        ) -> Result<()>;
+        fn create_consumer_group(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            name: String,
+        ) -> Result<ConsumerGroupDetails>;
+        fn get_consumer_group(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            group_id: Identifier,
+        ) -> Result<ConsumerGroupDetails>;
+        fn delete_consumer_group(
+            self: &Client,
+            stream_id: Identifier,
+            topic_id: Identifier,
+            group_id: Identifier,
+        ) -> Result<()>;
 
         unsafe fn delete_connection(client: *mut Client) -> Result<()>;
+
+        // Identifier functions
+        fn from_string(self: &mut Identifier, id: String) -> Result<()>;
+        fn from_numeric(self: &mut Identifier, id: u32) -> Result<()>;
     }
 }
diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs
index 7cb26c249..5c1f6e9d7 100644
--- a/foreign/cpp/src/stream.rs
+++ b/foreign/cpp/src/stream.rs
@@ -27,7 +27,7 @@ impl From<RustStreamDetails> for ffi::StreamDetails {
             size_bytes: stream.size.as_bytes_u64(),
             messages_count: stream.messages_count,
             topics_count: stream.topics_count,
-            topics: stream.topics.into_iter().map(Into::into).collect(),
+            topics: stream.topics.into_iter().map(ffi::Topic::from).collect(),
         }
     }
 }
diff --git a/foreign/cpp/src/topic.rs b/foreign/cpp/src/topic.rs
index 6bc1d3f86..ac1bae30b 100644
--- a/foreign/cpp/src/topic.rs
+++ b/foreign/cpp/src/topic.rs
@@ -25,9 +25,9 @@ impl From<RustTopic> for ffi::Topic {
             created_at: topic.created_at.as_micros(),
             name: topic.name,
             size_bytes: topic.size.as_bytes_u64(),
-            message_expiry: topic.message_expiry.into(),
+            message_expiry: u64::from(topic.message_expiry),
             compression_algorithm: topic.compression_algorithm.to_string(),
-            max_topic_size: topic.max_topic_size.into(),
+            max_topic_size: u64::from(topic.max_topic_size),
             replication_factor: topic.replication_factor,
             messages_count: topic.messages_count,
             partitions_count: topic.partitions_count,
diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp b/foreign/cpp/tests/client/low_level_e2e.cpp
index 6eaeacecd..26b82b82f 100644
--- a/foreign/cpp/tests/client/low_level_e2e.cpp
+++ b/foreign/cpp/tests/client/low_level_e2e.cpp
@@ -22,6 +22,7 @@
 #include <gtest/gtest.h>
 
 #include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
 
 TEST(LowLevelE2E_Client, ConnectAndLogin) {
     RecordProperty("description", "Connects and logs in successfully using each supported connection string format.");
diff --git a/foreign/cpp/tests/common/test_helpers.hpp b/foreign/cpp/tests/common/test_helpers.hpp
index 5d76aa8da..5457c09d7 100644
--- a/foreign/cpp/tests/common/test_helpers.hpp
+++ b/foreign/cpp/tests/common/test_helpers.hpp
@@ -19,7 +19,6 @@
 
 #pragma once
 
-#include <cassert>
 #include <cstdint>
 #include <string>
 
@@ -27,24 +26,13 @@
 
 inline iggy::ffi::Identifier make_string_identifier(const std::string &value) {
     iggy::ffi::Identifier identifier;
-    identifier.kind   = "string";
-    identifier.length = static_cast<std::uint8_t>(value.size());
-    for (const char c : value) {
-        identifier.value.push_back(static_cast<std::uint8_t>(c));
-    }
-    assert(identifier.length == identifier.value.size());
+    identifier.from_string(value);
     return identifier;
 }
 
 inline iggy::ffi::Identifier make_numeric_identifier(const std::uint32_t value) {
     iggy::ffi::Identifier identifier;
-    identifier.kind   = "numeric";
-    identifier.length = 4;
-    identifier.value.push_back(static_cast<std::uint8_t>(value & 0xFF));
-    identifier.value.push_back(static_cast<std::uint8_t>((value >> 8) & 0xFF));
-    identifier.value.push_back(static_cast<std::uint8_t>((value >> 16) & 0xFF));
-    identifier.value.push_back(static_cast<std::uint8_t>((value >> 24) & 0xFF));
-    assert(identifier.length == identifier.value.size());
+    identifier.from_numeric(value);
     return identifier;
 }
 
diff --git a/foreign/cpp/tests/consumer_group/low_level_e2e.cpp b/foreign/cpp/tests/consumer_group/low_level_e2e.cpp
new file mode 100644
index 000000000..1391eaf70
--- /dev/null
+++ b/foreign/cpp/tests/consumer_group/low_level_e2e.cpp
@@ -0,0 +1,572 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// TODO(slbotbm): create fixture for setup/teardown.
+
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupSucceeds) {
+    RecordProperty("description", "Creates a consumer group successfully for an existing stream and topic.");
+    const std::string stream_name = "client-create-group-happy-stream";
+    const std::string topic_name  = "client-create-group-happy-topic";
+    const std::string group_name  = "client-create-group-happy";
+    iggy::ffi::Client *client     = login_to_server();
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_NO_THROW({
+        const auto group = client->create_consumer_group(make_string_identifier(stream_name),
+                                                         make_string_identifier(topic_name), group_name);
+        ASSERT_EQ(group.name, group_name);
+        ASSERT_EQ(group.members_count, 0);
+        ASSERT_TRUE(group.members.empty());
+    });
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithInvalidIdentifiersThrows) {
+    RecordProperty("description", "Rejects malformed stream and topic identifiers before creating a consumer group.");
+    const std::string stream_name = "client-create-group-invalid-id-stream";
+    const std::string topic_name  = "client-create-group-invalid-id-topic";
+    iggy::ffi::Client *client     = login_to_server();
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    iggy::ffi::Identifier invalid_stream_id;
+    invalid_stream_id.kind   = "invalid";
+    invalid_stream_id.length = 4;
+    invalid_stream_id.value  = {1, 2, 3, 4};
+
+    iggy::ffi::Identifier invalid_topic_id;
+    invalid_topic_id.kind   = "numeric";
+    invalid_topic_id.length = 3;
+    invalid_topic_id.value  = {1, 2, 3};
+
+    ASSERT_THROW(client->create_consumer_group(std::move(invalid_stream_id), make_string_identifier(topic_name),
+                                               "client-create-group-invalid-stream-id"),
+                 std::exception);
+    ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name), std::move(invalid_topic_id),
+                                               "client-create-group-invalid-topic-id"),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Rejects creating a consumer group on streams or topics that do not exist.");
+    const std::string stream_name         = "client-create-group-missing-resource-stream";
+    const std::string topic_name          = "client-create-group-missing-resource-topic";
+    const std::string missing_stream_name = "client-create-group-missing-stream";
+    const std::string missing_topic_name  = "client-create-group-missing-topic";
+    iggy::ffi::Client *client             = login_to_server();
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_THROW(
+        client->create_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name),
+                                      "client-create-group-missing-stream-group"),
+        std::exception);
+    ASSERT_THROW(
+        client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(missing_topic_name),
+                                      "client-create-group-missing-topic-group"),
+        std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupTwiceOnSameInputThrows) {
+    RecordProperty("description", "Rejects creating the same consumer group twice for the same stream and topic.");
+    const std::string stream_name = "client-create-group-duplicate-stream";
+    const std::string topic_name  = "client-create-group-duplicate-topic";
+    const std::string group_name  = "client-create-group-duplicate";
+    iggy::ffi::Client *client     = login_to_server();
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                               group_name),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithNumericIdentifiersSucceeds) {
+    RecordProperty("description",
+                   "Creates a consumer group successfully when stream and topic are addressed by numeric identifiers.");
+    const std::string stream_name = "client-create-group-numeric-stream";
+    const std::string topic_name  = "client-create-group-numeric-topic";
+    const std::string group_name  = "client-create-group-numeric";
+    iggy::ffi::Client *client     = login_to_server();
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    ASSERT_EQ(stream_details.topics.size(), 1);
+
+    ASSERT_NO_THROW({
+        const auto group =
+            client->create_consumer_group(make_numeric_identifier(stream_details.id),
+                                          make_numeric_identifier(stream_details.topics[0].id), group_name);
+        ASSERT_EQ(group.name, group_name);
+        ASSERT_EQ(group.members_count, 0);
+        ASSERT_TRUE(group.members.empty());
+    });
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupWithInvalidNamesThrows) {
+    RecordProperty("description", "Rejects empty and overlong consumer group names.");
+    const std::string stream_name = "client-create-group-invalid-name-stream";
+    const std::string topic_name  = "client-create-group-invalid-name-topic";
+    iggy::ffi::Client *client     = login_to_server();
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), 
topic_name, 1, "none", 0,
+                                         "server_default", 0, 
"server_default"));
+
+    const std::string invalid_names[] = {"", std::string(256, 'a')};
+    for (const std::string &invalid_name : invalid_names) {
+        SCOPED_TRACE(invalid_name.size());
+        
ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                   
make_string_identifier(topic_name), invalid_name),
+                     std::exception);
+    }
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that create_consumer_group throws once the parent stream has been deleted.
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupAfterStreamDeletionThrows) {
+    RecordProperty("description", "Rejects creating a consumer group after deleting the stream that owned the topic.");
+    const std::string stream_name = "client-create-group-after-stream-delete-stream";
+    const std::string topic_name  = "client-create-group-after-stream-delete-topic";
+    const std::string group_name  = "client-create-group-after-stream-delete";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+
+    // The stream (and the topic created under it) is gone, so the same identifiers must now be rejected.
+    ASSERT_THROW(client->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                               group_name),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Confirms that create_consumer_group is refused on a client that never logged in.
+//
+// A logged-in helper client first creates the stream and topic, so the later
+// failures are not caused by missing resources. A second client then attempts
+// the call twice: before connect(), and after connect() without any login.
+TEST(LowLevelE2E_ConsumerGroup, CreateConsumerGroupBeforeLoginThrows) {
+    RecordProperty("description",
+                   "Rejects creating a consumer group before connect, and after connect but before login.");
+    const std::string stream_name = "client-create-group-before-login-stream";
+    const std::string topic_name  = "client-create-group-before-login-topic";
+    const std::string group_name  = "client-create-group-before-login";
+
+    // Phase 1: provision the resources with an authenticated client, then release it.
+    iggy::ffi::Client *admin = login_to_server();
+    ASSERT_NE(admin, nullptr);
+    ASSERT_NO_THROW(admin->create_stream(stream_name));
+    ASSERT_NO_THROW(admin->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                        "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(admin));
+    admin = nullptr;
+
+    // Phase 2: a fresh client object that is never authenticated.
+    iggy::ffi::Client *anonymous = nullptr;
+    ASSERT_NO_THROW({ anonymous = iggy::ffi::new_connection(""); });
+    ASSERT_NE(anonymous, nullptr);
+
+    // Same request issued in both unauthenticated states.
+    const auto attempt_create = [&]() {
+        anonymous->create_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                         group_name);
+    };
+
+    ASSERT_THROW(attempt_create(), std::exception);  // before connect()
+    ASSERT_NO_THROW(anonymous->connect());
+    ASSERT_THROW(attempt_create(), std::exception);  // connected, but not logged in
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(anonymous));
+    anonymous = nullptr;
+}
+
+// Verifies that get_consumer_group reports the same details that create_consumer_group returned.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupReturnsSameInfoAsCreateConsumerGroup) {
+    RecordProperty("description",
+                   "Returns the same consumer group details from get_consumer_group as create_consumer_group.");
+    const std::string stream_name = "client-get-group-happy-stream";
+    const std::string topic_name  = "client-get-group-happy-topic";
+    const std::string group_name  = "client-get-group-happy";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    const auto created_group = client->create_consumer_group(make_string_identifier(stream_name),
+                                                             make_string_identifier(topic_name), group_name);
+    const auto fetched_group = client->get_consumer_group(
+        make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name));
+
+    // Field-by-field comparison; members are compared by count only.
+    ASSERT_EQ(fetched_group.id, created_group.id);
+    ASSERT_EQ(fetched_group.name, created_group.name);
+    ASSERT_EQ(fetched_group.partitions_count, created_group.partitions_count);
+    ASSERT_EQ(fetched_group.members_count, created_group.members_count);
+    ASSERT_EQ(fetched_group.members.size(), created_group.members.size());
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Confirms that get_consumer_group is refused on a client that never logged in.
+//
+// A logged-in helper client creates the stream, topic and group first, so the
+// lookups below target an existing group. A second client then attempts the
+// lookup before connect(), and again after connect() without any login.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupBeforeLoginThrows) {
+    RecordProperty("description", "Rejects get_consumer_group before connect, and after connect but before login.");
+    const std::string stream_name = "client-get-group-before-login-stream";
+    const std::string topic_name  = "client-get-group-before-login-topic";
+    const std::string group_name  = "client-get-group-before-login";
+
+    // Phase 1: provision the resources with an authenticated client, then release it.
+    iggy::ffi::Client *admin = login_to_server();
+    ASSERT_NE(admin, nullptr);
+    ASSERT_NO_THROW(admin->create_stream(stream_name));
+    ASSERT_NO_THROW(admin->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                        "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(admin->create_consumer_group(make_string_identifier(stream_name),
+                                                 make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(admin));
+    admin = nullptr;
+
+    // Phase 2: a fresh client object that is never authenticated.
+    iggy::ffi::Client *anonymous = nullptr;
+    ASSERT_NO_THROW({ anonymous = iggy::ffi::new_connection(""); });
+    ASSERT_NE(anonymous, nullptr);
+
+    // Same request issued in both unauthenticated states.
+    const auto attempt_get = [&]() {
+        anonymous->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                      make_string_identifier(group_name));
+    };
+
+    ASSERT_THROW(attempt_get(), std::exception);  // before connect()
+    ASSERT_NO_THROW(anonymous->connect());
+    ASSERT_THROW(attempt_get(), std::exception);  // connected, but not logged in
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(anonymous));
+    anonymous = nullptr;
+}
+
+// Verifies that get_consumer_group throws when the stream or the topic is addressed by an empty name.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupWithInvalidIdentifiersThrows) {
+    RecordProperty("description", "Rejects get_consumer_group when the stream or topic identifier is invalid.");
+    const std::string stream_name = "client-get-group-invalid-id-stream";
+    const std::string topic_name  = "client-get-group-invalid-id-topic";
+    const std::string group_name  = "client-get-group-invalid-id-group";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // NOTE(review): if make_string_identifier("") itself throws, these assertions are satisfied by the
+    // helper rather than by get_consumer_group - confirm against tests/common/test_helpers.hpp.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(""), make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(""),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that get_consumer_group throws when the stream, the topic, or the group does not exist.
+//
+// One real stream/topic/group triple is created; each assertion then swaps exactly
+// one of the three identifiers for a name that was never created.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Rejects get_consumer_group for streams, topics, or groups that do not exist.");
+    const std::string stream_name         = "client-get-group-missing-resource-stream";
+    const std::string topic_name          = "client-get-group-missing-resource-topic";
+    const std::string created_group_name  = "client-get-group-existing-group";
+    const std::string missing_stream_name = "client-get-group-missing-stream";
+    const std::string missing_topic_name  = "client-get-group-missing-topic";
+    const std::string missing_group_name  = "client-get-group-missing-group";
+    iggy::ffi::Client *client             = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), created_group_name));
+
+    // Unknown stream.
+    ASSERT_THROW(
+        client->get_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name),
+                                   make_string_identifier(created_group_name)),
+        std::exception);
+    // Unknown topic.
+    ASSERT_THROW(
+        client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(missing_topic_name),
+                                   make_string_identifier(created_group_name)),
+        std::exception);
+    // Unknown group.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                            make_string_identifier(missing_group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that get_consumer_group throws once the stream that contained the group has been deleted.
+TEST(LowLevelE2E_ConsumerGroup, GetConsumerGroupAfterStreamDeletionThrows) {
+    RecordProperty("description", "Rejects get_consumer_group after deleting the stream that owned the group.");
+    const std::string stream_name = "client-get-group-after-stream-delete-stream";
+    const std::string topic_name  = "client-get-group-after-stream-delete-topic";
+    const std::string group_name  = "client-get-group-after-stream-delete";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+
+    // The lookup reuses the identifiers of the deleted hierarchy and must fail.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that delete_consumer_group removes an existing group: a later lookup of the same group must throw.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupSucceeds) {
+    RecordProperty("description", "Deletes an existing consumer group successfully.");
+    const std::string stream_name = "client-delete-group-happy-stream";
+    const std::string topic_name  = "client-delete-group-happy-topic";
+    const std::string group_name  = "client-delete-group-happy";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    ASSERT_NO_THROW(client->delete_consumer_group(
+        make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)));
+
+    // Post-condition: the group can no longer be fetched.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Confirms that delete_consumer_group is refused on a client that never logged in.
+//
+// A logged-in helper client creates the stream, topic and group first, so the
+// deletions below target an existing group. A second client then attempts the
+// deletion before connect(), and again after connect() without any login.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupBeforeLoginThrows) {
+    RecordProperty("description", "Rejects delete_consumer_group before connect, and after connect but before login.");
+    const std::string stream_name = "client-delete-group-before-login-stream";
+    const std::string topic_name  = "client-delete-group-before-login-topic";
+    const std::string group_name  = "client-delete-group-before-login";
+
+    // Phase 1: provision the resources with an authenticated client, then release it.
+    iggy::ffi::Client *admin = login_to_server();
+    ASSERT_NE(admin, nullptr);
+    ASSERT_NO_THROW(admin->create_stream(stream_name));
+    ASSERT_NO_THROW(admin->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                        "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(admin->create_consumer_group(make_string_identifier(stream_name),
+                                                 make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(admin));
+    admin = nullptr;
+
+    // Phase 2: a fresh client object that is never authenticated.
+    iggy::ffi::Client *anonymous = nullptr;
+    ASSERT_NO_THROW({ anonymous = iggy::ffi::new_connection(""); });
+    ASSERT_NE(anonymous, nullptr);
+
+    // Same request issued in both unauthenticated states.
+    const auto attempt_delete = [&]() {
+        anonymous->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                         make_string_identifier(group_name));
+    };
+
+    ASSERT_THROW(attempt_delete(), std::exception);  // before connect()
+    ASSERT_NO_THROW(anonymous->connect());
+    ASSERT_THROW(attempt_delete(), std::exception);  // connected, but not logged in
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(anonymous));
+    anonymous = nullptr;
+}
+
+// Verifies that delete_consumer_group throws when any one of the three identifiers is malformed:
+// an empty stream name, an empty topic name, or a hand-built "numeric" group id with a 3-byte payload.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupWithInvalidIdentifiersThrows) {
+    RecordProperty("description",
+                   "Rejects delete_consumer_group when the stream, topic, or group identifier is invalid.");
+    const std::string stream_name = "client-delete-group-invalid-id-stream";
+    const std::string topic_name  = "client-delete-group-invalid-id-topic";
+    const std::string group_name  = "client-delete-group-invalid-id-group";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+
+    // Built field by field, bypassing the identifier helpers, so that a malformed value reaches the client.
+    iggy::ffi::Identifier invalid_group_id;
+    invalid_group_id.kind   = "numeric";
+    invalid_group_id.length = 3;
+    invalid_group_id.value  = {1, 2, 3};
+
+    // NOTE(review): if make_string_identifier("") itself throws, the first two assertions are satisfied
+    // by the helper rather than by delete_consumer_group - confirm against tests/common/test_helpers.hpp.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(""), make_string_identifier(topic_name),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(""),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                               std::move(invalid_group_id)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that delete_consumer_group throws when the stream, the topic, or the group does not exist.
+//
+// One real stream/topic/group triple is created; each assertion then swaps exactly
+// one of the three identifiers for a name that was never created.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Rejects delete_consumer_group for streams, topics, or groups that do not exist.");
+    const std::string stream_name         = "client-delete-group-missing-resource-stream";
+    const std::string topic_name          = "client-delete-group-missing-resource-topic";
+    const std::string created_group_name  = "client-delete-group-existing-group";
+    const std::string missing_stream_name = "client-delete-group-missing-stream";
+    const std::string missing_topic_name  = "client-delete-group-missing-topic";
+    const std::string missing_group_name  = "client-delete-group-missing-group";
+    iggy::ffi::Client *client             = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), created_group_name));
+
+    // Unknown stream.
+    ASSERT_THROW(
+        client->delete_consumer_group(make_string_identifier(missing_stream_name), make_string_identifier(topic_name),
+                                      make_string_identifier(created_group_name)),
+        std::exception);
+    // Unknown topic.
+    ASSERT_THROW(
+        client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(missing_topic_name),
+                                      make_string_identifier(created_group_name)),
+        std::exception);
+    // Unknown group.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                               make_string_identifier(missing_group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that a second delete_consumer_group call on an already deleted group throws.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupTwiceThrows) {
+    RecordProperty("description", "Rejects deleting the same consumer group twice.");
+    const std::string stream_name = "client-delete-group-twice-stream";
+    const std::string topic_name  = "client-delete-group-twice-topic";
+    const std::string group_name  = "client-delete-group-twice";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+    // First deletion succeeds...
+    ASSERT_NO_THROW(client->delete_consumer_group(
+        make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)));
+
+    // ...the identical second request must be rejected.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that delete_consumer_group works when stream, topic and group are all addressed by numeric id.
+//
+// The resources are created by name; the numeric ids come from the create_consumer_group
+// result and from reading the stream back. A lookup by name afterwards must throw.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupWithNumericIdentifiersSucceeds) {
+    RecordProperty(
+        "description",
+        "Deletes a consumer group successfully when stream, topic, and group are addressed by numeric identifiers.");
+    const std::string stream_name = "client-delete-group-numeric-stream";
+    const std::string topic_name  = "client-delete-group-numeric-topic";
+    const std::string group_name  = "client-delete-group-numeric";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    const auto created_group  = client->create_consumer_group(make_string_identifier(stream_name),
+                                                              make_string_identifier(topic_name), group_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    // topics[0] is indexed below; assert the element exists rather than reading out of bounds.
+    ASSERT_EQ(stream_details.topics.size(), 1u);
+
+    ASSERT_NO_THROW(client->delete_consumer_group(make_numeric_identifier(stream_details.id),
+                                                  make_numeric_identifier(stream_details.topics[0].id),
+                                                  make_numeric_identifier(created_group.id)));
+
+    // Post-condition: the group can no longer be fetched by name.
+    ASSERT_THROW(client->get_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                            make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that delete_consumer_group throws once the stream that contained the group has been deleted.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAfterStreamDeletionThrows) {
+    RecordProperty("description",
+                   "Rejects delete_consumer_group after deleting the stream that owned the consumer group.");
+    const std::string stream_name = "client-delete-group-after-stream-delete-stream";
+    const std::string topic_name  = "client-delete-group-after-stream-delete-topic";
+    const std::string group_name  = "client-delete-group-after-stream-delete";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+
+    // The deletion reuses the identifiers of the deleted hierarchy and must fail.
+    ASSERT_THROW(client->delete_consumer_group(make_string_identifier(stream_name), make_string_identifier(topic_name),
+                                               make_string_identifier(group_name)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Verifies that a group name can be reused after the group carrying it was deleted.
+TEST(LowLevelE2E_ConsumerGroup, DeleteConsumerGroupAndRecreateWithSameNameSucceeds) {
+    RecordProperty("description",
+                   "Allows recreating a consumer group with the same name after the previous group is deleted.");
+    const std::string stream_name = "client-delete-group-recreate-stream";
+    const std::string topic_name  = "client-delete-group-recreate-topic";
+    const std::string group_name  = "client-delete-group-recreate";
+    iggy::ffi::Client *client     = login_to_server();
+    // Fail fast instead of dereferencing a null client below.
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_NO_THROW(client->create_consumer_group(make_string_identifier(stream_name),
+                                                  make_string_identifier(topic_name), group_name));
+    ASSERT_NO_THROW(client->delete_consumer_group(
+        make_string_identifier(stream_name), make_string_identifier(topic_name), make_string_identifier(group_name)));
+
+    ASSERT_NO_THROW({
+        const auto recreated_group = client->create_consumer_group(make_string_identifier(stream_name),
+                                                                   make_string_identifier(topic_name), group_name);
+        // NOTE(review): expecting id 0 presumably relies on the server handing the freed id out again - confirm.
+        ASSERT_EQ(recreated_group.id, 0u);
+        ASSERT_EQ(recreated_group.name, group_name);
+        ASSERT_EQ(recreated_group.members_count, 0);
+        ASSERT_TRUE(recreated_group.members.empty());
+    });
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
diff --git a/foreign/cpp/tests/identifier/unit_tests.cpp b/foreign/cpp/tests/identifier/unit_tests.cpp
new file mode 100644
index 000000000..bed56c273
--- /dev/null
+++ b/foreign/cpp/tests/identifier/unit_tests.cpp
@@ -0,0 +1,152 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <array>
+#include <cstdint>
+#include <limits>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+
+// Checks that from_string on an ASCII value yields a "string" identifier whose
+// length field and byte payload mirror the input.
+TEST(LowLevelE2E_Identifier, FromStringCreatesStringIdentifier) {
+    RecordProperty("description", "Creates a string identifier and preserves its UTF-8 byte payload.");
+    const std::string input = "stream-identifier";
+    iggy::ffi::Identifier id;
+
+    ASSERT_NO_THROW(id.from_string(input));
+
+    ASSERT_EQ(id.kind, "string");
+    ASSERT_EQ(id.length, input.size());
+    ASSERT_EQ(id.value.size(), input.size());
+    // Walk the input and compare each character with the stored byte at the same position.
+    size_t pos = 0;
+    for (const char ch : input) {
+        EXPECT_EQ(id.value[pos], static_cast<std::uint8_t>(ch));
+        ++pos;
+    }
+}
+
+// Boundary check: a value of exactly 255 bytes (127 copies of the two-byte sequence
+// 0xC2 0xA2 plus one ASCII byte) must be accepted and stored byte for byte.
+TEST(LowLevelE2E_Identifier, FromStringAcceptsExact255ByteUtf8Value) {
+    RecordProperty("description", "Accepts a UTF-8 string identifier whose encoded byte length is exactly 255.");
+    std::string input;
+    for (size_t remaining = 127; remaining > 0; --remaining) {
+        input.append("\xC2\xA2");
+    }
+    input.push_back('a');
+
+    // Sanity-check the fixture before exercising the identifier.
+    ASSERT_EQ(input.size(), 255u);
+
+    iggy::ffi::Identifier id;
+    ASSERT_NO_THROW(id.from_string(input));
+
+    ASSERT_EQ(id.kind, "string");
+    ASSERT_EQ(id.length, input.size());
+    ASSERT_EQ(id.value.size(), input.size());
+    size_t pos = 0;
+    for (const char ch : input) {
+        EXPECT_EQ(id.value[pos], static_cast<std::uint8_t>(ch));
+        ++pos;
+    }
+}
+
+// from_string must throw when handed an empty string.
+TEST(LowLevelE2E_Identifier, FromStringRejectsEmptyValue) {
+    RecordProperty("description", "Rejects creating a string identifier from an empty string.");
+    iggy::ffi::Identifier id;
+
+    ASSERT_THROW(id.from_string(""), std::exception);
+}
+
+// Boundary check: 128 copies of the two-byte sequence 0xC2 0xA2 give 256 bytes,
+// which from_string must reject.
+TEST(LowLevelE2E_Identifier, FromStringRejectsUtf8ValueLongerThan255Bytes) {
+    RecordProperty("description", "Rejects creating a UTF-8 string identifier longer than 255 encoded bytes.");
+    iggy::ffi::Identifier id;
+    std::string oversized;
+    for (size_t remaining = 128; remaining > 0; --remaining) {
+        oversized.append("\xC2\xA2");
+    }
+
+    // Sanity-check the fixture before exercising the identifier.
+    ASSERT_EQ(oversized.size(), 256u);
+
+    ASSERT_THROW(id.from_string(oversized), std::exception);
+}
+
+// Boundary check: a 256-character ASCII value must be rejected by from_string.
+TEST(LowLevelE2E_Identifier, FromStringRejectsAsciiValueLongerThan255Bytes) {
+    RecordProperty("description", "Rejects creating an ASCII string identifier longer than 255 bytes.");
+    iggy::ffi::Identifier id;
+    const std::string oversized = std::string(256, 'a');
+
+    ASSERT_THROW(id.from_string(oversized), std::exception);
+}
+
+// Checks that from_numeric(0x12345678) yields a "numeric" identifier of length 4
+// whose payload holds the value's bytes least-significant first.
+TEST(LowLevelE2E_Identifier, FromNumericCreatesNumericIdentifier) {
+    RecordProperty("description", "Creates a numeric identifier encoded as four little-endian bytes.");
+    iggy::ffi::Identifier id;
+    constexpr std::uint32_t input                       = 0x12345678;
+    constexpr std::array<std::uint8_t, 4> little_endian = {0x78, 0x56, 0x34, 0x12};
+
+    ASSERT_NO_THROW(id.from_numeric(input));
+
+    ASSERT_EQ(id.kind, "numeric");
+    ASSERT_EQ(id.length, 4u);
+    ASSERT_EQ(id.value.size(), little_endian.size());
+    // Compare the payload with the expected bytes position by position.
+    size_t pos = 0;
+    for (const std::uint8_t expected : little_endian) {
+        EXPECT_EQ(id.value[pos], expected);
+        ++pos;
+    }
+}
+
+// Upper-bound check: the largest uint32_t value must produce a 4-byte payload of 0xFF bytes.
+TEST(LowLevelE2E_Identifier, FromNumericCreatesUint32MaxIdentifier) {
+    RecordProperty("description", "Creates a numeric identifier for UINT32_MAX using four 0xFF bytes.");
+    iggy::ffi::Identifier id;
+    constexpr std::array<std::uint8_t, 4> all_ones = {0xFF, 0xFF, 0xFF, 0xFF};
+
+    ASSERT_NO_THROW(id.from_numeric(std::numeric_limits<std::uint32_t>::max()));
+
+    ASSERT_EQ(id.kind, "numeric");
+    ASSERT_EQ(id.length, 4u);
+    ASSERT_EQ(id.value.size(), all_ones.size());
+    // Compare the payload with the expected bytes position by position.
+    size_t pos = 0;
+    for (const std::uint8_t expected : all_ones) {
+        EXPECT_EQ(id.value[pos], expected);
+        ++pos;
+    }
+}
+
+// Checks that calling from_numeric on an identifier that already holds a string
+// replaces kind, length and payload with the numeric form.
+TEST(LowLevelE2E_Identifier, FromNumericOverwritesExistingStringIdentifier) {
+    RecordProperty("description", "Replaces a previously created string identifier with numeric identifier data.");
+    iggy::ffi::Identifier id;
+
+    ASSERT_NO_THROW(id.from_string("temporary-name"));
+    ASSERT_NO_THROW(id.from_numeric(7));
+
+    ASSERT_EQ(id.kind, "numeric");
+    ASSERT_EQ(id.length, 4u);
+    ASSERT_EQ(id.value.size(), 4u);
+    // Expected payload for the value 7: low byte first, remaining bytes zero.
+    constexpr std::array<std::uint8_t, 4> expected_bytes = {7, 0, 0, 0};
+    size_t pos = 0;
+    for (const std::uint8_t expected : expected_bytes) {
+        EXPECT_EQ(id.value[pos], expected);
+        ++pos;
+    }
+}
+
+// Checks that calling from_string on an identifier that already holds a number
+// replaces kind, length and payload with the string form.
+TEST(LowLevelE2E_Identifier, FromStringOverwritesExistingNumericIdentifier) {
+    RecordProperty("description", "Replaces a previously created numeric identifier with string identifier data.");
+    const std::string replacement = "replacement-name";
+    iggy::ffi::Identifier id;
+
+    ASSERT_NO_THROW(id.from_numeric(42));
+    ASSERT_NO_THROW(id.from_string(replacement));
+
+    ASSERT_EQ(id.kind, "string");
+    ASSERT_EQ(id.length, replacement.size());
+    ASSERT_EQ(id.value.size(), replacement.size());
+    // Walk the input and compare each character with the stored byte at the same position.
+    size_t pos = 0;
+    for (const char ch : replacement) {
+        EXPECT_EQ(id.value[pos], static_cast<std::uint8_t>(ch));
+        ++pos;
+    }
+}
diff --git a/foreign/cpp/tests/partition/low_level_e2e.cpp b/foreign/cpp/tests/partition/low_level_e2e.cpp
new file mode 100644
index 000000000..ef06f483d
--- /dev/null
+++ b/foreign/cpp/tests/partition/low_level_e2e.cpp
@@ -0,0 +1,530 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// TODO(slbotbm): create fixture for setup/teardown.
+
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Partition, CreatePartitionsSucceeds) {
+    RecordProperty("description", "Creates partitions for an existing topic and verifies the resulting count.");
+    const std::string stream_name = "cpp-create-partitions-happy-path";
+    const std::string topic_name  = "topic-create-partitions-happy-path";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(
+        client->create_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 43));
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), 1u);
+        EXPECT_EQ(stream_details.topics[0].name, topic_name);
+        EXPECT_EQ(stream_details.topics[0].partitions_count, 44u);  // 1 initial + 43 created
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, CreatePartitionsBeforeLoginThrows) {
+    RecordProperty("description",
+                   "Throws when create_partitions is called before connect, and after connect but before login.");
+    const std::string stream_name = "cpp-create-partitions-before-login";
+    const std::string topic_name  = "topic-create-partitions-before-login";
+
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+                 std::exception);
+    ASSERT_NO_THROW(client->connect());
+    ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, CreatePartitionsOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Throws when create_partitions is called for a stream or topic that does not exist.");
+    const std::string stream_name         = "cpp-create-partitions-missing-resource-stream";
+    const std::string topic_name          = "topic-create-partitions-missing-resource-topic";
+    const std::string missing_stream_name = "cpp-create-partitions-missing-stream";
+    const std::string missing_topic_name  = "topic-create-partitions-missing-topic";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_THROW(
+        client->create_partitions(make_string_identifier(missing_stream_name), make_string_identifier(topic_name), 1),
+        std::exception);
+    ASSERT_THROW(
+        client->create_partitions(make_string_identifier(stream_name), make_string_identifier(missing_topic_name), 1),
+        std::exception);
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, CreatePartitionsWithInvalidIdentifiersThrows) {
+    RecordProperty("description", "Rejects create_partitions requests that use invalid stream or topic identifiers.");
+    const std::string stream_name = "cpp-create-partitions-invalid-identifier";
+    const std::string topic_name  = "topic-create-partitions-invalid-identifier";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    iggy::ffi::Identifier invalid_stream_kind_id;
+    invalid_stream_kind_id.kind   = "invalid";
+    invalid_stream_kind_id.length = 4;
+    invalid_stream_kind_id.value  = {1, 0, 0, 0};
+    ASSERT_THROW(client->create_partitions(std::move(invalid_stream_kind_id), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    iggy::ffi::Identifier invalid_stream_numeric_id;
+    invalid_stream_numeric_id.kind   = "numeric";
+    invalid_stream_numeric_id.length = 1;
+    invalid_stream_numeric_id.value.push_back(1);
+    ASSERT_THROW(client->create_partitions(std::move(invalid_stream_numeric_id), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    iggy::ffi::Identifier invalid_topic_kind_id;
+    invalid_topic_kind_id.kind   = "invalid";
+    invalid_topic_kind_id.length = 4;
+    invalid_topic_kind_id.value  = {1, 0, 0, 0};
+    ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name), std::move(invalid_topic_kind_id), 1),
+                 std::exception);
+
+    iggy::ffi::Identifier invalid_topic_numeric_id;
+    invalid_topic_numeric_id.kind   = "numeric";
+    invalid_topic_numeric_id.length = 1;
+    invalid_topic_numeric_id.value.push_back(1);
+    ASSERT_THROW(client->create_partitions(make_string_identifier(stream_name), std::move(invalid_topic_numeric_id), 1),
+                 std::exception);
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, CreatePartitionsWithBoundaryPartitionsCountValues) {
+    RecordProperty("description",
+                   "Accepts supported create_partitions counts and rejects values outside the allowed range.");
+    const std::string stream_name = "cpp-create-partitions-boundary-values";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+
+    struct TestCase {
+        std::string topic_name;
+        std::uint32_t partitions_count;
+        bool should_succeed;
+        std::uint32_t expected_total_partitions;
+    };
+
+    const std::vector<TestCase> test_cases = {
+        {"topic-partitions-minus-1", static_cast<std::uint32_t>(-1), false, 1},
+        {"topic-partitions-0", 0, false, 1},
+        {"topic-partitions-1", 1, true, 2},
+        {"topic-partitions-43", 43, true, 44},
+        {"topic-partitions-1000", 1000, true, 1001},
+        {"topic-partitions-1001", 1001, false, 1},
+    };
+
+    for (const auto &test_case : test_cases) {
+        SCOPED_TRACE(test_case.topic_name);
+        ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), test_case.topic_name, 1, "none", 0,
+                                             "server_default", 0, "server_default"));
+
+        if (test_case.should_succeed) {
+            ASSERT_NO_THROW(client->create_partitions(make_string_identifier(stream_name),
+                                                      make_string_identifier(test_case.topic_name),
+                                                      test_case.partitions_count));
+        } else {
+            ASSERT_THROW(
+                client->create_partitions(make_string_identifier(stream_name),
+                                          make_string_identifier(test_case.topic_name), test_case.partitions_count),
+                std::exception);
+        }
+    }
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), test_cases.size());
+        for (const auto &test_case : test_cases) {
+            bool found = false;
+            for (const auto &topic : stream_details.topics) {
+                if (topic.name == test_case.topic_name) {
+                    EXPECT_EQ(topic.partitions_count, test_case.expected_total_partitions);
+                    found = true;
+                    break;
+                }
+            }
+            EXPECT_TRUE(found) << "Missing topic " << test_case.topic_name;
+        }
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, CreatePartitionsWithNumericIdentifiersSucceeds) {
+    RecordProperty("description",
+                   "Creates partitions successfully when valid numeric stream and topic identifiers are used.");
+    const std::string stream_name = "cpp-create-partitions-numeric-identifiers";
+    const std::string topic_name  = "topic-create-partitions-numeric-identifiers";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 1, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    ASSERT_EQ(stream_details.topics.size(), 1u);
+
+    ASSERT_NO_THROW(client->create_partitions(make_numeric_identifier(stream_details.id),
+                                              make_numeric_identifier(stream_details.topics[0].id), 43));
+
+    ASSERT_NO_THROW({
+        const auto updated_stream_details = client->get_stream(make_numeric_identifier(stream_details.id));
+        ASSERT_EQ(updated_stream_details.topics.size(), 1u);
+        EXPECT_EQ(updated_stream_details.topics[0].id, stream_details.topics[0].id);
+        EXPECT_EQ(updated_stream_details.topics[0].name, topic_name);
+        EXPECT_EQ(updated_stream_details.topics[0].partitions_count, 44u);  // 1 initial + 43 created
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_numeric_identifier(stream_details.id)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsSucceeds) {
+    RecordProperty("description", "Deletes partitions from an existing topic and verifies the resulting count.");
+    const std::string stream_name = "cpp-delete-partitions-happy-path";
+    const std::string topic_name  = "topic-delete-partitions-happy-path";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 44, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(
+        client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 43));
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), 1u);
+        EXPECT_EQ(stream_details.topics[0].name, topic_name);
+        EXPECT_EQ(stream_details.topics[0].partitions_count, 1u);  // 44 initial - 43 deleted
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeleteMorePartitionsThanExistingThrows) {
+    RecordProperty("description",
+                   "Rejects delete_partitions counts outside the allowed range and counts greater than existing.");
+    const std::string stream_name = "cpp-delete-partitions-boundary-values";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+
+    struct TestCase {
+        std::string topic_name;
+        std::uint32_t partitions_count;
+        bool should_succeed;
+        std::uint32_t initial_partitions;
+        std::uint32_t expected_total_partitions;
+    };
+
+    const std::vector<TestCase> test_cases = {
+        {"topic-delete-partitions-minus-1", static_cast<std::uint32_t>(-1), false, 3, 3},
+        {"topic-delete-partitions-0", 0, false, 3, 3},
+        {"topic-delete-partitions-1", 1, true, 3, 2},
+        {"topic-delete-partitions-4", 4, false, 3, 3},
+    };
+
+    for (const auto &test_case : test_cases) {
+        SCOPED_TRACE(test_case.topic_name);
+        ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), test_case.topic_name,
+                                             test_case.initial_partitions, "none", 0, "server_default", 0,
+                                             "server_default"));
+
+        if (test_case.should_succeed) {
+            ASSERT_NO_THROW(client->delete_partitions(make_string_identifier(stream_name),
+                                                      make_string_identifier(test_case.topic_name),
+                                                      test_case.partitions_count));
+        } else {
+            ASSERT_THROW(
+                client->delete_partitions(make_string_identifier(stream_name),
+                                          make_string_identifier(test_case.topic_name), test_case.partitions_count),
+                std::exception);
+        }
+    }
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), test_cases.size());
+        for (const auto &test_case : test_cases) {
+            bool found = false;
+            for (const auto &topic : stream_details.topics) {
+                if (topic.name == test_case.topic_name) {
+                    EXPECT_EQ(topic.partitions_count, test_case.expected_total_partitions);
+                    found = true;
+                    break;
+                }
+            }
+            EXPECT_TRUE(found) << "Missing topic " << test_case.topic_name;
+        }
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsBeforeCreatingAdditionalPartitionsSucceeds) {
+    RecordProperty("description",
+                   "Deletes partitions from the initial topic allocation without calling create_partitions first.");
+    const std::string stream_name = "cpp-delete-partitions-before-create-partitions";
+    const std::string topic_name  = "topic-delete-partitions-before-create-partitions";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 3, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(
+        client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1));
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), 1u);
+        EXPECT_EQ(stream_details.topics[0].partitions_count, 2u);  // 3 initial - 1 deleted
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsFromTopicWithZeroPartitionsThrows) {
+    RecordProperty("description",
+                   "Throws when delete_partitions is called with count 1 for a topic that currently has 0 partitions.");
+    const std::string stream_name = "cpp-delete-partitions-zero-existing-partitions";
+    const std::string topic_name  = "topic-delete-partitions-zero-existing-partitions";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 0, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), 1u);
+        EXPECT_EQ(stream_details.topics[0].name, topic_name);
+        EXPECT_EQ(stream_details.topics[0].partitions_count, 0u);  // the rejected delete must leave the count unchanged
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsBeforeLoginThrows) {
+    RecordProperty("description",
+                   "Throws when delete_partitions is called before connect, and after connect but before login.");
+    const std::string stream_name = "cpp-delete-partitions-before-login";
+    const std::string topic_name  = "topic-delete-partitions-before-login";
+
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+                 std::exception);
+    ASSERT_NO_THROW(client->connect());
+    ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsOnNonExistentResourcesThrows) {
+    RecordProperty("description", "Throws when delete_partitions is called for a stream or topic that does not exist.");
+    const std::string stream_name         = "cpp-delete-partitions-missing-resource-stream";
+    const std::string topic_name          = "topic-delete-partitions-missing-resource-topic";
+    const std::string missing_stream_name = "cpp-delete-partitions-missing-stream";
+    const std::string missing_topic_name  = "topic-delete-partitions-missing-topic";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 3, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    ASSERT_THROW(
+        client->delete_partitions(make_string_identifier(missing_stream_name), make_string_identifier(topic_name), 1),
+        std::exception);
+    ASSERT_THROW(
+        client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(missing_topic_name), 1),
+        std::exception);
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsWithInvalidIdentifiersThrows) {
+    RecordProperty("description", "Rejects delete_partitions requests that use invalid stream or topic identifiers.");
+    const std::string stream_name = "cpp-delete-partitions-invalid-identifier";
+    const std::string topic_name  = "topic-delete-partitions-invalid-identifier";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 3, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    iggy::ffi::Identifier invalid_stream_kind_id;
+    invalid_stream_kind_id.kind   = "invalid";
+    invalid_stream_kind_id.length = 4;
+    invalid_stream_kind_id.value  = {1, 0, 0, 0};
+    ASSERT_THROW(client->delete_partitions(std::move(invalid_stream_kind_id), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    iggy::ffi::Identifier invalid_stream_numeric_id;
+    invalid_stream_numeric_id.kind   = "numeric";
+    invalid_stream_numeric_id.length = 1;
+    invalid_stream_numeric_id.value.push_back(1);
+    ASSERT_THROW(client->delete_partitions(std::move(invalid_stream_numeric_id), make_string_identifier(topic_name), 1),
+                 std::exception);
+
+    iggy::ffi::Identifier invalid_topic_kind_id;
+    invalid_topic_kind_id.kind   = "invalid";
+    invalid_topic_kind_id.length = 4;
+    invalid_topic_kind_id.value  = {1, 0, 0, 0};
+    ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name), std::move(invalid_topic_kind_id), 1),
+                 std::exception);
+
+    iggy::ffi::Identifier invalid_topic_numeric_id;
+    invalid_topic_numeric_id.kind   = "numeric";
+    invalid_topic_numeric_id.length = 1;
+    invalid_topic_numeric_id.value.push_back(1);
+    ASSERT_THROW(client->delete_partitions(make_string_identifier(stream_name), std::move(invalid_topic_numeric_id), 1),
+                 std::exception);
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsTwiceForSameTopicSucceeds) {
+    RecordProperty("description", "Allows delete_partitions to be called twice for the same stream and topic.");
+    const std::string stream_name = "cpp-delete-partitions-twice-same-topic";
+    const std::string topic_name  = "topic-delete-partitions-twice-same-topic";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 45, "none", 0,
+                                         "server_default", 0, "server_default"));
+    ASSERT_NO_THROW(
+        client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 20));
+    ASSERT_NO_THROW(
+        client->delete_partitions(make_string_identifier(stream_name), make_string_identifier(topic_name), 20));
+
+    ASSERT_NO_THROW({
+        const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+        ASSERT_EQ(stream_details.topics.size(), 1u);
+        EXPECT_EQ(stream_details.topics[0].name, topic_name);
+        EXPECT_EQ(stream_details.topics[0].partitions_count, 5u);  // 45 initial - 20 - 20
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+TEST(LowLevelE2E_Partition, DeletePartitionsAfterStreamDeletionThrows) {
+    RecordProperty("description", "Throws when delete_partitions is called after the stream has been deleted.");
+    const std::string stream_name = "cpp-delete-partitions-after-stream-deletion";
+    const std::string topic_name  = "topic-delete-partitions-after-stream-deletion";
+
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW(client->create_topic(make_string_identifier(stream_name), topic_name, 3, "none", 0,
+                                         "server_default", 0, "server_default"));
+
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));  // ids captured before deletion
+    ASSERT_EQ(stream_details.topics.size(), 1u);
+
+    ASSERT_NO_THROW(client->delete_stream(make_numeric_identifier(stream_details.id)));
+
+    ASSERT_THROW(client->delete_partitions(make_numeric_identifier(stream_details.id),
+                                           make_numeric_identifier(stream_details.topics[0].id), 1),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
+// TODO(slbotbm): Add CreatePartitionsAfterTopicDeletionThrows test case after delete_topic function is added.
diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp b/foreign/cpp/tests/stream/low_level_e2e.cpp
index ac3e01b9e..43d0496de 100644
--- a/foreign/cpp/tests/stream/low_level_e2e.cpp
+++ b/foreign/cpp/tests/stream/low_level_e2e.cpp
@@ -25,6 +25,8 @@
 #include "lib.rs.h"
 #include "tests/common/test_helpers.hpp"
 
+// TODO(slbotbm): Add tests for purge_stream after implementing send_messages(...).
+
 TEST(LowLevelE2E_Stream, CreateStreamAfterLogin) {
     RecordProperty("description", "Creates a stream successfully after 
authenticating.");
     const std::string stream_name = "cpp-create-stream-after-login";
@@ -83,6 +85,27 @@ TEST(LowLevelE2E_Stream, CreateStreamValidatesNameConstraintsAndUniqueness) {
     client = nullptr;
 }
 
+TEST(LowLevelE2E_Stream, CreateStreamWithEmojiName) {
+    RecordProperty("description", "Creates a stream with a UTF-8 emoji name.");
+    const std::string stream_name = "🚀🚀🚀🚀Apache Iggy🚀🚀🚀🚀";  // multi-byte UTF-8 on both sides of ASCII text
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_NO_THROW(client->create_stream(stream_name));
+    ASSERT_NO_THROW({
+        const auto stream_details              = client->get_stream(make_string_identifier(stream_name));
+        const std::string returned_stream_name = static_cast<std::string>(stream_details.name);
+        EXPECT_NE(stream_details.id, 0u);
+        EXPECT_EQ(returned_stream_name, stream_name);  // name must round-trip byte-for-byte
+        EXPECT_EQ(stream_details.topics_count, 0u);
+        EXPECT_EQ(stream_details.topics.size(), 0u);
+    });
+
+    ASSERT_NO_THROW(client->delete_stream(make_string_identifier(stream_name)));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+    client = nullptr;
+}
+
 TEST(LowLevelE2E_Stream, StreamCreatedAndDeletedSuccessfully) {
     RecordProperty("description", "Creates a stream and deletes it 
successfully by string identifier.");
     const std::string stream_name = "cpp-delete-stream-created-and-deleted";
diff --git a/foreign/cpp/tests/topic/low_level_e2e.cpp b/foreign/cpp/tests/topic/low_level_e2e.cpp
index 79675d2ee..98af86753 100644
--- a/foreign/cpp/tests/topic/low_level_e2e.cpp
+++ b/foreign/cpp/tests/topic/low_level_e2e.cpp
@@ -29,6 +29,8 @@
 #include "lib.rs.h"
 #include "tests/common/test_helpers.hpp"
 
+// TODO(slbotbm): Add tests for purge_topic after implementing send_messages(...).
+
 TEST(LowLevelE2E_Topic, CreateTopicWithAllOptionCombinations) {
     RecordProperty("description",
                    "Creates topics across supported option combinations and 
verifies they are all returned.");


Reply via email to