seokjin0414 commented on code in PR #3046:
URL: https://github.com/apache/iggy/pull/3046#discussion_r3173005492


##########
foreign/cpp/tests/message/low_level_e2e.cpp:
##########
@@ -0,0 +1,1266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, 
verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("test message " + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "test message " + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the 
sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("id-test-message"));
+    msg.id_lo = 42;
+    msg.id_hi = 0;
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    ASSERT_EQ(polled.messages[0].id_lo, 42u);
+    ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition 
returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {

Review Comment:
   Leaving this as-is — @slbotbm said he'd handle it in a follow-up.



##########
foreign/cpp/tests/message/low_level_e2e.cpp:
##########
@@ -0,0 +1,1266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, 
verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("test message " + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "test message " + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the 
sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("id-test-message"));
+    msg.id_lo = 42;
+    msg.id_hi = 0;
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    ASSERT_EQ(polled.messages[0].id_lo, 42u);
+    ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition 
returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Verifies send_messages throws when not 
authenticated.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("should-fail"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), 
make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), 
"partition_id", partition_id_bytes(0),
+                                       std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+    RecordProperty("description", "Throws when sending messages to a 
non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    
ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"),
 make_numeric_identifier(0),
+                                       "partition_id", partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid partitioning kind.");
+    const std::string stream_name = "cpp-msg-invalid-part-kind";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "invalid_kind",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+    RecordProperty("description", "Throws when sending messages with 
insufficient partitioning value bytes.");
+    const std::string stream_name = "cpp-msg-invalid-part-val";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    rust::Vec<std::uint8_t> short_bytes;
+    short_bytes.push_back(0x00);
+    short_bytes.push_back(0x01);
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       std::move(short_bytes), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+    RecordProperty("description",
+                   "Verifies messages sent to a specific partition are only 
retrievable from that partition.");
+    const std::string stream_name = "cpp-msg-specific-part";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("partition-test-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_part0 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part0.count, 5u);
+
+    auto polled_part1 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 1,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part1.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+    RecordProperty("description", "Throws when sending an empty message 
vector.");
+    const std::string stream_name = "cpp-msg-empty-vec";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> empty_messages;
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(empty_messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message with an empty 
payload.");
+    const std::string stream_name = "cpp-msg-empty-payload";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    rust::Vec<std::uint8_t> empty_payload;
+    msg.new_message(std::move(empty_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message exceeding 
maximum payload size.");
+    const std::string stream_name = "cpp-msg-oversized";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<std::uint8_t> oversized_payload;
+    for (std::uint32_t i = 0; i < 64000001u; i++) {

Review Comment:
   I tried `reserve_total`, but cxx 1.0.194 declares it private (caught in CI 
and reverted in 5d1f1e692). The loop now relies on amortised growth instead — 
I left a comment about it.



##########
foreign/cpp/tests/message/low_level_e2e.cpp:
##########
@@ -0,0 +1,1266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, 
verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("test message " + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "test message " + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the 
sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("id-test-message"));
+    msg.id_lo = 42;
+    msg.id_hi = 0;
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    ASSERT_EQ(polled.messages[0].id_lo, 42u);
+    ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition 
returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Verifies send_messages throws when not 
authenticated.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("should-fail"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), 
make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), 
"partition_id", partition_id_bytes(0),
+                                       std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+    RecordProperty("description", "Throws when sending messages to a 
non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    
ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"),
 make_numeric_identifier(0),
+                                       "partition_id", partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid partitioning kind.");
+    const std::string stream_name = "cpp-msg-invalid-part-kind";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "invalid_kind",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+    RecordProperty("description", "Throws when sending messages with 
insufficient partitioning value bytes.");
+    const std::string stream_name = "cpp-msg-invalid-part-val";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    rust::Vec<std::uint8_t> short_bytes;
+    short_bytes.push_back(0x00);
+    short_bytes.push_back(0x01);
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       std::move(short_bytes), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends five messages to partition 0 of a three-partition topic, then polls each
+// partition from offset 0. Partition 0 must hold all five messages and every other
+// partition must be empty.
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+    RecordProperty("description",
+                   "Verifies messages sent to a specific partition are only retrievable from that partition.");
+    const std::string stream_name = "cpp-msg-specific-part";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("partition-test-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_part0 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0,
+                                              "consumer", make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part0.count, 5u);
+
+    // The topic was created with three partitions, so both untouched partitions are
+    // polled rather than only the first one.
+    // NOTE(review): assumes partition IDs run 0..2, matching the IDs 0 and 1 already used here - confirm.
+    for (std::uint32_t partition = 1; partition < 3; partition++) {
+        auto polled_other =
+            client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), partition,
+                                  "consumer", make_numeric_identifier(1), "offset", 0, 100, false);
+        ASSERT_EQ(polled_other.count, 0u) << "Unexpected messages in partition " << partition;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Creates a single-partition topic and expects send_messages to throw when it is
+// handed an empty message vector. The stream is deleted afterwards.
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+    RecordProperty("description", "Throws when sending an empty message vector.");
+    const std::string stream_name = "cpp-msg-empty-vec";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> no_messages;
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto send_nothing = [&] {
+        client->send_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0),
+                              "partition_id", partition_id_bytes(0), std::move(no_messages));
+    };
+    ASSERT_THROW(send_nothing(), std::exception);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Builds one message from an empty byte vector and expects send_messages to throw
+// when that message is sent. The stream is deleted afterwards.
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message with an empty payload.");
+    const std::string stream_name = "cpp-msg-empty-payload";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // A message whose payload vector holds zero bytes.
+    iggy::ffi::Message blank_message;
+    blank_message.new_message(rust::Vec<std::uint8_t>{});
+
+    rust::Vec<iggy::ffi::Message> batch;
+    batch.push_back(std::move(blank_message));
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto send_blank = [&] {
+        client->send_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0),
+                              "partition_id", partition_id_bytes(0), std::move(batch));
+    };
+    ASSERT_THROW(send_blank(), std::exception);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Builds a single message with a 64,000,001-byte payload and expects send_messages
+// to throw when it is sent. The stream is deleted afterwards.
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message exceeding maximum payload size.");
+    const std::string stream_name = "cpp-msg-oversized";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // NOTE(review): presumably one byte above the maximum payload size the SDK/server accepts - confirm.
+    constexpr std::size_t kOversizedPayloadBytes = 64000001u;
+
+    rust::Vec<std::uint8_t> oversized_payload;
+    // Reserve the full size up front so filling the buffer does not reallocate repeatedly.
+    oversized_payload.reserve(kOversizedPayloadBytes);
+    for (std::size_t i = 0; i < kOversizedPayloadBytes; i++) {
+        oversized_payload.push_back(0x41);
+    }
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(std::move(oversized_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends fifty messages ("order-0" .. "order-49") to partition 0 in one batch, polls
+// them back from offset 0 and checks that the i-th returned message has offset i and
+// payload "order-i".
+TEST(LowLevelE2E_Message, SendMessagesPreservesOrder) {
+    RecordProperty("description", "Verifies messages are stored and retrieved in the order they were sent.");
+    const std::string stream_name = "cpp-msg-order";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 50; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("order-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 50u);
+    // Check the vector length as well as the reported count, so a short vector fails
+    // here rather than at an out-of-range index in the loop below.
+    ASSERT_EQ(polled.messages.size(), 50u);
+    for (std::uint32_t i = 0; i < 50; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "order-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends three messages that all carry the same ID (id_lo = 99, id_hi = 0), expects the
+// send not to throw, and checks that all three come back with that ID intact.
+TEST(LowLevelE2E_Message, SendMessagesWithDuplicateIds) {
+    RecordProperty("description", "Verifies sending multiple messages with the same ID succeeds.");
+    const std::string stream_name = "cpp-msg-dup-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 3; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("dup-id-msg-" + std::to_string(i)));
+        // Overwrite the ID after new_message so every message ends up with the same one.
+        msg.id_lo = 99;
+        msg.id_hi = 0;
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0),
+                                          "partition_id", partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    // Without this, an empty vector would let the loop below pass without checking anything.
+    ASSERT_EQ(polled.messages.size(), 3u);
+    for (std::size_t i = 0; i < polled.messages.size(); i++) {
+        // Both halves of the ID were set above, so both are verified.
+        EXPECT_EQ(polled.messages[i].id_lo, 99u) << "id_lo mismatch at index " << i;
+        EXPECT_EQ(polled.messages[i].id_hi, 0u) << "id_hi mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends four messages in one batch - plain ASCII text, bytes containing 0x00, text with
+// non-ASCII characters, and arbitrary binary bytes - then polls them back and compares
+// every payload with what was sent.
+TEST(LowLevelE2E_Message, SendMessagesWithVariousPayloads) {
+    RecordProperty("description",
+                   "Verifies various payload types including null bytes, UTF-8, and binary data are preserved.");
+    const std::string stream_name = "cpp-msg-various-payloads";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Payload with embedded zero bytes.
+    rust::Vec<std::uint8_t> payload_null;
+    payload_null.push_back(0x00);
+    payload_null.push_back(0x01);
+    payload_null.push_back(0x00);
+    payload_null.push_back(0xFF);
+
+    // Payload of raw binary bytes.
+    rust::Vec<std::uint8_t> payload_binary;
+    payload_binary.push_back(0xDE);
+    payload_binary.push_back(0xAD);
+    payload_binary.push_back(0xBE);
+    payload_binary.push_back(0xEF);
+
+    rust::Vec<iggy::ffi::Message> messages;
+
+    iggy::ffi::Message msg0;
+    msg0.new_message(to_payload("simple ascii"));
+    messages.push_back(std::move(msg0));
+
+    iggy::ffi::Message msg1;
+    msg1.new_message(std::move(payload_null));
+    messages.push_back(std::move(msg1));
+
+    iggy::ffi::Message msg2;
+    msg2.new_message(to_payload("héllo wörld"));
+    messages.push_back(std::move(msg2));
+
+    iggy::ffi::Message msg3;
+    msg3.new_message(std::move(payload_binary));
+    messages.push_back(std::move(msg3));
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 4u);
+    // Check the vector length as well as the reported count before indexing messages[0..3].
+    ASSERT_EQ(polled.messages.size(), 4u);
+
+    std::string ascii_actual(polled.messages[0].payload.begin(), polled.messages[0].payload.end());
+    EXPECT_EQ(ascii_actual, "simple ascii");
+
+    ASSERT_EQ(polled.messages[1].payload.size(), 4u);
+    EXPECT_EQ(polled.messages[1].payload[0], 0x00);
+    EXPECT_EQ(polled.messages[1].payload[1], 0x01);
+    EXPECT_EQ(polled.messages[1].payload[2], 0x00);
+    EXPECT_EQ(polled.messages[1].payload[3], 0xFF);
+
+    std::string utf8_actual(polled.messages[2].payload.begin(), polled.messages[2].payload.end());
+    EXPECT_EQ(utf8_actual, "héllo wörld");
+
+    ASSERT_EQ(polled.messages[3].payload.size(), 4u);
+    EXPECT_EQ(polled.messages[3].payload[0], 0xDE);
+    EXPECT_EQ(polled.messages[3].payload[1], 0xAD);
+    EXPECT_EQ(polled.messages[3].payload[2], 0xBE);
+    EXPECT_EQ(polled.messages[3].payload[3], 0xEF);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Opens and connects a client without logging in, then expects poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Throws when polling messages before authentication.");
+
+    iggy::ffi::Client *unauthenticated = nullptr;
+    ASSERT_NO_THROW({ unauthenticated = iggy::ffi::new_connection(""); });
+    ASSERT_NE(unauthenticated, nullptr);
+    ASSERT_NO_THROW(unauthenticated->connect());
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto poll_without_login = [&] {
+        unauthenticated->poll_messages(make_numeric_identifier(1), make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 0, 10, false);
+    };
+    ASSERT_THROW(poll_without_login(), std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(unauthenticated));
+}
+
+// Hand-builds a stream identifier with kind "invalid" and length 0, and expects
+// poll_messages to throw when it is used as the stream ID.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamIdThrows) {
+    RecordProperty("description", "Throws when polling messages with an invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    iggy::ffi::Identifier malformed_stream_id;
+    malformed_stream_id.kind   = "invalid";
+    malformed_stream_id.length = 0;
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto poll_with_malformed_id = [&] {
+        client->poll_messages(malformed_stream_id, make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(1), "offset", 0, 10, false);
+    };
+    ASSERT_THROW(poll_with_malformed_id(), std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polls a stream named "nonexistent-stream-poll", which this test never creates, and
+// expects poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStreamThrows) {
+    RecordProperty("description", "Throws when polling messages from a non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto poll_missing_stream = [&] {
+        client->poll_messages(make_string_identifier("nonexistent-stream-poll"), make_numeric_identifier(0), 0,
+                              "consumer", make_numeric_identifier(1), "offset", 0, 10, false);
+    };
+    ASSERT_THROW(poll_missing_stream(), std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Creates a stream and topic, then expects poll_messages to throw when the consumer
+// kind string is "invalid". The stream is deleted afterwards.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKindThrows) {
+    RecordProperty("description", "Throws when polling messages with an invalid consumer kind.");
+    const std::string stream_name = "cpp-msg-invalid-consumer";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto poll_with_bad_consumer_kind = [&] {
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "invalid",
+                              make_numeric_identifier(1), "offset", 0, 10, false);
+    };
+    ASSERT_THROW(poll_with_bad_consumer_kind(), std::exception);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Creates a stream and topic, then expects poll_messages to throw when the polling
+// strategy kind string is "invalid". The stream is deleted afterwards.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKindThrows) {
+    RecordProperty("description", "Throws when polling messages with an invalid polling strategy kind.");
+    const std::string stream_name = "cpp-msg-invalid-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Argument construction stays inside the callable so ASSERT_THROW observes exactly
+    // the same expressions as an inline call would.
+    const auto poll_with_bad_strategy = [&] {
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(1), "invalid", 0, 10, false);
+    };
+    ASSERT_THROW(poll_with_bad_strategy(), std::exception);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores ten messages in partition 0 and polls from offset 0 asking for only five.
+// Both the reported count and the returned vector must hold exactly five entries.
+TEST(LowLevelE2E_Message, PollMessagesCountLessThanAvailable) {
+    RecordProperty("description", "Returns only the requested count when fewer messages are requested than available.");
+    const std::string stream_name = "cpp-msg-count-less";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Ten messages with payloads "msg-0" .. "msg-9".
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 10; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+
+    client->send_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    // Ask for half of what was stored.
+    const auto first_five =
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(1), "offset", 0, 5, false);
+
+    ASSERT_EQ(first_five.count, 5u);
+    ASSERT_EQ(first_five.messages.size(), 5u);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores five messages in partition 0, then polls starting at offset 999999. The poll
+// is expected to succeed and return nothing.
+TEST(LowLevelE2E_Message, PollMessagesWithLargeOffset) {
+    RecordProperty("description", "Returns zero messages when polling with an offset beyond available messages.");
+    const std::string stream_name = "cpp-msg-large-offset";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Five messages with payloads "msg-0" .. "msg-4".
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 5; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+
+    client->send_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    // The start offset is far past the five stored messages.
+    const auto beyond_end =
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(1), "offset", 999999, 100, false);
+
+    ASSERT_EQ(beyond_end.count, 0u);
+    ASSERT_EQ(beyond_end.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores ten messages in partition 0 and polls three with the "first" strategy. The
+// three returned messages must have offsets 0..2 and payloads "msg-0".."msg-2".
+TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) {
+    RecordProperty("description", "Verifies first polling strategy returns messages from the beginning.");
+    const std::string stream_name = "cpp-msg-first-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "first", 0, 3, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    ASSERT_EQ(polled.messages.size(), 3u);
+    // The loop covers offset 0 in its first iteration, so no separate check of
+    // messages[0] is needed.
+    for (std::uint32_t i = 0; i < 3; i++) {
+        EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i)) << "Offset mismatch at index " << i;
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores ten messages in partition 0 and polls three with the "last" strategy. The
+// three returned messages must have offsets 7..9 and payloads "msg-7".."msg-9".
+TEST(LowLevelE2E_Message, PollMessagesLastStrategy) {
+    RecordProperty("description", "Verifies last polling strategy returns messages from the end.");
+    const std::string stream_name = "cpp-msg-last-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "last", 0, 3, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    ASSERT_EQ(polled.messages.size(), 3u);
+    // Ten messages stored, three requested: the expected window is offsets 7, 8 and 9.
+    // Every offset is checked, including the middle one.
+    for (std::uint32_t i = 0; i < 3; i++) {
+        EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(7 + i)) << "Offset mismatch at index " << i;
+        std::string expected = "msg-" + std::to_string(7 + i);
+        std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores five messages, then polls twice with the "next" strategy and auto-commit
+// disabled. Both polls must return the same five messages (offsets 0..4, payloads
+// "msg-0".."msg-4").
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) {
+    RecordProperty("description",
+                   "Verifies next strategy without auto-commit returns the same messages on repeated calls.");
+    const std::string stream_name = "cpp-msg-next-no-commit";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 100, false);
+    ASSERT_EQ(polled1.count, 5u);
+    // Check the vector length as well as the reported count before indexing messages[0..4].
+    ASSERT_EQ(polled1.messages.size(), 5u);
+
+    auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 100, false);
+    ASSERT_EQ(polled2.count, 5u);
+    ASSERT_EQ(polled2.messages.size(), 5u);
+
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled1.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "polled1 payload mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled2.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "polled2 payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores ten messages, then polls three times with the "next" strategy and auto-commit
+// enabled, five messages at a time. The first poll must return offsets 0..4, the second
+// offsets 5..9, and the third nothing.
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) {
+    RecordProperty("description", "Verifies next strategy with auto-commit advances the offset on subsequent polls.");
+    const std::string stream_name = "cpp-msg-next-auto-commit";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled1.count, 5u);
+    // Check the vector length as well as the reported count before indexing messages[0] and messages[4].
+    ASSERT_EQ(polled1.messages.size(), 5u);
+    EXPECT_EQ(polled1.messages[0].offset, 0u);
+    EXPECT_EQ(polled1.messages[4].offset, 4u);
+
+    auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled2.count, 5u);
+    ASSERT_EQ(polled2.messages.size(), 5u);
+    EXPECT_EQ(polled2.messages[0].offset, 5u);
+    EXPECT_EQ(polled2.messages[4].offset, 9u);
+
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected1 = "msg-" + std::to_string(i);
+        std::string actual1(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end());
+        EXPECT_EQ(actual1, expected1) << "polled1 payload mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        // The second poll starts where the first one stopped.
+        std::string expected2 = "msg-" + std::to_string(5 + i);
+        std::string actual2(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end());
+        EXPECT_EQ(actual2, expected2) << "polled2 payload mismatch at index " << i;
+    }
+
+    // All ten messages were consumed by the two polls above.
+    auto polled3 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled3.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Stores five messages and polls them with the "next" strategy and auto-commit enabled
+// under two consumer IDs. Consumer 1 takes three, consumer 2 must still receive all
+// five, and a second poll by consumer 1 must receive the remaining two.
+TEST(LowLevelE2E_Message, PollMessagesConsumerIdIndependence) {
+    RecordProperty("description", "Verifies different consumer IDs maintain independent offsets.");
+    const std::string stream_name = "cpp-msg-consumer-indep";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto created_stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(created_stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Five messages with payloads "msg-0" .. "msg-4".
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 5; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+
+    client->send_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    // Consumer 1 reads the first three messages.
+    const auto consumer1_first =
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(1), "next", 0, 3, true);
+    ASSERT_EQ(consumer1_first.count, 3u);
+
+    // Consumer 2 has read nothing yet and is expected to see every message.
+    const auto consumer2_all =
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(2), "next", 0, 5, true);
+    ASSERT_EQ(consumer2_all.count, 5u);
+
+    // Consumer 1 asks for five more but only two are left for it.
+    const auto consumer1_rest =
+        client->poll_messages(make_numeric_identifier(created_stream.id), make_numeric_identifier(0), 0, "consumer",
+                              make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(consumer1_rest.count, 2u);
+
+    client->delete_stream(make_numeric_identifier(created_stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends two batches of five messages each to partition 0, then polls all ten from
+// offset 0. Offsets must run 0..9, with the "batch1-*" payloads first and the
+// "batch2-*" payloads after them.
+TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) {
+    RecordProperty("description", "Verifies message ordering is preserved across multiple send batches.");
+    const std::string stream_name = "cpp-msg-multi-batch-order";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> batch1;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch1-" + std::to_string(i)));
+        batch1.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch1));
+
+    rust::Vec<iggy::ffi::Message> batch2;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch2-" + std::to_string(i)));
+        batch2.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch2));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    // Check the vector length as well as the reported count before indexing messages[0..9].
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i)) << "Offset mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected = "batch1-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "batch1 payload mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        // The second batch occupies positions 5..9 of the polled vector.
+        std::string expected = "batch2-" + std::to_string(i);
+        std::string actual(polled.messages[5 + i].payload.begin(), polled.messages[5 + i].payload.end());
+        EXPECT_EQ(actual, expected) << "batch2 payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends five messages with caller-assigned IDs (id_lo = 100, 200, 300, 400, 500 and
+// id_hi = 0) and checks that each polled message carries the ID it was sent with.
+TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) {
+    RecordProperty("description", "Verifies multiple messages with distinct custom IDs are all preserved.");
+    const std::string stream_name = "cpp-msg-multi-custom-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    const std::uint64_t id_values[] = {100, 200, 300, 400, 500};
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        // Overwrite the ID after new_message with the caller-chosen value.
+        msg.id_lo = id_values[i];
+        msg.id_hi = 0;
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 5u);
+    // Check the vector length as well as the reported count before indexing messages[0..4].
+    ASSERT_EQ(polled.messages.size(), 5u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled.messages[i].id_lo, id_values[i]) << "ID mismatch at index " << i;
+        EXPECT_EQ(polled.messages[i].id_hi, 0u);
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends one message, deletes the stream, and expects a later poll against the deleted stream's
+// numeric ID to throw.
+TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeletedThrows) {
+    RecordProperty("description", "Throws when polling messages after the stream has been deleted.");
+    const std::string stream_name = "cpp-msg-deleted-stream";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream_details.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Build the single-message batch.
+    iggy::ffi::Message only_message;
+    only_message.new_message(to_payload("test"));
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(only_message));
+
+    client->send_messages(make_numeric_identifier(stream_details.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    // Keep a copy of the ID so the poll below addresses the stream that was just removed.
+    std::uint32_t removed_stream_id = stream_details.id;
+    client->delete_stream(make_numeric_identifier(removed_stream_id));
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(removed_stream_id), make_numeric_identifier(0), 0,
+                                       "consumer", make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polling partition 9999 of a freshly created topic must raise an exception.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionIdThrows) {
+    RecordProperty("description", "Throws when polling with a non-existent partition ID.");
+    const std::string stream_name = "cpp-msg-invalid-partition";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 9999,
+                                       "consumer", make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    // Remove the stream before closing the connection.
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// A poll that requests zero messages must raise an exception.
+TEST(LowLevelE2E_Message, PollMessagesWithCountZeroThrows) {
+    RecordProperty("description", "Throws when polling with count=0.");
+    const std::string stream_name = "cpp-msg-count-zero";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // The second-to-last argument is the requested count; 0 is the value under test.
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 0, 0, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends five messages to partition 0, then polls with UINT32_MAX as the partition ID and expects
+// the same five payloads back in order.
+TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) {
+    RecordProperty("description",
+                   "Verifies polling with partition_id=u32::MAX defaults to partition 0 and returns messages.");
+    const std::string stream_name = "cpp-msg-no-partition";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n != 5; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+    client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    // UINT32_MAX stands in for "no partition specified".
+    auto polled = client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), UINT32_MAX,
+                                        "consumer", make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 5u);
+    ASSERT_EQ(polled.messages.size(), 5u);
+    for (std::uint32_t n = 0; n != 5; ++n) {
+        const auto &bytes          = polled.messages[n].payload;
+        const std::string expected = "msg-" + std::to_string(n);
+        const std::string actual(bytes.begin(), bytes.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at index " << n;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) {
+    RecordProperty("description",
+                   "Verifies timestamp polling strategy returns messages with 
timestamp >= the specified value.");
+    const std::string stream_name = "cpp-msg-timestamp-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> batch1;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch1-" + std::to_string(i)));
+        batch1.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch1));
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));

Review Comment:
   Added an explicit `>= 50ms` gap assertion so the test fails loudly instead of 
silently degrading.



##########
foreign/cpp/tests/message/low_level_e2e.cpp:
##########
@@ -0,0 +1,1266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+// Round trip: sends ten text messages to partition 0 and polls them back from offset 0, checking
+// the reported count, each message's offset, and each payload.
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n != 10; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("test message " + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0),
+                                          "partition_id", partition_id_bytes(0), std::move(outgoing)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t n = 0; n != 10; ++n) {
+        const auto &received = polled.messages[n];
+        // Offsets are expected to start at 0 and increase by one per message.
+        ASSERT_EQ(received.offset, static_cast<std::uint64_t>(n));
+        const std::string expected = "test message " + std::to_string(n);
+        const std::string actual(received.payload.begin(), received.payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << n;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends a single message with ID (lo=42, hi=0) and checks that the polled message carries the
+// same ID halves.
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Assign the ID after new_message(), mirroring the order used throughout this file.
+    iggy::ffi::Message tagged;
+    tagged.new_message(to_payload("id-test-message"));
+    tagged.id_lo = 42;
+    tagged.id_hi = 0;
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(tagged));
+
+    client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    const auto &received = polled.messages[0];
+    ASSERT_EQ(received.id_lo, 42u);
+    ASSERT_EQ(received.id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polls a topic to which nothing has been sent and expects an empty result.
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    // Both the reported count and the returned vector must be empty.
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Connects without logging in and expects send_messages to throw.
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Verifies send_messages throws when not authenticated.");
+    // Create and connect the client directly instead of going through login_to_server().
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    iggy::ffi::Message rejected;
+    rejected.new_message(to_payload("should-fail"));
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(rejected));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), std::move(outgoing)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Passes a hand-built Identifier whose kind is "invalid" as the stream ID and expects
+// send_messages to throw.
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+    RecordProperty("description", "Throws when sending messages with an invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Malformed identifier: unrecognised kind string and zero length.
+    iggy::ffi::Identifier malformed_stream_id;
+    malformed_stream_id.kind   = "invalid";
+    malformed_stream_id.length = 0;
+
+    iggy::ffi::Message item;
+    item.new_message(to_payload("test"));
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(item));
+
+    ASSERT_THROW(client->send_messages(malformed_stream_id, make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), std::move(outgoing)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Addresses a stream name that this test never creates and expects send_messages to throw.
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+    RecordProperty("description", "Throws when sending messages to a non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    iggy::ffi::Message item;
+    item.new_message(to_payload("test"));
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(item));
+
+    ASSERT_THROW(
+        client->send_messages(make_string_identifier("nonexistent-stream-12345"), make_numeric_identifier(0),
+                              "partition_id", partition_id_bytes(0), std::move(outgoing)),
+        std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Passes "invalid_kind" as the partitioning kind and expects send_messages to throw.
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+    RecordProperty("description", "Throws when sending messages with an invalid partitioning kind.");
+    const std::string stream_name = "cpp-msg-invalid-part-kind";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    iggy::ffi::Message item;
+    item.new_message(to_payload("test"));
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(item));
+
+    // Everything except the partitioning kind string matches the valid sends elsewhere in this file.
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), "invalid_kind",
+                                       partition_id_bytes(0), std::move(outgoing)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Supplies a two-byte partitioning value with the "partition_id" kind and expects send_messages
+// to throw.
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+    RecordProperty("description", "Throws when sending messages with insufficient partitioning value bytes.");
+    const std::string stream_name = "cpp-msg-invalid-part-val";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    iggy::ffi::Message item;
+    item.new_message(to_payload("test"));
+    rust::Vec<iggy::ffi::Message> outgoing;
+    outgoing.push_back(std::move(item));
+
+    // Deliberately truncated partitioning value: just the two bytes 0x00 0x01.
+    rust::Vec<std::uint8_t> truncated_value;
+    truncated_value.push_back(0x00);
+    truncated_value.push_back(0x01);
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), "partition_id",
+                                       std::move(truncated_value), std::move(outgoing)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends five messages to partition 0 of a topic created with 3 as its third create_topic
+// argument, then checks that partition 0 holds all five and that every other partition is empty.
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+    RecordProperty("description",
+                   "Verifies messages sent to a specific partition are only retrievable from that partition.");
+    const std::string stream_name = "cpp-msg-specific-part";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // NOTE(review): assumes the third create_topic argument is the partition count - confirm
+    // against the FFI signature.
+    const std::uint32_t partition_count = 3;
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", partition_count, "none", 0, "never_expire",
+                         0, "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("partition-test-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_part0 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0,
+                                              "consumer", make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part0.count, 5u);
+
+    // "Only retrievable from that partition" has to hold for every other partition, not just
+    // partition 1, so poll each remaining partition in turn.
+    for (std::uint32_t other = 1; other < partition_count; other++) {
+        auto polled_other = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), other,
+                                                  "consumer", make_numeric_identifier(1), "offset", 0, 100, false);
+        ASSERT_EQ(polled_other.count, 0u) << "unexpected messages in partition " << other;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sending a batch that contains no messages must raise an exception.
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+    RecordProperty("description", "Throws when sending an empty message vector.");
+    const std::string stream_name = "cpp-msg-empty-vec";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Default-constructed, so it holds zero messages.
+    rust::Vec<iggy::ffi::Message> no_messages;
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), std::move(no_messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sending a batch whose only message has a zero-length payload must raise an exception.
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message with an empty payload.");
+    const std::string stream_name = "cpp-msg-empty-payload";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Build the message from a byte vector that was never filled.
+    rust::Vec<iggy::ffi::Message> outgoing;
+    iggy::ffi::Message blank;
+    rust::Vec<std::uint8_t> no_bytes;
+    blank.new_message(std::move(no_bytes));
+    outgoing.push_back(std::move(blank));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), std::move(outgoing)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Builds a 64,000,001-byte payload and expects send_messages to reject it with an exception.
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message exceeding maximum payload size.");
+    const std::string stream_name = "cpp-msg-oversized";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // NOTE(review): presumably one byte above a 64,000,000-byte limit - confirm against the
+    // server's maximum payload size.
+    const std::uint32_t oversized_len = 64000001u;
+
+    rust::Vec<std::uint8_t> oversized_payload;
+    // Allocate once up front; growing a ~64 MB vector one push_back at a time would otherwise
+    // reallocate and copy the buffer repeatedly.
+    oversized_payload.reserve(oversized_len);
+    for (std::uint32_t i = 0; i < oversized_len; i++) {
+        oversized_payload.push_back(0x41);
+    }
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(std::move(oversized_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends fifty numbered messages in one batch and checks that polling from offset 0 returns them
+// with consecutive offsets and payloads in the original order.
+TEST(LowLevelE2E_Message, SendMessagesPreservesOrder) {
+    RecordProperty("description", "Verifies messages are stored and retrieved in the order they were sent.");
+    const std::string stream_name = "cpp-msg-order";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 50; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("order-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 50u);
+    // Pin the vector length as well as the reported count before indexing: the loop below uses
+    // operator[], which should not be relied on to reject an out-of-range index.
+    ASSERT_EQ(polled.messages.size(), 50u);
+    for (std::uint32_t i = 0; i < 50; i++) {
+        // A wrong offset invalidates the remaining checks, so it is fatal; payload mismatches
+        // are non-fatal so that every bad index gets reported.
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "order-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends three messages that all carry ID (lo=99, hi=0), expects the send to succeed, and checks
+// that all three come back with that same ID.
+TEST(LowLevelE2E_Message, SendMessagesWithDuplicateIds) {
+    RecordProperty("description", "Verifies sending multiple messages with the same ID succeeds.");
+    const std::string stream_name = "cpp-msg-dup-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 3; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("dup-id-msg-" + std::to_string(i)));
+        msg.id_lo = 99;
+        msg.id_hi = 0;
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0),
+                                          "partition_id", partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 3u);
+    // The loop below is bounded by messages.size(); without this check an empty vector would
+    // let the ID assertions pass without running.
+    ASSERT_EQ(polled.messages.size(), 3u);
+    for (std::size_t i = 0; i < polled.messages.size(); i++) {
+        EXPECT_EQ(polled.messages[i].id_lo, 99u) << "id_lo mismatch at index " << i;
+        // Check the high half too, as the other ID tests in this file do.
+        EXPECT_EQ(polled.messages[i].id_hi, 0u) << "id_hi mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends four messages in one batch - plain ASCII, bytes containing 0x00, non-ASCII text, and raw
+// binary - and checks that each payload is returned byte-for-byte in send order.
+TEST(LowLevelE2E_Message, SendMessagesWithVariousPayloads) {
+    RecordProperty("description",
+                   "Verifies various payload types including null bytes, UTF-8, and binary data are preserved.");
+    const std::string stream_name = "cpp-msg-various-payloads";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Payload with embedded zero bytes.
+    rust::Vec<std::uint8_t> payload_null;
+    payload_null.push_back(0x00);
+    payload_null.push_back(0x01);
+    payload_null.push_back(0x00);
+    payload_null.push_back(0xFF);
+
+    // Payload of arbitrary non-text bytes.
+    rust::Vec<std::uint8_t> payload_binary;
+    payload_binary.push_back(0xDE);
+    payload_binary.push_back(0xAD);
+    payload_binary.push_back(0xBE);
+    payload_binary.push_back(0xEF);
+
+    rust::Vec<iggy::ffi::Message> messages;
+
+    iggy::ffi::Message msg0;
+    msg0.new_message(to_payload("simple ascii"));
+    messages.push_back(std::move(msg0));
+
+    iggy::ffi::Message msg1;
+    msg1.new_message(std::move(payload_null));
+    messages.push_back(std::move(msg1));
+
+    iggy::ffi::Message msg2;
+    msg2.new_message(to_payload("héllo wörld"));
+    messages.push_back(std::move(msg2));
+
+    iggy::ffi::Message msg3;
+    msg3.new_message(std::move(payload_binary));
+    messages.push_back(std::move(msg3));
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 4u);
+    // Pin the vector length as well as the reported count before indexing messages[0..3]:
+    // operator[] should not be relied on to reject an out-of-range index.
+    ASSERT_EQ(polled.messages.size(), 4u);
+
+    std::string ascii_actual(polled.messages[0].payload.begin(), polled.messages[0].payload.end());
+    EXPECT_EQ(ascii_actual, "simple ascii");
+
+    // The size is checked fatally first so the per-byte reads below stay in range.
+    ASSERT_EQ(polled.messages[1].payload.size(), 4u);
+    EXPECT_EQ(polled.messages[1].payload[0], 0x00);
+    EXPECT_EQ(polled.messages[1].payload[1], 0x01);
+    EXPECT_EQ(polled.messages[1].payload[2], 0x00);
+    EXPECT_EQ(polled.messages[1].payload[3], 0xFF);
+
+    std::string utf8_actual(polled.messages[2].payload.begin(), polled.messages[2].payload.end());
+    EXPECT_EQ(utf8_actual, "héllo wörld");
+
+    ASSERT_EQ(polled.messages[3].payload.size(), 4u);
+    EXPECT_EQ(polled.messages[3].payload[0], 0xDE);
+    EXPECT_EQ(polled.messages[3].payload[1], 0xAD);
+    EXPECT_EQ(polled.messages[3].payload[2], 0xBE);
+    EXPECT_EQ(polled.messages[3].payload[3], 0xEF);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Connects without logging in and expects poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Throws when polling messages before authentication.");
+    // Create and connect the client directly instead of going through login_to_server().
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(1), make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Passes a hand-built Identifier whose kind is "invalid" as the stream ID and expects
+// poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStreamIdThrows) {
+    RecordProperty("description", "Throws when polling messages with an invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Malformed identifier: unrecognised kind string and zero length.
+    iggy::ffi::Identifier malformed_stream_id;
+    malformed_stream_id.kind   = "invalid";
+    malformed_stream_id.length = 0;
+
+    ASSERT_THROW(client->poll_messages(malformed_stream_id, make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Addresses a stream name that this test never creates and expects poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesFromNonExistentStreamThrows) {
+    RecordProperty("description", "Throws when polling messages from a non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    ASSERT_THROW(
+        client->poll_messages(make_string_identifier("nonexistent-stream-poll"), make_numeric_identifier(0), 0,
+                              "consumer", make_numeric_identifier(1), "offset", 0, 10, false),
+        std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Passes "invalid" as the consumer kind and expects poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidConsumerKindThrows) {
+    RecordProperty("description", "Throws when polling messages with an invalid consumer kind.");
+    const std::string stream_name = "cpp-msg-invalid-consumer";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Only the consumer kind string differs from the valid polls elsewhere in this file.
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 0, "invalid",
+                                       make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Passes "invalid" as the polling strategy kind and expects poll_messages to throw.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidStrategyKindThrows) {
+    RecordProperty("description", "Throws when polling messages with an invalid polling strategy kind.");
+    const std::string stream_name = "cpp-msg-invalid-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    const auto stream_details = client->get_stream(make_string_identifier(stream_name));
+    const auto stream_id      = stream_details.id;
+    client->create_topic(make_numeric_identifier(stream_id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Only the strategy kind string differs from the valid polls elsewhere in this file.
+    ASSERT_THROW(client->poll_messages(make_numeric_identifier(stream_id), make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "invalid", 0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream_id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends ten messages, requests five, and expects exactly five back.
+TEST(LowLevelE2E_Message, PollMessagesCountLessThanAvailable) {
+    RecordProperty("description", "Returns only the requested count when fewer messages are requested than available.");
+    const std::string stream_name = "cpp-msg-count-less";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-9" in a single batch.
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 5, false);
+
+    // Nothing below indexes into the result, so these checks are non-fatal: a fatal ASSERT_* would
+    // return early on failure and skip the cleanup that follows.
+    EXPECT_EQ(polled.count, 5u);
+    EXPECT_EQ(polled.messages.size(), 5u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polling from an offset far past the last stored message must yield an empty result.
+TEST(LowLevelE2E_Message, PollMessagesWithLargeOffset) {
+    RecordProperty("description", "Returns zero messages when polling with an offset beyond available messages.");
+    const std::string stream_name = "cpp-msg-large-offset";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-4" in a single batch.
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 999999, 100, false);
+
+    // Nothing below indexes into the result, so these checks are non-fatal: a fatal ASSERT_* would
+    // return early on failure and skip the cleanup that follows.
+    EXPECT_EQ(polled.count, 0u);
+    EXPECT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends ten messages and checks that a "first" poll of three returns offsets 0..2 with matching payloads.
+TEST(LowLevelE2E_Message, PollMessagesFirstStrategy) {
+    RecordProperty("description", "Verifies first polling strategy returns messages from the beginning.");
+    const std::string stream_name = "cpp-msg-first-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-9" in a single batch.
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 10; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    auto fetched = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "first", 0, 3, false);
+
+    // Both size checks are fatal because the loop below indexes into the result.
+    ASSERT_EQ(fetched.count, 3u);
+    ASSERT_EQ(fetched.messages.size(), 3u);
+    EXPECT_EQ(fetched.messages[0].offset, 0u);
+    for (std::uint32_t n = 0; n < 3; ++n) {
+        const auto &received = fetched.messages[n];
+        EXPECT_EQ(received.offset, static_cast<std::uint64_t>(n));
+        const std::string expected = "msg-" + std::to_string(n);
+        const std::string actual(received.payload.begin(), received.payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at offset " << n;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends ten messages and checks that a "last" poll of three returns the tail (offsets 7..9, "msg-7".."msg-9").
+TEST(LowLevelE2E_Message, PollMessagesLastStrategy) {
+    RecordProperty("description", "Verifies last polling strategy returns messages from the end.");
+    const std::string stream_name = "cpp-msg-last-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-9" in a single batch.
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 10; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    auto fetched = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "last", 0, 3, false);
+
+    // Both size checks are fatal because the code below indexes into the result.
+    ASSERT_EQ(fetched.count, 3u);
+    ASSERT_EQ(fetched.messages.size(), 3u);
+    EXPECT_EQ(fetched.messages[0].offset, 7u);
+    EXPECT_EQ(fetched.messages[2].offset, 9u);
+    for (std::uint32_t n = 0; n < 3; ++n) {
+        const auto &received = fetched.messages[n];
+        const std::string expected = "msg-" + std::to_string(7 + n);
+        const std::string actual(received.payload.begin(), received.payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at index " << n;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polls twice with the "next" strategy and auto-commit disabled; both polls must return the same five messages.
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyNoAutoCommit) {
+    RecordProperty("description",
+                   "Verifies next strategy without auto-commit returns the same messages on repeated calls.");
+    const std::string stream_name = "cpp-msg-next-no-commit";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-4" in a single batch.
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 100, false);
+    ASSERT_EQ(polled1.count, 5u);
+    // Guard the element accesses below: `count` alone does not prove the vector holds five entries.
+    ASSERT_EQ(polled1.messages.size(), 5u);
+
+    auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 100, false);
+    ASSERT_EQ(polled2.count, 5u);
+    ASSERT_EQ(polled2.messages.size(), 5u);
+
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled1.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "polled1 payload mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled2.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "msg-" + std::to_string(i);
+        std::string actual(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "polled2 payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polls three times with the "next" strategy and auto-commit enabled: offsets 0..4, then 5..9, then nothing.
+TEST(LowLevelE2E_Message, PollMessagesNextStrategyAutoCommit) {
+    RecordProperty("description", "Verifies next strategy with auto-commit advances the offset on subsequent polls.");
+    const std::string stream_name = "cpp-msg-next-auto-commit";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-9" in a single batch.
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled1.count, 5u);
+    // Guard the element accesses below: `count` alone does not prove the vector holds five entries.
+    ASSERT_EQ(polled1.messages.size(), 5u);
+    EXPECT_EQ(polled1.messages[0].offset, 0u);
+    EXPECT_EQ(polled1.messages[4].offset, 4u);
+
+    auto polled2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled2.count, 5u);
+    ASSERT_EQ(polled2.messages.size(), 5u);
+    EXPECT_EQ(polled2.messages[0].offset, 5u);
+    EXPECT_EQ(polled2.messages[4].offset, 9u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected1 = "msg-" + std::to_string(i);
+        std::string actual1(polled1.messages[i].payload.begin(), polled1.messages[i].payload.end());
+        EXPECT_EQ(actual1, expected1) << "polled1 payload mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected2 = "msg-" + std::to_string(5 + i);
+        std::string actual2(polled2.messages[i].payload.begin(), polled2.messages[i].payload.end());
+        EXPECT_EQ(actual2, expected2) << "polled2 payload mismatch at index " << i;
+    }
+
+    // All ten messages have been consumed, so a third poll must come back empty.
+    auto polled3 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                         make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(polled3.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Two consumer IDs poll the same five messages with auto-commit; each must progress independently.
+TEST(LowLevelE2E_Message, PollMessagesConsumerIdIndependence) {
+    RecordProperty("description", "Verifies different consumer IDs maintain independent offsets.");
+    const std::string stream_name = "cpp-msg-consumer-indep";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-4" in a single batch.
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 5; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    // Consumer 1 takes the first three messages.
+    auto first_of_c1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0,
+                                             "consumer", make_numeric_identifier(1), "next", 0, 3, true);
+    ASSERT_EQ(first_of_c1.count, 3u);
+
+    // Consumer 2 has consumed nothing yet, so it still sees all five.
+    auto all_of_c2 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0,
+                                           "consumer", make_numeric_identifier(2), "next", 0, 5, true);
+    ASSERT_EQ(all_of_c2.count, 5u);
+
+    // Consumer 1 resumes where it stopped: only two messages remain for it.
+    auto rest_of_c1 = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0,
+                                            "consumer", make_numeric_identifier(1), "next", 0, 5, true);
+    ASSERT_EQ(rest_of_c1.count, 2u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends two batches of five and checks that one poll returns all ten in send order with contiguous offsets.
+TEST(LowLevelE2E_Message, PollMessagesMultipleSendsThenPollOrder) {
+    RecordProperty("description", "Verifies message ordering is preserved across multiple send batches.");
+    const std::string stream_name = "cpp-msg-multi-batch-order";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // First batch: "batch1-0" .. "batch1-4".
+    rust::Vec<iggy::ffi::Message> batch1;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch1-" + std::to_string(i)));
+        batch1.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch1));
+
+    // Second batch: "batch2-0" .. "batch2-4".
+    rust::Vec<iggy::ffi::Message> batch2;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch2-" + std::to_string(i)));
+        batch2.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch2));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    // Guard the element accesses below: `count` alone does not prove the vector holds ten entries.
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        EXPECT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i)) << "Offset mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected = "batch1-" + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), polled.messages[i].payload.end());
+        EXPECT_EQ(actual, expected) << "batch1 payload mismatch at index " << i;
+    }
+    for (std::uint32_t i = 0; i < 5; i++) {
+        std::string expected = "batch2-" + std::to_string(i);
+        std::string actual(polled.messages[5 + i].payload.begin(), polled.messages[5 + i].payload.end());
+        EXPECT_EQ(actual, expected) << "batch2 payload mismatch at index " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Sends five messages with caller-chosen IDs and checks that each ID comes back unchanged.
+TEST(LowLevelE2E_Message, PollMessagesMultipleCustomIds) {
+    RecordProperty("description", "Verifies multiple messages with distinct custom IDs are all preserved.");
+    const std::string stream_name = "cpp-msg-multi-custom-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Each message gets its ID from this table (stored in id_lo, with id_hi left at zero).
+    const std::uint64_t id_values[] = {100, 200, 300, 400, 500};
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("msg-" + std::to_string(i)));
+        msg.id_lo = id_values[i];
+        msg.id_hi = 0;
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 0, 100, false);
+
+    ASSERT_EQ(polled.count, 5u);
+    // Guard the element accesses below: `count` alone does not prove the vector holds five entries.
+    ASSERT_EQ(polled.messages.size(), 5u);
+    for (std::uint32_t i = 0; i < 5; i++) {
+        EXPECT_EQ(polled.messages[i].id_lo, id_values[i]) << "ID mismatch at index " << i;
+        EXPECT_EQ(polled.messages[i].id_hi, 0u);
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polling a stream that has already been deleted must raise an exception.
+TEST(LowLevelE2E_Message, PollMessagesAfterStreamDeletedThrows) {
+    RecordProperty("description", "Throws when polling messages after the stream has been deleted.");
+    const std::string stream_name = "cpp-msg-deleted-stream";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic holding a single message.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    // Keep the numeric ID so the poll below can still address the stream after it is gone.
+    std::uint32_t saved_stream_id = stream.id;
+    client->delete_stream(make_numeric_identifier(saved_stream_id));
+
+    // Non-fatal on purpose: a fatal ASSERT_* returns from the test body on failure, which would skip
+    // the delete_connection call below.
+    EXPECT_THROW(client->poll_messages(make_numeric_identifier(saved_stream_id), make_numeric_identifier(0), 0,
+                                       "consumer", make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polling partition 9999 of a freshly created topic must raise an exception.
+TEST(LowLevelE2E_Message, PollMessagesWithInvalidPartitionIdThrows) {
+    RecordProperty("description", "Throws when polling with a non-existent partition ID.");
+    const std::string stream_name = "cpp-msg-invalid-partition";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic to poll from.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Non-fatal on purpose: a fatal ASSERT_* returns from the test body on failure, which would skip
+    // the delete_stream / delete_connection cleanup below.
+    EXPECT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 9999, "consumer",
+                                       make_numeric_identifier(1), "offset", 0, 10, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Requesting zero messages must raise an exception.
+TEST(LowLevelE2E_Message, PollMessagesWithCountZeroThrows) {
+    RecordProperty("description", "Throws when polling with count=0.");
+    const std::string stream_name = "cpp-msg-count-zero";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic to poll from.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Non-fatal on purpose: a fatal ASSERT_* returns from the test body on failure, which would skip
+    // the delete_stream / delete_connection cleanup below.
+    EXPECT_THROW(client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), 0, "consumer",
+                                       make_numeric_identifier(1), "offset", 0, 0, false),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+// Polls with UINT32_MAX as the partition argument and expects the five messages sent to partition 0.
+TEST(LowLevelE2E_Message, PollMessagesWithoutSpecifyingPartition) {
+    RecordProperty("description",
+                   "Verifies polling with partition_id=u32::MAX defaults to partition 0 and returns messages.");
+    const std::string stream_name = "cpp-msg-no-partition";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    // Fixture: a fresh stream with one topic.
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, "none", 0, "never_expire", 0,
+                         "server_default");
+
+    // Publish "msg-0" .. "msg-4" in a single batch.
+    rust::Vec<iggy::ffi::Message> outgoing;
+    for (std::uint32_t n = 0; n < 5; ++n) {
+        iggy::ffi::Message item;
+        item.new_message(to_payload("msg-" + std::to_string(n)));
+        outgoing.push_back(std::move(item));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(outgoing));
+
+    auto fetched = client->poll_messages(make_numeric_identifier(stream.id), make_numeric_identifier(0), UINT32_MAX,
+                                         "consumer", make_numeric_identifier(1), "offset", 0, 100, false);
+
+    // Both size checks are fatal because the loop below indexes into the result.
+    ASSERT_EQ(fetched.count, 5u);
+    ASSERT_EQ(fetched.messages.size(), 5u);
+    for (std::uint32_t n = 0; n < 5; ++n) {
+        const auto &received = fetched.messages[n];
+        const std::string expected = "msg-" + std::to_string(n);
+        const std::string actual(received.payload.begin(), received.payload.end());
+        EXPECT_EQ(actual, expected) << "Payload mismatch at index " << n;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesTimestampStrategy) {
+    RecordProperty("description",
+                   "Verifies timestamp polling strategy returns messages with 
timestamp >= the specified value.");
+    const std::string stream_name = "cpp-msg-timestamp-strategy";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> batch1;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch1-" + std::to_string(i)));
+        batch1.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch1));
+
+    std::this_thread::sleep_for(std::chrono::milliseconds(100));
+
+    rust::Vec<iggy::ffi::Message> batch2;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("batch2-" + std::to_string(i)));
+        batch2.push_back(std::move(msg));
+    }
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(batch2));
+
+    auto all = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                     make_numeric_identifier(1), "offset", 0, 
100, false);
+    ASSERT_EQ(all.count, 10u);
+
+    std::uint64_t batch2_timestamp = all.messages[5].timestamp;
+    ASSERT_GT(batch2_timestamp, all.messages[0].timestamp);
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(2), 
"timestamp", batch2_timestamp, 100, false);
+
+    ASSERT_GE(polled.count, 5u);

Review Comment:
   Switched to prefix matching (`batch1-` or `batch2-`) so that messages included 
at the timestamp boundary don't cause confusing payload mismatches.



##########
foreign/cpp/tests/message/unit_tests.cpp:
##########
@@ -0,0 +1,161 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+
+// new_message must store the given bytes and report their count in payload_length.
+TEST(MessageTest, NewMessageSetsPayloadAndLength) {
+    RecordProperty("description", "Verifies new_message sets payload and payload_length correctly.");
+    const std::string text = "hello world";
+
+    // Copy the text into a byte vector, one character at a time.
+    rust::Vec<std::uint8_t> bytes;
+    for (std::size_t pos = 0; pos < text.size(); ++pos) {
+        bytes.push_back(static_cast<std::uint8_t>(text[pos]));
+    }
+
+    iggy::ffi::Message msg;
+    msg.new_message(std::move(bytes));
+
+    ASSERT_EQ(msg.payload_length, static_cast<std::uint32_t>(text.size()));
+    ASSERT_EQ(msg.payload.size(), text.size());
+    for (std::size_t pos = 0; pos < text.size(); ++pos) {
+        EXPECT_EQ(msg.payload[pos], static_cast<std::uint8_t>(text[pos]));
+    }
+}
+
+// Dirties every scalar header field, calls new_message, and expects all of them to read zero afterwards.
+TEST(MessageTest, NewMessageZerosHeaderFields) {
+    RecordProperty("description", "Verifies new_message initializes all header fields to zero.");
+    iggy::ffi::Message msg;
+    msg.checksum            = 999;
+    msg.id_lo               = 999;
+    msg.id_hi               = 999;
+    msg.offset              = 999;
+    msg.timestamp           = 999;
+    msg.origin_timestamp    = 999;
+    // Dirtied as well so the zero check below proves a reset rather than observing an untouched field.
+    msg.user_headers_length = 999;
+    msg.reserved            = 999;
+
+    rust::Vec<std::uint8_t> payload;
+    payload.push_back(0x42);
+    msg.new_message(std::move(payload));
+
+    EXPECT_EQ(msg.checksum, 0u);
+    EXPECT_EQ(msg.id_lo, 0u);
+    EXPECT_EQ(msg.id_hi, 0u);
+    EXPECT_EQ(msg.offset, 0u);
+    EXPECT_EQ(msg.timestamp, 0u);
+    EXPECT_EQ(msg.origin_timestamp, 0u);
+    EXPECT_EQ(msg.user_headers_length, 0u);
+    EXPECT_EQ(msg.reserved, 0u);
+    EXPECT_TRUE(msg.user_headers.empty());
+}
+
+// An empty byte vector is a valid payload: both the length field and the stored payload must be empty.
+TEST(MessageTest, NewMessageWithEmptyPayload) {
+    RecordProperty("description", "Verifies new_message accepts an empty payload.");
+    rust::Vec<std::uint8_t> no_bytes;
+    iggy::ffi::Message msg;
+
+    msg.new_message(std::move(no_bytes));
+
+    ASSERT_EQ(msg.payload_length, 0u);
+    ASSERT_EQ(msg.payload.size(), 0u);
+}
+
+// A one-byte payload (0xFF) must be stored unchanged with a length of one.
+TEST(MessageTest, NewMessageWithSingleByte) {
+    RecordProperty("description", "Verifies new_message works with a single-byte payload.");
+    rust::Vec<std::uint8_t> one_byte;
+    one_byte.push_back(0xFF);
+
+    iggy::ffi::Message msg;
+    msg.new_message(std::move(one_byte));
+
+    ASSERT_EQ(msg.payload_length, 1u);
+    // Fatal size check: the element access below relies on it.
+    ASSERT_EQ(msg.payload.size(), 1u);
+    EXPECT_EQ(msg.payload[0], 0xFF);
+}
+
+// Zero bytes inside the payload must be kept as data: {0x00, 0x01, 0x00} comes back as three bytes.
+TEST(MessageTest, NewMessageWithNullBytes) {
+    RecordProperty("description", "Verifies new_message preserves null bytes in payload.");
+    const std::uint8_t raw[] = {0x00, 0x01, 0x00};
+    rust::Vec<std::uint8_t> bytes;
+    for (const std::uint8_t value : raw) {
+        bytes.push_back(value);
+    }
+
+    iggy::ffi::Message msg;
+    msg.new_message(std::move(bytes));
+
+    ASSERT_EQ(msg.payload_length, 3u);
+    // Fatal size check: the element accesses below rely on it.
+    ASSERT_EQ(msg.payload.size(), 3u);
+    EXPECT_EQ(msg.payload[0], 0x00);
+    EXPECT_EQ(msg.payload[1], 0x01);
+    EXPECT_EQ(msg.payload[2], 0x00);
+}
+
+TEST(MessageTest, NewMessageOverwritesPreviousState) {

Review Comment:
   This test is gone with the API reshape — there is no more "second call zeros 
fields" case, since `make_message` returns by value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to