slbotbm commented on code in PR #3046:
URL: https://github.com/apache/iggy/pull/3046#discussion_r3214970312


##########
foreign/cpp/tests/message/low_level_e2e.cpp:
##########
@@ -0,0 +1,1266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, 
verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("test message " + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "test message " + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the 
sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("id-test-message"));
+    msg.id_lo = 42;
+    msg.id_hi = 0;
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    ASSERT_EQ(polled.messages[0].id_lo, 42u);
+    ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition 
returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Verifies send_messages throws when not 
authenticated.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("should-fail"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), 
make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), 
"partition_id", partition_id_bytes(0),
+                                       std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+    RecordProperty("description", "Throws when sending messages to a 
non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    
ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"),
 make_numeric_identifier(0),
+                                       "partition_id", partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid partitioning kind.");
+    const std::string stream_name = "cpp-msg-invalid-part-kind";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "invalid_kind",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+    RecordProperty("description", "Throws when sending messages with 
insufficient partitioning value bytes.");
+    const std::string stream_name = "cpp-msg-invalid-part-val";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    rust::Vec<std::uint8_t> short_bytes;
+    short_bytes.push_back(0x00);
+    short_bytes.push_back(0x01);
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       std::move(short_bytes), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+    RecordProperty("description",
+                   "Verifies messages sent to a specific partition are only 
retrievable from that partition.");
+    const std::string stream_name = "cpp-msg-specific-part";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("partition-test-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_part0 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part0.count, 5u);
+
+    auto polled_part1 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 1,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part1.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+    RecordProperty("description", "Throws when sending an empty message 
vector.");
+    const std::string stream_name = "cpp-msg-empty-vec";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> empty_messages;
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(empty_messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message with an empty 
payload.");
+    const std::string stream_name = "cpp-msg-empty-payload";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    rust::Vec<std::uint8_t> empty_payload;
+    msg.new_message(std::move(empty_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message exceeding 
maximum payload size.");
+    const std::string stream_name = "cpp-msg-oversized";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<std::uint8_t> oversized_payload;
+    for (std::uint32_t i = 0; i < 64000001u; i++) {

Review Comment:
   @seokjin0414 cxx does have a reserve function, and the following code can 
utilize it to pre-reserve 64MB + 1 capacity:
   ```cpp
   constexpr std::uint32_t kOversizedPayloadBytes = 64'000'001u;
       rust::Vec<std::uint8_t> oversized_payload;
       oversized_payload.reserve(kOversizedPayloadBytes);
       for (std::uint32_t i = 0; i < kOversizedPayloadBytes; i++) {
           oversized_payload.push_back(0x41);
       }
   ```



##########
foreign/cpp/tests/message/low_level_e2e.cpp:
##########
@@ -0,0 +1,1266 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <chrono>
+#include <cstdint>
+#include <string>
+#include <thread>
+
+#include <gtest/gtest.h>
+
+#include "lib.rs.h"
+#include "tests/common/test_helpers.hpp"
+
+TEST(LowLevelE2E_Message, SendAndPollMessagesRoundTrip) {
+    RecordProperty("description", "Sends 10 messages and polls them back, 
verifying count, offsets, and payloads.");
+    const std::string stream_name = "cpp-msg-roundtrip";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 10; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("test message " + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    ASSERT_NO_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0),
+                                          "partition_id", 
partition_id_bytes(0), std::move(messages)));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 10u);
+    ASSERT_EQ(polled.messages.size(), 10u);
+    for (std::uint32_t i = 0; i < 10; i++) {
+        ASSERT_EQ(polled.messages[i].offset, static_cast<std::uint64_t>(i));
+        std::string expected = "test message " + std::to_string(i);
+        std::string actual(polled.messages[i].payload.begin(), 
polled.messages[i].payload.end());
+        ASSERT_EQ(actual, expected) << "Payload mismatch at offset " << i;
+    }
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesVerifyMessageIds) {
+    RecordProperty("description", "Verifies that polled message IDs match the 
sent IDs.");
+    const std::string stream_name = "cpp-msg-verify-ids";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("id-test-message"));
+    msg.id_lo = 42;
+    msg.id_hi = 0;
+    messages.push_back(std::move(msg));
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.messages.size(), 1u);
+    ASSERT_EQ(polled.messages[0].id_lo, 42u);
+    ASSERT_EQ(polled.messages[0].id_hi, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, PollMessagesFromEmptyPartition) {
+    RecordProperty("description", "Verifies polling from an empty partition 
returns zero messages.");
+    const std::string stream_name = "cpp-msg-empty-poll";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    auto polled = client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0, "consumer",
+                                        make_numeric_identifier(1), "offset", 
0, 100, false);
+
+    ASSERT_EQ(polled.count, 0u);
+    ASSERT_EQ(polled.messages.size(), 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesBeforeLoginThrows) {
+    RecordProperty("description", "Verifies send_messages throws when not 
authenticated.");
+    iggy::ffi::Client *client = nullptr;
+    ASSERT_NO_THROW({ client = iggy::ffi::new_connection(""); });
+    ASSERT_NE(client, nullptr);
+    ASSERT_NO_THROW(client->connect());
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("should-fail"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(1), 
make_numeric_identifier(1), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidStreamId) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid stream identifier.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    iggy::ffi::Identifier invalid_id;
+    invalid_id.kind   = "invalid";
+    invalid_id.length = 0;
+
+    ASSERT_THROW(client->send_messages(invalid_id, make_numeric_identifier(1), 
"partition_id", partition_id_bytes(0),
+                                       std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToNonExistentStream) {
+    RecordProperty("description", "Throws when sending messages to a 
non-existent stream.");
+    iggy::ffi::Client *client = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    
ASSERT_THROW(client->send_messages(make_string_identifier("nonexistent-stream-12345"),
 make_numeric_identifier(0),
+                                       "partition_id", partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningKind) {
+    RecordProperty("description", "Throws when sending messages with an 
invalid partitioning kind.");
+    const std::string stream_name = "cpp-msg-invalid-part-kind";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "invalid_kind",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesWithInvalidPartitioningValue) {
+    RecordProperty("description", "Throws when sending messages with 
insufficient partitioning value bytes.");
+    const std::string stream_name = "cpp-msg-invalid-part-val";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    msg.new_message(to_payload("test"));
+    messages.push_back(std::move(msg));
+
+    rust::Vec<std::uint8_t> short_bytes;
+    short_bytes.push_back(0x00);
+    short_bytes.push_back(0x01);
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       std::move(short_bytes), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessagesToSpecificPartitionVerified) {
+    RecordProperty("description",
+                   "Verifies messages sent to a specific partition are only 
retrievable from that partition.");
+    const std::string stream_name = "cpp-msg-specific-part";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 3, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    for (std::uint32_t i = 0; i < 5; i++) {
+        iggy::ffi::Message msg;
+        msg.new_message(to_payload("partition-test-" + std::to_string(i)));
+        messages.push_back(std::move(msg));
+    }
+
+    client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                          partition_id_bytes(0), std::move(messages));
+
+    auto polled_part0 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 0,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part0.count, 5u);
+
+    auto polled_part1 = 
client->poll_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), 1,
+                                              "consumer", 
make_numeric_identifier(1), "offset", 0, 100, false);
+    ASSERT_EQ(polled_part1.count, 0u);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendEmptyMessageVectorThrows) {
+    RecordProperty("description", "Throws when sending an empty message 
vector.");
+    const std::string stream_name = "cpp-msg-empty-vec";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> empty_messages;
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(empty_messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithEmptyPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message with an empty 
payload.");
+    const std::string stream_name = "cpp-msg-empty-payload";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<iggy::ffi::Message> messages;
+    iggy::ffi::Message msg;
+    rust::Vec<std::uint8_t> empty_payload;
+    msg.new_message(std::move(empty_payload));
+    messages.push_back(std::move(msg));
+
+    ASSERT_THROW(client->send_messages(make_numeric_identifier(stream.id), 
make_numeric_identifier(0), "partition_id",
+                                       partition_id_bytes(0), 
std::move(messages)),
+                 std::exception);
+
+    client->delete_stream(make_numeric_identifier(stream.id));
+    ASSERT_NO_THROW(iggy::ffi::delete_connection(client));
+}
+
+TEST(LowLevelE2E_Message, SendMessageWithOversizedPayloadThrows) {
+    RecordProperty("description", "Throws when sending a message exceeding 
maximum payload size.");
+    const std::string stream_name = "cpp-msg-oversized";
+    iggy::ffi::Client *client     = login_to_server();
+    ASSERT_NE(client, nullptr);
+
+    client->create_stream(stream_name);
+    auto stream = client->get_stream(make_string_identifier(stream_name));
+    client->create_topic(make_numeric_identifier(stream.id), "test-topic", 1, 
"none", 0, "never_expire", 0,
+                         "server_default");
+
+    rust::Vec<std::uint8_t> oversized_payload;
+    for (std::uint32_t i = 0; i < 64000001u; i++) {

Review Comment:
   
   @hubcio there are many more places in the tests where magic numbers such as 
64000001 are being used. Instead of applying a one-off fix for this case, I 
would like to address them later.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to