This is an automated email from the ASF dual-hosted git repository.
hubcio pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iggy.git
The following commit(s) were added to refs/heads/master by this push:
new 239d7ee0b feat(cpp): add messaging FFI functions for C++ SDK (#3046)
239d7ee0b is described below
commit 239d7ee0bd2721c572d38b06669efca21e0c2f6b
Author: xin <[email protected]>
AuthorDate: Mon May 11 21:54:34 2026 +0900
feat(cpp): add messaging FFI functions for C++ SDK (#3046)
---
foreign/cpp/Cargo.toml | 5 +-
foreign/cpp/build.rs | 1 +
foreign/cpp/src/client.rs | 234 ++++-
foreign/cpp/src/identifier.rs | 4 +
foreign/cpp/src/lib.rs | 78 ++
foreign/cpp/src/messages.rs | 85 ++
foreign/cpp/src/stream.rs | 14 +
foreign/cpp/tests/client/low_level_e2e.cpp | 1 +
foreign/cpp/tests/common/test_helpers.hpp | 17 +
foreign/cpp/tests/message/low_level_e2e.cpp | 1249 +++++++++++++++++++++++++++
foreign/cpp/tests/message/unit_tests.cpp | 116 +++
foreign/cpp/tests/stream/low_level_e2e.cpp | 170 ++++
12 files changed, 1970 insertions(+), 4 deletions(-)
diff --git a/foreign/cpp/Cargo.toml b/foreign/cpp/Cargo.toml
index 477cb0b74..a5642f771 100644
--- a/foreign/cpp/Cargo.toml
+++ b/foreign/cpp/Cargo.toml
@@ -27,10 +27,13 @@ ignored = ["cxx-build"]
crate-type = ["staticlib"]
[dependencies]
+bytes = "1.11.1"
cxx = "1.0.194"
iggy = { path = "../../core/sdk" }
iggy_common = { path = "../../core/common" }
-tokio = { version = "1.49.0", features = ["rt-multi-thread"] }
+# Explicitly enable the runtime + I/O drivers required by
`Runtime::enable_all()` in lib.rs.
+# Listing the features here insulates this crate from upstream SDK feature
changes.
+tokio = { version = "1.49.0", features = ["rt-multi-thread", "macros", "time",
"net", "io-util"] }
[build-dependencies]
cxx-build = "1.0.194"
diff --git a/foreign/cpp/build.rs b/foreign/cpp/build.rs
index 476a9d796..d2cbf0094 100644
--- a/foreign/cpp/build.rs
+++ b/foreign/cpp/build.rs
@@ -24,6 +24,7 @@ fn main() {
println!("cargo:rerun-if-changed=src/consumer_group.rs");
println!("cargo:rerun-if-changed=src/identifier.rs");
println!("cargo:rerun-if-changed=src/lib.rs");
+ println!("cargo:rerun-if-changed=src/messages.rs");
println!("cargo:rerun-if-changed=src/stream.rs");
println!("cargo:rerun-if-changed=src/topic.rs");
}
diff --git a/foreign/cpp/src/client.rs b/foreign/cpp/src/client.rs
index 341e0c1a0..d4500937f 100644
--- a/foreign/cpp/src/client.rs
+++ b/foreign/cpp/src/client.rs
@@ -17,18 +17,40 @@
use crate::{RUNTIME, ffi};
use iggy::prelude::{
- Client as IggyConnectionClient, CompressionAlgorithm as
RustCompressionAlgorithm,
+ Client as IggyConnectionClient, CompressionAlgorithm as
RustCompressionAlgorithm, Consumer,
ConsumerGroupClient, Identifier as RustIdentifier, IggyClient as
RustIggyClient,
IggyClientBuilder as RustIggyClientBuilder, IggyError, IggyExpiry as
RustIggyExpiry,
- MaxTopicSize as RustMaxTopicSize, PartitionClient, StreamClient,
TopicClient, UserClient,
+ IggyMessage, IggyTimestamp, MaxTopicSize as RustMaxTopicSize,
MessageClient, PartitionClient,
+ Partitioning, PollingStrategy, StreamClient, TopicClient, UserClient,
};
use std::str::FromStr;
use std::sync::Arc;
+/// Sentinel value passed from C++ to mean "no partition specified" — the
server picks the
+/// partition based on the consumer/strategy. Cxx FFI does not support
`Option<u32>`, so we
+/// reserve `u32::MAX` as the sentinel for `partition_id`.
+const ANY_PARTITION_ID: u32 = u32::MAX;
+
pub struct Client {
pub inner: Arc<RustIggyClient>,
}
+/// Creates a new client connection and returns a raw pointer to the
underlying [`Client`].
+///
+/// # Ownership
+///
+/// The returned `*mut Client` is owned by the caller (the C++ side). The
caller is responsible
+/// for calling [`delete_connection`] exactly once to release the resources.
Failing to do so
+/// leaks the underlying tokio runtime resources and the open network
connection.
+///
+/// # Safety
+///
+/// - Passing the pointer to [`delete_connection`] more than once is undefined
behaviour
+/// (double-free).
+/// - Using the pointer after [`delete_connection`] has been called is
undefined behaviour
+/// (use-after-free).
+/// - This function does not provide synchronisation. The pointer must not be
used concurrently
+/// from multiple threads unless the caller serialises access externally.
pub fn new_connection(connection_string: String) -> Result<*mut Client,
String> {
let connection_str = connection_string.as_str();
let client = match connection_str {
@@ -73,6 +95,17 @@ impl Client {
})
}
+ pub fn get_streams(&self) -> Result<Vec<ffi::Stream>, String> {
+ RUNTIME.block_on(async {
+ let streams = self
+ .inner
+ .get_streams()
+ .await
+ .map_err(|error| format!("Could not get streams: {error}"))?;
+ Ok(streams.into_iter().map(ffi::Stream::from).collect())
+ })
+ }
+
pub fn create_stream(&self, stream_name: String) -> Result<(), String> {
RUNTIME.block_on(async {
self.inner
@@ -127,6 +160,139 @@ impl Client {
// })
// }
+ #[allow(clippy::too_many_arguments)]
+ pub fn send_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<ffi::IggyMessageToSend>,
+ ) -> Result<(), String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+
+ let partitioning = match partitioning_kind.as_str() {
+ "balanced" => Partitioning::balanced(),
+ "partition_id" => {
+ if partitioning_value.len() != 4 {
+ return Err(format!(
+ "Could not send messages: partition_id requires
exactly 4 bytes, got {}",
+ partitioning_value.len()
+ ));
+ }
+ let id =
+
u32::from_le_bytes(partitioning_value.as_slice().try_into().map_err(|_| {
+ "Could not send messages: invalid partition_id
value".to_string()
+ })?);
+ Partitioning::partition_id(id)
+ }
+ "messages_key" => {
+ if partitioning_value.is_empty() {
+ return Err(
+ "Could not send messages: messages_key requires a
non-empty value"
+ .to_string(),
+ );
+ }
+
Partitioning::messages_key(&partitioning_value).map_err(|error| {
+ format!("Could not send messages: invalid messages key:
{error}")
+ })?
+ }
+ _ => {
+ return Err(format!(
+ "Could not send messages: invalid partitioning kind:
{partitioning_kind}"
+ ));
+ }
+ };
+
+ let mut iggy_messages: Vec<IggyMessage> = messages
+ .into_iter()
+ .map(IggyMessage::try_from)
+ .collect::<Result<Vec<_>, _>>()?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .send_messages(
+ &rust_stream_id,
+ &rust_topic_id,
+ &partitioning,
+ &mut iggy_messages,
+ )
+ .await
+ .map_err(|error| format!("Could not send messages: {error}"))?;
+ Ok(())
+ })
+ }
+
+ #[allow(clippy::too_many_arguments)]
+ pub fn poll_messages(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ partition_id: u32,
+ consumer_kind: String,
+ consumer_id: ffi::Identifier,
+ polling_strategy_kind: String,
+ polling_strategy_value: u64,
+ count: u32,
+ auto_commit: bool,
+ ) -> Result<ffi::PolledMessages, String> {
+ let rust_stream_id = RustIdentifier::try_from(stream_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ let rust_consumer_id = RustIdentifier::try_from(consumer_id)
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+
+ let consumer = match consumer_kind.as_str() {
+ "consumer" => Consumer::new(rust_consumer_id),
+ "consumer_group" => Consumer::group(rust_consumer_id),
+ _ => {
+ return Err(format!(
+ "Could not poll messages: invalid consumer kind:
{consumer_kind}"
+ ));
+ }
+ };
+
+ let strategy = match polling_strategy_kind.as_str() {
+ "offset" => PollingStrategy::offset(polling_strategy_value),
+ "timestamp" =>
PollingStrategy::timestamp(IggyTimestamp::from(polling_strategy_value)),
+ "first" => PollingStrategy::first(),
+ "last" => PollingStrategy::last(),
+ "next" => PollingStrategy::next(),
+ _ => {
+ return Err(format!(
+ "Could not poll messages: invalid polling strategy:
{polling_strategy_kind}"
+ ));
+ }
+ };
+
+ let opt_partition = if partition_id == ANY_PARTITION_ID {
+ None
+ } else {
+ Some(partition_id)
+ };
+
+ RUNTIME.block_on(async {
+ let polled = self
+ .inner
+ .poll_messages(
+ &rust_stream_id,
+ &rust_topic_id,
+ opt_partition,
+ &consumer,
+ &strategy,
+ count,
+ auto_commit,
+ )
+ .await
+ .map_err(|error| format!("Could not poll messages: {error}"))?;
+ Ok(ffi::PolledMessages::from(polled))
+ })
+ }
+
#[allow(clippy::too_many_arguments)]
pub fn create_topic(
&self,
@@ -378,6 +544,66 @@ impl Client {
Ok(())
})
}
+
+ pub fn join_consumer_group(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ group_id: ffi::Identifier,
+ ) -> Result<(), String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!("Could not join consumer group: invalid stream identifier:
{error}")
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!("Could not join consumer group: invalid topic identifier:
{error}")
+ })?;
+ let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error|
{
+ format!("Could not join consumer group: invalid group identifier:
{error}")
+ })?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .join_consumer_group(&rust_stream_id, &rust_topic_id,
&rust_group_id)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not join consumer group '{}' for topic '{}' on
stream '{}': {error}",
+ rust_group_id, rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(())
+ })
+ }
+
+ pub fn leave_consumer_group(
+ &self,
+ stream_id: ffi::Identifier,
+ topic_id: ffi::Identifier,
+ group_id: ffi::Identifier,
+ ) -> Result<(), String> {
+ let rust_stream_id =
RustIdentifier::try_from(stream_id).map_err(|error| {
+ format!("Could not leave consumer group: invalid stream
identifier: {error}")
+ })?;
+ let rust_topic_id = RustIdentifier::try_from(topic_id).map_err(|error|
{
+ format!("Could not leave consumer group: invalid topic identifier:
{error}")
+ })?;
+ let rust_group_id = RustIdentifier::try_from(group_id).map_err(|error|
{
+ format!("Could not leave consumer group: invalid group identifier:
{error}")
+ })?;
+
+ RUNTIME.block_on(async {
+ self.inner
+ .leave_consumer_group(&rust_stream_id, &rust_topic_id,
&rust_group_id)
+ .await
+ .map_err(|error| {
+ format!(
+ "Could not leave consumer group '{}' for topic '{}' on
stream '{}': {error}",
+ rust_group_id, rust_topic_id, rust_stream_id
+ )
+ })?;
+ Ok(())
+ })
+ }
}
pub unsafe fn delete_connection(client: *mut Client) -> Result<(), String> {
@@ -385,7 +611,9 @@ pub unsafe fn delete_connection(client: *mut Client) ->
Result<(), String> {
return Ok(());
}
- // TODO(slbotbm): Address comment from @hubcio: if logout_user will fail
you will have a leak, this will be tagged by e.g. valgrind if someone will test
iggy rigorously
+ // `Box::from_raw` below runs unconditionally, so the client is always
released regardless
+ // of `logout_result`. The result is only used to surface a logout error
to the caller — there
+ // is no leak path here.
let logout_result = RUNTIME.block_on(async { unsafe { &*client
}.inner.logout_user().await });
unsafe {
diff --git a/foreign/cpp/src/identifier.rs b/foreign/cpp/src/identifier.rs
index 02c9d8a48..93877458d 100644
--- a/foreign/cpp/src/identifier.rs
+++ b/foreign/cpp/src/identifier.rs
@@ -62,6 +62,10 @@ impl TryFrom<ffi::Identifier> for RustIdentifier {
}
}
+// Rust 1.95 added the `wrong_self_convention` lint for `from_*` methods that
take `&mut self`.
+// These methods initialize the FFI `Identifier` struct in place from C++ —
keeping the names
+// preserves the C++ ABI used by every test and downstream binding.
+#[allow(clippy::wrong_self_convention)]
impl ffi::Identifier {
pub fn from_string(&mut self, id: String) -> Result<(), String> {
*self = RustIdentifier::named(&id)
diff --git a/foreign/cpp/src/lib.rs b/foreign/cpp/src/lib.rs
index 4d47a5b94..7f87d5765 100644
--- a/foreign/cpp/src/lib.rs
+++ b/foreign/cpp/src/lib.rs
@@ -17,10 +17,12 @@
mod client;
mod consumer_group;
mod identifier;
+mod messages;
mod stream;
mod topic;
use client::{Client, delete_connection, new_connection};
+use messages::make_message;
use std::sync::LazyLock;
static RUNTIME: LazyLock<tokio::runtime::Runtime> = LazyLock::new(|| {
@@ -51,6 +53,43 @@ mod ffi {
partitions_count: u32,
}
+ struct Stream {
+ id: u32,
+ created_at: u64,
+ name: String,
+ size_bytes: u64,
+ messages_count: u64,
+ topics_count: u32,
+ }
+
+ struct IggyMessageToSend {
+ id_lo: u64,
+ id_hi: u64,
+ payload: Vec<u8>,
+ user_headers: Vec<u8>,
+ }
+
+ struct IggyMessagePolled {
+ checksum: u64,
+ id_lo: u64,
+ id_hi: u64,
+ offset: u64,
+ timestamp: u64,
+ origin_timestamp: u64,
+ user_headers_length: u32,
+ payload_length: u32,
+ reserved: u64,
+ payload: Vec<u8>,
+ user_headers: Vec<u8>,
+ }
+
+ struct PolledMessages {
+ partition_id: u32,
+ current_offset: u64,
+ count: u32,
+ messages: Vec<IggyMessagePolled>,
+ }
+
struct StreamDetails {
id: u32,
created_at: u64,
@@ -83,6 +122,7 @@ mod ffi {
fn login_user(self: &Client, username: String, password: String) ->
Result<()>;
fn connect(self: &Client) -> Result<()>;
fn create_stream(self: &Client, stream_name: String) -> Result<()>;
+ fn get_streams(self: &Client) -> Result<Vec<Stream>>;
fn get_stream(self: &Client, stream_id: Identifier) ->
Result<StreamDetails>;
fn delete_stream(self: &Client, stream_id: Identifier) -> Result<()>;
// fn purge_stream(&self, stream_id: Identifier) -> Result<()>;
@@ -129,6 +169,44 @@ mod ffi {
topic_id: Identifier,
group_id: Identifier,
) -> Result<()>;
+ fn join_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ ) -> Result<()>;
+ fn leave_consumer_group(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ group_id: Identifier,
+ ) -> Result<()>;
+
+ #[allow(clippy::too_many_arguments)]
+ fn poll_messages(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partition_id: u32,
+ consumer_kind: String,
+ consumer_id: Identifier,
+ polling_strategy_kind: String,
+ polling_strategy_value: u64,
+ count: u32,
+ auto_commit: bool,
+ ) -> Result<PolledMessages>;
+
+ fn make_message(payload: Vec<u8>) -> IggyMessageToSend;
+
+ #[allow(clippy::too_many_arguments)]
+ fn send_messages(
+ self: &Client,
+ stream_id: Identifier,
+ topic_id: Identifier,
+ partitioning_kind: String,
+ partitioning_value: Vec<u8>,
+ messages: Vec<IggyMessageToSend>,
+ ) -> Result<()>;
unsafe fn delete_connection(client: *mut Client) -> Result<()>;
diff --git a/foreign/cpp/src/messages.rs b/foreign/cpp/src/messages.rs
new file mode 100644
index 000000000..2a36c62f3
--- /dev/null
+++ b/foreign/cpp/src/messages.rs
@@ -0,0 +1,85 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::ffi;
+use bytes::Bytes;
+use iggy::prelude::{IggyMessage as RustIggyMessage, PolledMessages as
RustPolledMessages};
+
+pub fn make_message(payload: Vec<u8>) -> ffi::IggyMessageToSend {
+ ffi::IggyMessageToSend {
+ id_lo: 0,
+ id_hi: 0,
+ payload,
+ user_headers: Vec::new(),
+ }
+}
+
+impl From<RustIggyMessage> for ffi::IggyMessagePolled {
+ fn from(m: RustIggyMessage) -> Self {
+ let id_bytes = m.header.id.to_le_bytes();
+ let id_lo = u64::from_le_bytes(id_bytes[0..8].try_into().unwrap());
+ let id_hi = u64::from_le_bytes(id_bytes[8..16].try_into().unwrap());
+ ffi::IggyMessagePolled {
+ checksum: m.header.checksum,
+ id_lo,
+ id_hi,
+ offset: m.header.offset,
+ timestamp: m.header.timestamp,
+ origin_timestamp: m.header.origin_timestamp,
+ user_headers_length: m.header.user_headers_length,
+ payload_length: m.header.payload_length,
+ reserved: m.header.reserved,
+ payload: m.payload.to_vec(),
+ user_headers: m.user_headers.map(|h|
h.to_vec()).unwrap_or_default(),
+ }
+ }
+}
+
+impl TryFrom<ffi::IggyMessageToSend> for RustIggyMessage {
+ type Error = String;
+
+ fn try_from(m: ffi::IggyMessageToSend) -> Result<Self, Self::Error> {
+ if !m.user_headers.is_empty() {
+ return Err(
+ "Could not convert message: user_headers are not yet supported
in the C++ SDK"
+ .to_string(),
+ );
+ }
+ let id = ((m.id_hi as u128) << 64) | (m.id_lo as u128);
+ let payload = Bytes::from(m.payload);
+ RustIggyMessage::builder()
+ .id(id)
+ .payload(payload)
+ .build()
+ .map_err(|error| format!("Could not convert message: {error}"))
+ }
+}
+
+impl From<RustPolledMessages> for ffi::PolledMessages {
+ fn from(p: RustPolledMessages) -> Self {
+ ffi::PolledMessages {
+ partition_id: p.partition_id,
+ current_offset: p.current_offset,
+ count: p.count,
+ messages: p
+ .messages
+ .into_iter()
+ .map(ffi::IggyMessagePolled::from)
+ .collect(),
+ }
+ }
+}
diff --git a/foreign/cpp/src/stream.rs b/foreign/cpp/src/stream.rs
index 5c1f6e9d7..b6e0ae22b 100644
--- a/foreign/cpp/src/stream.rs
+++ b/foreign/cpp/src/stream.rs
@@ -16,8 +16,22 @@
// under the License.
use crate::ffi;
+use iggy::prelude::Stream as RustStream;
use iggy::prelude::StreamDetails as RustStreamDetails;
+impl From<RustStream> for ffi::Stream {
+ fn from(s: RustStream) -> Self {
+ ffi::Stream {
+ id: s.id,
+ created_at: s.created_at.as_micros(),
+ name: s.name,
+ size_bytes: s.size.as_bytes_u64(),
+ messages_count: s.messages_count,
+ topics_count: s.topics_count,
+ }
+ }
+}
+
impl From<RustStreamDetails> for ffi::StreamDetails {
fn from(stream: RustStreamDetails) -> Self {
ffi::StreamDetails {
diff --git a/foreign/cpp/tests/client/low_level_e2e.cpp
b/foreign/cpp/tests/client/low_level_e2e.cpp
index 26b82b82f..7306f0c95 100644
--- a/foreign/cpp/tests/client/low_level_e2e.cpp
+++ b/foreign/cpp/tests/client/low_level_e2e.cpp
@@ -16,6 +16,7 @@
// under the License.
// TODO(slbotbm): create fixture for setup/teardown.
+// TODO(slbotbm): Add tests for join_consumer_group() and
leave_consumer_group()
#include <string>
diff --git a/foreign/cpp/tests/common/test_helpers.hpp
b/foreign/cpp/tests/common/test_helpers.hpp
index 5457c09d7..15851f08c 100644
--- a/foreign/cpp/tests/common/test_helpers.hpp
+++ b/foreign/cpp/tests/common/test_helpers.hpp
@@ -42,3 +42,20 @@ inline iggy::ffi::Client *login_to_server() {
client->login_user("iggy", "iggy");
return client;
}
+
+inline rust::Vec<std::uint8_t> to_payload(const std::string &s) {
+ rust::Vec<std::uint8_t> v;
+ for (const char c : s) {
+ v.push_back(static_cast<std::uint8_t>(c));
+ }
+ return v;
+}
+
+inline rust::Vec<std::uint8_t> partition_id_bytes(std::uint32_t id) {
+ rust::Vec<std::uint8_t> v;
+ v.push_back(static_cast<std::uint8_t>(id & 0xFF));
+ v.push_back(static_cast<std::uint8_t>((id >> 8) & 0xFF));
+ v.push_back(static_cast<std::uint8_t>((id >> 16) & 0xFF));
+ v.push_back(static_cast<std::uint8_t>((id >> 24) & 0xFF));
+ return v;
+}
diff --git a/foreign/cpp/tests/message/low_level_e2e.cpp
b/foreign/cpp/tests/message/low_level_e2e.cpp
new file mode 100644
index 000000000..4290afadf
--- /dev/null
+++ b/foreign/cpp/tests/message/low_level_e2e.cpp
@@ -0,0 +1,1249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+ RecordProperty("description", "Sends 10 messages and polls them back,
verifying count, offsets, and payloads.");
+ const std::string stream_name = "cpp-msg-roundtrip";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 10; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("test message " +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+ "partition_id",
partition_id_bytes(0), std::move(messages)));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.partition_id, 0u) << "Polled partition_id mismatches the
partition we sent to";
+ ASSERT_EQ(polled.count, 10u);
+ ASSERT_EQ(polled.messages.size(), 10u);
+ for (std::uint32_t i = 0; i < 10; i++) {
+ ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+ std::string expected = "test message " + std::to_string(i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+ RecordProperty("description", "Verifies that polled message IDs match the
sent IDs.");
+ const std::string stream_name = "cpp-msg-verify-ids";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("id-test-message"));
+ msg.id_lo = 42;
+ msg.id_hi = 0;
+ messages.push_back(std::move(msg));
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.messages.size(), 1u);
+ ASSERT_EQ(polled.messages[0].id_lo, 42u);
+ ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+ RecordProperty("description", "Verifies polling from an empty partition
returns zero messages.");
+ const std::string stream_name = "cpp-msg-empty-poll";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.count, 0u);
+ ASSERT_EQ(polled.messages.size(), 0u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+ RecordProperty("description", "Verifies send_messages throws when not
authenticated.");
+ iggy::ffi::Client *client = nullptr;
+ ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+ ASSERT_NE(client, nullptr);
+ ASSERT_NO_THROW(client->connect());
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("should-fail"));
+ messages.push_back(std::move(msg));
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(1),
make_numeric_identifier(1), "partition_id",
+ partition_id_bytes(0),
std::move(messages)),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+ RecordProperty("description", "Throws when sending messages with an
invalid stream identifier.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("test"));
+ messages.push_back(std::move(msg));
+
+ iggy::ffi::Identifier invalid_id;
+ invalid_id.kind = "invalid";
+ invalid_id.length = 0;
+
+ ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1),
"partition_id", partition_id_bytes(0),
+ std::move(messages)),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+ RecordProperty("description", "Throws when sending messages to a
non-existent stream.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("test"));
+ messages.push_back(std::move(msg));
+
+
ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"),
make_numeric_identifier(0),
+ "partition_id", partition_id_bytes(0),
std::move(messages)),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+ RecordProperty("description", "Throws when sending messages with an
invalid partitioning kind.");
+ const std::string stream_name = "cpp-msg-invalid-part-kind";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("test"));
+ messages.push_back(std::move(msg));
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "invalid_kind",
+ partition_id_bytes(0),
std::move(messages)),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+ RecordProperty("description", "Throws when sending messages with
insufficient partitioning value bytes.");
+ const std::string stream_name = "cpp-msg-invalid-part-val";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("test"));
+ messages.push_back(std::move(msg));
+
+ rust::Vec<std::uint8_t> short_bytes;
+ short_bytes.push_back(0x00);
+ short_bytes.push_back(0x01);
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ std::move(short_bytes),
std::move(messages)),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+ RecordProperty("description",
+ "Verifies messages sent to a specific partition are only
retrievable from that partition.");
+ const std::string stream_name = "cpp-msg-specific-part";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("partition-test-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled_part0 =
client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0,
+ "consumer",
make_numeric_identifier(1), "offset", 0, 100, false);
+ ASSERT_EQ(polled_part0.partition_id, 0u);
+ ASSERT_EQ(polled_part0.count, 5u);
+
+ auto polled_part1 =
client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 1,
+ "consumer",
make_numeric_identifier(1), "offset", 0, 100, false);
+ ASSERT_EQ(polled_part1.partition_id, 1u);
+ ASSERT_EQ(polled_part1.count, 0u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+ RecordProperty("description", "Throws when sending an empty message
vector.");
+ const std::string stream_name = "cpp-msg-empty-vec";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> empty_messages;
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0),
std::move(empty_messages)),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+ RecordProperty("description", "Throws when sending a message with an empty
payload.");
+ const std::string stream_name = "cpp-msg-empty-payload";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ rust::Vec<std::uint8_t> empty_payload;
+ auto msg = iggy::ffi::make_message(std::move(empty_payload));
+ messages.push_back(std::move(msg));
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0),
std::move(messages)),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+ RecordProperty("description", "Throws when sending a message exceeding
maximum payload size.");
+ const std::string stream_name = "cpp-msg-oversized";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ // Build a payload one byte over the SDK's max payload size (64 MB).
cxx::Vec exposes no
+ // public reserve API, so the loop relies on amortised geometric growth.
+ constexpr std::uint32_t kOversizedPayloadBytes = 64'000'001u;
+ rust::Vec<std::uint8_t> oversized_payload;
+ for (std::uint32_t i = 0; i < kOversizedPayloadBytes; i++) {
+ oversized_payload.push_back(0x41);
+ }
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(std::move(oversized_payload));
+ messages.push_back(std::move(msg));
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0),
std::move(messages)),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesPreservesOrder) {
+ RecordProperty("description", "Verifies messages are stored and retrieved
in the order they were sent.");
+ const std::string stream_name = "cpp-msg-order";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 50; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("order-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.count, 50u);
+ for (std::uint32_t i = 0; i < 50; i++) {
+ ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+ std::string expected = "order-" + std::to_string(i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithDuplicateIds) {
+ RecordProperty("description", "Verifies sending multiple messages with the
same ID succeeds.");
+ const std::string stream_name = "cpp-msg-dup-ids";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 3; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("dup-id-msg-" +
std::to_string(i)));
+ msg.id_lo = 99;
+ msg.id_hi = 0;
+ messages.push_back(std::move(msg));
+ }
+
+ ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+ "partition_id",
partition_id_bytes(0), std::move(messages)));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.count, 3u);
+ for (std::size_t i = 0; i < polled.messages.size(); i++) {
+ EXPECT_EQ(polled.messages[i].id_lo, 99u);
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithVariousPayloads) {
+ RecordProperty("description",
+ "Verifies various payload types including null bytes,
UTF-8, and binary data are preserved.");
+ const std::string stream_name = "cpp-msg-various-payloads";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<std::uint8_t> payload_null;
+ payload_null.push_back(0x00);
+ payload_null.push_back(0x01);
+ payload_null.push_back(0x00);
+ payload_null.push_back(0xFF);
+
+ rust::Vec<std::uint8_t> payload_binary;
+ payload_binary.push_back(0xDE);
+ payload_binary.push_back(0xAD);
+ payload_binary.push_back(0xBE);
+ payload_binary.push_back(0xEF);
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+
+ auto msg0 = iggy::ffi::make_message(to_payload("simple ascii"));
+ messages.push_back(std::move(msg0));
+
+ auto msg1 = iggy::ffi::make_message(std::move(payload_null));
+ messages.push_back(std::move(msg1));
+
+ auto msg2 = iggy::ffi::make_message(to_payload("héllo wörld"));
+ messages.push_back(std::move(msg2));
+
+ auto msg3 = iggy::ffi::make_message(std::move(payload_binary));
+ messages.push_back(std::move(msg3));
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.count, 4u);
+
+ std::string ascii_actual(polled.messages[0].payload.begin(),
polled.messages[0].payload.end());
+ EXPECT_EQ(ascii_actual, "simple ascii");
+
+ ASSERT_EQ(polled.messages[1].payload.size(), 4u);
+ EXPECT_EQ(polled.messages[1].payload[0], 0x00);
+ EXPECT_EQ(polled.messages[1].payload[1], 0x01);
+ EXPECT_EQ(polled.messages[1].payload[2], 0x00);
+ EXPECT_EQ(polled.messages[1].payload[3], 0xFF);
+
+ std::string utf8_actual(polled.messages[2].payload.begin(),
polled.messages[2].payload.end());
+ EXPECT_EQ(utf8_actual, "héllo wörld");
+
+ ASSERT_EQ(polled.messages[3].payload.size(), 4u);
+ EXPECT_EQ(polled.messages[3].payload[0], 0xDE);
+ EXPECT_EQ(polled.messages[3].payload[1], 0xAD);
+ EXPECT_EQ(polled.messages[3].payload[2], 0xBE);
+ EXPECT_EQ(polled.messages[3].payload[3], 0xEF);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) {
+ RecordProperty("description", "Throws when polling messages before
authentication.");
+ iggy::ffi::Client *client = nullptr;
+ ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+ ASSERT_NE(client, nullptr);
+ ASSERT_NO_THROW(client->connect());
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(1),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 10, false),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamIdThrows) {
+ RecordProperty("description", "Throws when polling messages with an
invalid stream identifier.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ iggy::ffi::Identifier invalid_id;
+ invalid_id.kind = "invalid";
+ invalid_id.length = 0;
+
+ ASSERT_THROW(client->poll_messages(invalid_id, make_numeric_identifier(0),
0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 10, false),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStreamThrows) {
+ RecordProperty("description", "Throws when polling messages from a
non-existent stream.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+
ASSERT_THROW(client->poll_messages(make_string_identifier("nonexistent-stream-poll"),
make_numeric_identifier(0), 0,
+ "consumer", make_numeric_identifier(1),
"offset", 0, 10, false),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKindThrows) {
+ RecordProperty("description", "Throws when polling messages with an
invalid consumer kind.");
+ const std::string stream_name = "cpp-msg-invalid-consumer";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "invalid",
+ make_numeric_identifier(1), "offset",
0, 10, false),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKindThrows) {
+ RecordProperty("description", "Throws when polling messages with an
invalid polling strategy kind.");
+ const std::string stream_name = "cpp-msg-invalid-strategy";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "invalid",
0, 10, false),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesCountLessThanAvailable) {
+ RecordProperty("description", "Returns only the requested count when fewer
messages are requested than available.");
+ const std::string stream_name = "cpp-msg-count-less";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 10; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 5, false);
+
+ ASSERT_EQ(polled.count, 5u);
+ ASSERT_EQ(polled.messages.size(), 5u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithLargeOffset) {
+ RecordProperty("description", "Returns zero messages when polling with an
offset beyond available messages.");
+ const std::string stream_name = "cpp-msg-large-offset";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
999999, 100, false);
+
+ ASSERT_EQ(polled.count, 0u);
+ ASSERT_EQ(polled.messages.size(), 0u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) {
+ RecordProperty("description", "Verifies first polling strategy returns
messages from the beginning.");
+ const std::string stream_name = "cpp-msg-first-strategy";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 10; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "first",
0, 3, false);
+
+ ASSERT_EQ(polled.count, 3u);
+ ASSERT_EQ(polled.messages.size(), 3u);
+ EXPECT_EQ(polled.messages[0].offset, 0u);
+ for (std::uint32_t i = 0; i < 3; i++) {
+ EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+ std::string expected = "msg-" + std::to_string(i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesLastStrategy) {
+ RecordProperty("description", "Verifies last polling strategy returns
messages from the end.");
+ const std::string stream_name = "cpp-msg-last-strategy";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 10; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "last", 0,
3, false);
+
+ ASSERT_EQ(polled.count, 3u);
+ ASSERT_EQ(polled.messages.size(), 3u);
+ EXPECT_EQ(polled.messages[0].offset, 7u);
+ EXPECT_EQ(polled.messages[2].offset, 9u);
+ for (std::uint32_t i = 0; i < 3; i++) {
+ std::string expected = "msg-" + std::to_string(7 + i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) {
+ RecordProperty("description",
+ "Verifies next strategy without auto-commit returns the
same messages on repeated calls.");
+ const std::string stream_name = "cpp-msg-next-no-commit";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled1 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "next",
0, 100, false);
+ ASSERT_EQ(polled1.count, 5u);
+
+ auto polled2 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "next",
0, 100, false);
+ ASSERT_EQ(polled2.count, 5u);
+ for (std::uint32_t i = 0; i < 5; i++) {
+ EXPECT_EQ(polled1.messages[i].offset, static_cast<std::uint64_t>(i));
+ std::string expected = "msg-" + std::to_string(i);
+ std::string actual(polled1.messages[i].payload.begin(),
polled1.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "polled1 payload mismatch at index " <<
i;
+ }
+ for (std::uint32_t i = 0; i < 5; i++) {
+ EXPECT_EQ(polled2.messages[i].offset, static_cast<std::uint64_t>(i));
+ std::string expected = "msg-" + std::to_string(i);
+ std::string actual(polled2.messages[i].payload.begin(),
polled2.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "polled2 payload mismatch at index " <<
i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) {
+ RecordProperty("description", "Verifies next strategy with auto-commit
advances the offset on subsequent polls.");
+ const std::string stream_name = "cpp-msg-next-auto-commit";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 10; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled1 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "next",
0, 5, true);
+ ASSERT_EQ(polled1.count, 5u);
+ EXPECT_EQ(polled1.messages[0].offset, 0u);
+ EXPECT_EQ(polled1.messages[4].offset, 4u);
+
+ auto polled2 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "next",
0, 5, true);
+ ASSERT_EQ(polled2.count, 5u);
+ EXPECT_EQ(polled2.messages[0].offset, 5u);
+ EXPECT_EQ(polled2.messages[4].offset, 9u);
+ for (std::uint32_t i = 0; i < 5; i++) {
+ std::string expected1 = "msg-" + std::to_string(i);
+ std::string actual1(polled1.messages[i].payload.begin(),
polled1.messages[i].payload.end());
+ EXPECT_EQ(actual1, expected1) << "polled1 payload mismatch at index "
<< i;
+ }
+ for (std::uint32_t i = 0; i < 5; i++) {
+ std::string expected2 = "msg-" + std::to_string(5 + i);
+ std::string actual2(polled2.messages[i].payload.begin(),
polled2.messages[i].payload.end());
+ EXPECT_EQ(actual2, expected2) << "polled2 payload mismatch at index "
<< i;
+ }
+
+ auto polled3 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "next",
0, 5, true);
+ ASSERT_EQ(polled3.count, 0u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesConsumerIdIndependence) {
+ RecordProperty("description", "Verifies different consumer IDs maintain
independent offsets.");
+ const std::string stream_name = "cpp-msg-consumer-indep";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled_c1 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0,
+ "consumer",
make_numeric_identifier(1), "next", 0, 3, true);
+ ASSERT_EQ(polled_c1.count, 3u);
+
+ auto polled_c2 = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0,
+ "consumer",
make_numeric_identifier(2), "next", 0, 5, true);
+ ASSERT_EQ(polled_c2.count, 5u);
+
+ auto polled_c1_again =
client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0,
+ "consumer",
make_numeric_identifier(1), "next", 0, 5, true);
+ ASSERT_EQ(polled_c1_again.count, 2u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) {
+ RecordProperty("description", "Verifies message ordering is preserved
across multiple send batches.");
+ const std::string stream_name = "cpp-msg-multi-batch-order";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> batch1;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("batch1-" +
std::to_string(i)));
+ batch1.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(batch1));
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> batch2;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("batch2-" +
std::to_string(i)));
+ batch2.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(batch2));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.count, 10u);
+ for (std::uint32_t i = 0; i < 10; i++) {
+ EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i)) <<
"Offset mismatch at index " << i;
+ }
+ for (std::uint32_t i = 0; i < 5; i++) {
+ std::string expected = "batch1-" + std::to_string(i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "batch1 payload mismatch at index " <<
i;
+ }
+ for (std::uint32_t i = 0; i < 5; i++) {
+ std::string expected = "batch2-" + std::to_string(i);
+ std::string actual(polled.messages[5 + i].payload.begin(),
polled.messages[5 + i].payload.end());
+ EXPECT_EQ(actual, expected) << "batch2 payload mismatch at index " <<
i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) {
+ RecordProperty("description", "Verifies multiple messages with distinct
custom IDs are all preserved.");
+ const std::string stream_name = "cpp-msg-multi-custom-ids";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ const std::uint64_t id_values[] = {100, 200, 300, 400, 500};
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ msg.id_lo = id_values[i];
+ msg.id_hi = 0;
+ messages.push_back(std::move(msg));
+ }
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 100, false);
+
+ ASSERT_EQ(polled.count, 5u);
+ for (std::uint32_t i = 0; i < 5; i++) {
+ EXPECT_EQ(polled.messages[i].id_lo, id_values[i]) << "ID mismatch at
index " << i;
+ EXPECT_EQ(polled.messages[i].id_hi, 0u);
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeletedThrows) {
+ RecordProperty("description", "Throws when polling messages after the
stream has been deleted.");
+ const std::string stream_name = "cpp-msg-deleted-stream";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("test"));
+ messages.push_back(std::move(msg));
+
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ std::uint32_t saved_stream_id = stream.id;
+ client->delete_stream(make_numeric_identifier(saved_stream_id));
+
+
ASSERT_THROW(client->poll_messages(make_numeric_identifier(saved_stream_id),
make_numeric_identifier(0), 0,
+ "consumer", make_numeric_identifier(1),
"offset", 0, 10, false),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionIdThrows) {
+ RecordProperty("description", "Throws when polling with a non-existent
partition ID.");
+ const std::string stream_name = "cpp-msg-invalid-partition";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 9999, "consumer",
+ make_numeric_identifier(1), "offset",
0, 10, false),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithCountZeroThrows) {
+ RecordProperty("description", "Throws when polling with count=0.");
+ const std::string stream_name = "cpp-msg-count-zero";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 0, false),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) {
+ RecordProperty("description",
+ "Verifies polling with partition_id=u32::MAX defaults to
partition 0 and returns messages.");
+ const std::string stream_name = "cpp-msg-no-partition";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), UINT32_MAX,
+ "consumer",
make_numeric_identifier(1), "offset", 0, 100, false);
+
+ // The Rust side maps UINT32_MAX to None, so the server picks a partition.
With a single
+ // partition topic that should always be partition 0.
+ ASSERT_EQ(polled.partition_id, 0u) << "u32::MAX sentinel did not map to
None — partition_id sentinel regression?";
+ ASSERT_EQ(polled.count, 5u);
+ ASSERT_EQ(polled.messages.size(), 5u);
+ for (std::uint32_t i = 0; i < 5; i++) {
+ std::string expected = "msg-" + std::to_string(i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) {
+ RecordProperty("description",
+ "Verifies timestamp polling strategy returns messages with
timestamp >= the specified value.");
+ const std::string stream_name = "cpp-msg-timestamp-strategy";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> batch1;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("batch1-" +
std::to_string(i)));
+ batch1.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(batch1));
+
+ std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> batch2;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("batch2-" +
std::to_string(i)));
+ batch2.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(batch2));
+
+ auto all = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset", 0,
100, false);
+ ASSERT_EQ(all.count, 10u);
+
+ // IggyTimestamp::now() is microsecond-resolution and we slept 100ms
between batches; a gap
+ // smaller than half that window means the test has degraded into a
tautology on busy CI.
+ constexpr std::uint64_t kMinTimestampGapMicros = 50'000;
+ std::uint64_t batch1_timestamp = all.messages[0].timestamp;
+ std::uint64_t batch2_timestamp = all.messages[5].timestamp;
+ ASSERT_GT(batch2_timestamp, batch1_timestamp);
+ ASSERT_GE(batch2_timestamp - batch1_timestamp, kMinTimestampGapMicros)
+ << "Timestamp gap collapsed (" << (batch2_timestamp - batch1_timestamp)
+ << "us) — test no longer exercises timestamp filtering";
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(2),
"timestamp", batch2_timestamp, 100, false);
+
+ ASSERT_GE(polled.count, 5u);
+ // The server contract is `timestamp >= polling_strategy_value`. If a
batch1 message lands on
+ // exactly the same microsecond as batch2's first message, the count can
legitimately exceed 5,
+ // so verify by prefix rather than indexing each message against
`batch2-N`.
+ for (std::size_t i = 0; i < polled.messages.size(); i++) {
+ EXPECT_GE(polled.messages[i].timestamp, batch2_timestamp)
+ << "Message at index " << i << " has earlier timestamp";
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_TRUE(actual.rfind("batch1-", 0) == 0 || actual.rfind("batch2-",
0) == 0)
+ << "Polled message at index " << i << " has unexpected payload: "
<< actual;
+ }
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesMonotonicOffsets) {
+ RecordProperty("description",
+ "Verifies offsets are monotonically increasing and
continuous across multiple polls.");
+ const std::string stream_name = "cpp-msg-monotonic-offsets";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 20; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("mono-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ std::uint64_t expected_offset = 0;
+ for (int chunk = 0; chunk < 4; chunk++) {
+ auto polled =
+ client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
expected_offset, 5, false);
+
+ ASSERT_EQ(polled.count, 5u) << "Chunk " << chunk;
+ ASSERT_EQ(polled.messages.size(), 5u) << "Chunk " << chunk;
+
+ for (std::size_t i = 0; i < polled.messages.size(); i++) {
+ EXPECT_EQ(polled.messages[i].offset, expected_offset) << "Chunk "
<< chunk << " index " << i;
+ expected_offset++;
+ }
+ }
+
+ ASSERT_EQ(expected_offset, 20u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesLargeBatch) {
+ RecordProperty("description", "Verifies sending a large batch of 1000
messages succeeds and all are retrievable.");
+ const std::string stream_name = "cpp-msg-large-batch";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 1000; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("batch-msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+
+ ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+ "partition_id",
partition_id_bytes(0), std::move(messages)));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 1000, false);
+
+ ASSERT_EQ(polled.count, 1000u);
+ ASSERT_EQ(polled.messages.size(), 1000u);
+ EXPECT_EQ(polled.messages[0].offset, 0u);
+ EXPECT_EQ(polled.messages[999].offset, 999u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidTopicIdThrows) {
+ RecordProperty("description", "Throws when sending messages with an
invalid topic identifier.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ auto msg = iggy::ffi::make_message(to_payload("test"));
+ messages.push_back(std::move(msg));
+
+ iggy::ffi::Identifier invalid_id;
+ invalid_id.kind = "invalid";
+ invalid_id.length = 0;
+
+ ASSERT_THROW(client->send_messages(make_numeric_identifier(1), invalid_id,
"partition_id", partition_id_bytes(0),
+ std::move(messages)),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidTopicIdThrows) {
+ RecordProperty("description", "Throws when polling messages with an
invalid topic identifier.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ iggy::ffi::Identifier invalid_id;
+ invalid_id.kind = "invalid";
+ invalid_id.length = 0;
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), invalid_id,
0, "consumer",
+ make_numeric_identifier(1), "offset",
0, 10, false),
+ std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerIdThrows) {
+ RecordProperty("description", "Throws when polling messages with an
invalid consumer identifier.");
+ const std::string stream_name = "cpp-msg-invalid-consumer-id";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ iggy::ffi::Identifier invalid_id;
+ invalid_id.kind = "invalid";
+ invalid_id.length = 0;
+
+ ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0, "consumer",
+ invalid_id, "offset", 0, 10, false),
+ std::exception);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, ConsumerGroupCreateJoinAndPollMessages) {
+ RecordProperty("description",
+ "Creates a consumer group, joins it, sends messages, and
polls them using consumer_group kind.");
+ const std::string stream_name = "cpp-msg-consumer-group";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ auto group =
+ client->create_consumer_group(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "test-group");
+ ASSERT_EQ(group.members_count, 0u);
+
+
ASSERT_NO_THROW(client->join_consumer_group(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+
make_numeric_identifier(group.id)));
+
+ auto group_after_join =
client->get_consumer_group(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+
make_numeric_identifier(group.id));
+ ASSERT_EQ(group_after_join.members_count, 1u);
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 10; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("cg-msg-" +
std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto polled = client->poll_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), 0,
+ "consumer_group",
make_numeric_identifier(group.id), "offset", 0, 100, false);
+
+ ASSERT_EQ(polled.count, 10u);
+ ASSERT_EQ(polled.messages.size(), 10u);
+ for (std::uint32_t i = 0; i < 10; i++) {
+ std::string expected = "cg-msg-" + std::to_string(i);
+ std::string actual(polled.messages[i].payload.begin(),
polled.messages[i].payload.end());
+ EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+ }
+
+
ASSERT_NO_THROW(client->leave_consumer_group(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+
make_numeric_identifier(group.id)));
+
+ auto group_after_leave =
client->get_consumer_group(make_numeric_identifier(stream.id),
make_numeric_identifier(0),
+
make_numeric_identifier(group.id));
+ ASSERT_EQ(group_after_leave.members_count, 0u);
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
diff --git a/foreign/cpp/tests/message/unit_tests.cpp
b/foreign/cpp/tests/message/unit_tests.cpp
new file mode 100644
index 000000000..6a2768c46
--- /dev/null
+++ b/foreign/cpp/tests/message/unit_tests.cpp
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+
+TEST(MessageTest, MakeMessageSetsPayload) {
+ RecordProperty("description", "Verifies make_message stores payload bytes
correctly.");
+ rust::Vec<std::uint8_t> payload;
+ const std::string text = "hello world";
+ for (const char c : text) {
+ payload.push_back(static_cast<std::uint8_t>(c));
+ }
+
+ auto msg = iggy::ffi::make_message(std::move(payload));
+
+ ASSERT_EQ(msg.payload.size(), text.size());
+ for (std::size_t i = 0; i < text.size(); i++) {
+ EXPECT_EQ(msg.payload[i], static_cast<std::uint8_t>(text[i]));
+ }
+}
+
+TEST(MessageTest, MakeMessageZerosIdAndHeaders) {
+ RecordProperty("description", "Verifies make_message initializes id and
user_headers to zero/empty.");
+ rust::Vec<std::uint8_t> payload;
+ payload.push_back(0x42);
+
+ auto msg = iggy::ffi::make_message(std::move(payload));
+
+ EXPECT_EQ(msg.id_lo, 0u);
+ EXPECT_EQ(msg.id_hi, 0u);
+ EXPECT_TRUE(msg.user_headers.empty());
+}
+
+TEST(MessageTest, MakeMessageWithEmptyPayload) {
+ RecordProperty("description", "Verifies make_message accepts an empty
payload.");
+ rust::Vec<std::uint8_t> empty_payload;
+
+ auto msg = iggy::ffi::make_message(std::move(empty_payload));
+
+ ASSERT_EQ(msg.payload.size(), 0u);
+}
+
+TEST(MessageTest, MakeMessageWithSingleByte) {
+ RecordProperty("description", "Verifies make_message works with a
single-byte payload.");
+ rust::Vec<std::uint8_t> payload;
+ payload.push_back(0xFF);
+
+ auto msg = iggy::ffi::make_message(std::move(payload));
+
+ ASSERT_EQ(msg.payload.size(), 1u);
+ EXPECT_EQ(msg.payload[0], 0xFF);
+}
+
+TEST(MessageTest, MakeMessageWithNullBytes) {
+ RecordProperty("description", "Verifies make_message preserves null bytes
in payload.");
+ rust::Vec<std::uint8_t> payload;
+ payload.push_back(0x00);
+ payload.push_back(0x01);
+ payload.push_back(0x00);
+
+ auto msg = iggy::ffi::make_message(std::move(payload));
+
+ ASSERT_EQ(msg.payload.size(), 3u);
+ EXPECT_EQ(msg.payload[0], 0x00);
+ EXPECT_EQ(msg.payload[1], 0x01);
+ EXPECT_EQ(msg.payload[2], 0x00);
+}
+
+TEST(MessageTest, MakeMessageThenSetCustomId) {
+ RecordProperty("description", "Verifies custom ID can be set after
make_message without affecting payload.");
+ rust::Vec<std::uint8_t> payload;
+ payload.push_back(0x42);
+ auto msg = iggy::ffi::make_message(std::move(payload));
+
+ msg.id_lo = 100;
+ msg.id_hi = 200;
+
+ EXPECT_EQ(msg.id_lo, 100u);
+ EXPECT_EQ(msg.id_hi, 200u);
+ ASSERT_EQ(msg.payload.size(), 1u);
+ EXPECT_EQ(msg.payload[0], 0x42);
+}
+
+TEST(MessageTest, MakeMessageWithLargePayload) {
+ RecordProperty("description", "Verifies make_message handles a larger
payload correctly.");
+ rust::Vec<std::uint8_t> payload;
+ for (std::uint32_t i = 0; i < 10000; i++) {
+ payload.push_back(static_cast<std::uint8_t>(i % 256));
+ }
+
+ auto msg = iggy::ffi::make_message(std::move(payload));
+
+ ASSERT_EQ(msg.payload.size(), 10000u);
+ EXPECT_EQ(msg.payload[0], 0u);
+ EXPECT_EQ(msg.payload[255], 255u);
+ EXPECT_EQ(msg.payload[256], 0u);
+}
diff --git a/foreign/cpp/tests/stream/low_level_e2e.cpp
b/foreign/cpp/tests/stream/low_level_e2e.cpp
index 43d0496de..f723571fa 100644
--- a/foreign/cpp/tests/stream/low_level_e2e.cpp
+++ b/foreign/cpp/tests/stream/low_level_e2e.cpp
@@ -19,6 +19,7 @@
#include <cstdint>
#include <string>
+#include <vector>
#include <gtest/gtest.h>
@@ -293,3 +294,172 @@ TEST(LowLevelE2E_Stream,
GetStreamByNumericIdentifierReturnsStreamDetails) {
ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
client = nullptr;
}
+
+TEST(LowLevelE2E_Stream, GetStreamsReturnsEmptyAfterCleanup) {
+ RecordProperty("description", "Verifies get_streams returns empty vector
after cleaning up all streams.");
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ auto streams = client->get_streams();
+ for (const auto &s : streams) {
+ client->delete_stream(make_numeric_identifier(s.id));
+ }
+
+ streams = client->get_streams();
+ ASSERT_EQ(streams.size(), 0);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsReturnsStreamAfterCreation) {
+ RecordProperty("description", "Verifies created stream appears in
get_streams result.");
+ const std::string stream_name = "cpp-stream-get-streams";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto streams = client->get_streams();
+ ASSERT_GE(streams.size(), 1);
+
+ bool found = false;
+ for (const auto &s : streams) {
+ if (std::string(s.name) == stream_name) {
+ found = true;
+ EXPECT_GT(s.created_at, static_cast<std::uint64_t>(0));
+ EXPECT_EQ(s.size_bytes, static_cast<std::uint64_t>(0));
+ EXPECT_EQ(s.messages_count, static_cast<std::uint64_t>(0));
+ EXPECT_EQ(s.topics_count, 0u);
+ break;
+ }
+ }
+ ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in
get_streams result";
+
+ client->delete_stream(make_string_identifier(stream_name));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsFieldsVerification) {
+ RecordProperty("description",
+ "Verifies get_streams returns correct field values after
creating stream with topic and messages.");
+ const std::string stream_name = "cpp-stream-fields-verify";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+ auto stream = client->get_stream(make_string_identifier(stream_name));
+ client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1,
"none", 0, "never_expire", 0,
+ "server_default");
+
+ rust::Vec<iggy::ffi::IggyMessageToSend> messages;
+ for (std::uint32_t i = 0; i < 5; i++) {
+ auto msg = iggy::ffi::make_message(to_payload("field-verify-message-"
+ std::to_string(i)));
+ messages.push_back(std::move(msg));
+ }
+ client->send_messages(make_numeric_identifier(stream.id),
make_numeric_identifier(0), "partition_id",
+ partition_id_bytes(0), std::move(messages));
+
+ auto streams = client->get_streams();
+ ASSERT_GE(streams.size(), 1u);
+
+ bool found = false;
+ for (const auto &s : streams) {
+ if (std::string(s.name) == stream_name) {
+ found = true;
+ EXPECT_EQ(s.topics_count, 1u);
+ EXPECT_EQ(s.messages_count, 5u);
+ break;
+ }
+ }
+ ASSERT_TRUE(found) << "Stream '" << stream_name << "' not found in
get_streams result";
+
+ client->delete_stream(make_numeric_identifier(stream.id));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsBeforeLoginThrows) {
+ RecordProperty("description", "Throws when get_streams is called before
authentication.");
+ iggy::ffi::Client *client = nullptr;
+ ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+ ASSERT_NE(client, nullptr);
+
+ ASSERT_THROW(client->get_streams(), std::exception);
+ ASSERT_NO_THROW(client->connect());
+ ASSERT_THROW(client->get_streams(), std::exception);
+
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsConsistentWithGetStream) {
+ RecordProperty("description", "Verifies get_streams result is consistent
with get_stream for the same stream.");
+ const std::string stream_name = "cpp-stream-consistency";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+
+ std::string list_name;
+ std::uint32_t list_id = 0;
+ std::uint32_t list_topics_count = 0;
+ std::uint64_t list_created_at = 0;
+ std::uint64_t list_size_bytes = 0;
+ auto streams = client->get_streams();
+ for (const auto &s : streams) {
+ if (std::string(s.name) == stream_name) {
+ list_name = std::string(s.name);
+ list_id = s.id;
+ list_topics_count = s.topics_count;
+ list_created_at = s.created_at;
+ list_size_bytes = s.size_bytes;
+ break;
+ }
+ }
+ ASSERT_FALSE(list_name.empty()) << "Stream '" << stream_name << "' not
found in get_streams result";
+
+ auto single =
client->get_stream(make_string_identifier(stream_name));
+ auto single_name = std::string(single.name);
+ auto single_topics = single.topics_count;
+
+ EXPECT_EQ(list_name, single_name);
+ EXPECT_EQ(list_id, single.id);
+ EXPECT_EQ(list_topics_count, single_topics);
+ EXPECT_EQ(list_created_at, single.created_at);
+ EXPECT_EQ(list_size_bytes, single.size_bytes);
+
+ client->delete_stream(make_string_identifier(stream_name));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}
+
+TEST(LowLevelE2E_Stream, GetStreamsRepeatedCallsReturnSameResult) {
+ RecordProperty("description", "Verifies repeated get_streams calls return
consistent results.");
+ const std::string stream_name = "cpp-stream-repeated";
+ iggy::ffi::Client *client = login_to_server();
+ ASSERT_NE(client, nullptr);
+
+ client->create_stream(stream_name);
+
+ auto streams1 = client->get_streams();
+ auto streams2 = client->get_streams();
+ auto streams3 = client->get_streams();
+
+ ASSERT_EQ(streams1.size(), streams2.size());
+ ASSERT_EQ(streams2.size(), streams3.size());
+
+ auto contains_stream = [&](const rust::Vec<iggy::ffi::Stream> &vec) {
+ for (const auto &s : vec) {
+ if (std::string(s.name) == stream_name) {
+ return true;
+ }
+ }
+ return false;
+ };
+
+ ASSERT_TRUE(contains_stream(streams1)) << "Stream not found in first call";
+ ASSERT_TRUE(contains_stream(streams2)) << "Stream not found in second
call";
+ ASSERT_TRUE(contains_stream(streams3)) << "Stream not found in third call";
+
+ client->delete_stream(make_string_identifier(stream_name));
+ ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+ client = nullptr;
+}