leekeiabstraction commented on code in PR #352: URL: https://github.com/apache/fluss-rust/pull/352#discussion_r2830426053
########## bindings/cpp/test/test_log_table.cpp: ########## @@ -0,0 +1,854 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <arrow/api.h> +#include <gtest/gtest.h> + +#include <algorithm> +#include <chrono> +#include <thread> +#include <tuple> + +#include "test_utils.h" + +class LogTableTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +TEST_F(LogTableTest, AppendRecordBatchAndScan) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_append_record_batch_and_scan_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("c1", fluss::DataType::Int()) + .AddColumn("c2", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Create append writer + auto table_append = 
table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Append Arrow record batches + { + auto c1 = arrow::Int32Builder(); + c1.AppendValues({1, 2, 3}).ok(); + auto c2 = arrow::StringBuilder(); + c2.AppendValues({"a1", "a2", "a3"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}), + 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + + { + auto c1 = arrow::Int32Builder(); + c1.AppendValues({4, 5, 6}).ok(); + auto c2 = arrow::StringBuilder(); + c2.AppendValues({"a4", "a5", "a6"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("c1", arrow::int32()), arrow::field("c2", arrow::utf8())}), + 3, {c1.Finish().ValueOrDie(), c2.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + + ASSERT_OK(append_writer.Flush()); + + // Create scanner and subscribe + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + int32_t num_buckets = scan_table.GetTableInfo().num_buckets; + + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + + for (int32_t bucket_id = 0; bucket_id < num_buckets; ++bucket_id) { + ASSERT_OK(log_scanner.Subscribe(bucket_id, fluss::EARLIEST_OFFSET)); + } + + // Poll for records + fluss::ScanRecords scan_records; + ASSERT_OK(log_scanner.Poll(10000, scan_records)); + + ASSERT_EQ(scan_records.Size(), 6u) << "Expected 6 records"; + + // Collect and sort by offset + std::vector<std::pair<int32_t, std::string>> records; + for (size_t i = 0; i < scan_records.Size(); ++i) { + auto rec = scan_records[i]; + records.emplace_back(rec.row.GetInt32(0), std::string(rec.row.GetString(1))); + } + std::sort(records.begin(), records.end()); + + std::vector<int32_t> expected_c1 = {1, 2, 3, 4, 5, 6}; + 
std::vector<std::string> expected_c2 = {"a1", "a2", "a3", "a4", "a5", "a6"}; + + for (size_t i = 0; i < 6; ++i) { + EXPECT_EQ(records[i].first, expected_c1[i]) << "c1 mismatch at row " << i; + EXPECT_EQ(records[i].second, expected_c2[i]) << "c2 mismatch at row " << i; + } + + // Test unsubscribe + ASSERT_OK(log_scanner.Unsubscribe(0)); + + // Verify unsubscribe_partition fails on a non-partitioned table + auto unsub_result = log_scanner.UnsubscribePartition(0, 0); + ASSERT_FALSE(unsub_result.Ok()) + << "unsubscribe_partition should fail on a non-partitioned table"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, ListOffsets) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_list_offsets_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + // Wait for table initialization + std::this_thread::sleep_for(std::chrono::seconds(2)); + + // Earliest offset should be 0 for empty table + std::unordered_map<int32_t, int64_t> earliest_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_offsets)); + EXPECT_EQ(earliest_offsets[0], 0) << "Earliest offset should be 0 for bucket 0"; + + // Latest offset should be 0 for empty table + std::unordered_map<int32_t, int64_t> latest_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_offsets)); + EXPECT_EQ(latest_offsets[0], 0) << "Latest offset should be 0 for empty table"; + + auto before_append_ms = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // Append records + fluss::Table table; + 
ASSERT_OK(conn.GetTable(table_path, table)); + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues({1, 2, 3}).ok(); + auto name_builder = arrow::StringBuilder(); + name_builder.AppendValues({"alice", "bob", "charlie"}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema( + {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}), + 3, {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + auto after_append_ms = + std::chrono::duration_cast<std::chrono::milliseconds>( + std::chrono::system_clock::now().time_since_epoch()) + .count(); + + // Latest offset after appending should be 3 + std::unordered_map<int32_t, int64_t> latest_after; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Latest(), latest_after)); + EXPECT_EQ(latest_after[0], 3) << "Latest offset should be 3 after appending 3 records"; + + // Earliest offset should still be 0 + std::unordered_map<int32_t, int64_t> earliest_after; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Earliest(), earliest_after)); + EXPECT_EQ(earliest_after[0], 0) << "Earliest offset should still be 0"; + + // Timestamp before append should resolve to offset 0 + std::unordered_map<int32_t, int64_t> ts_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(before_append_ms), + ts_offsets)); + EXPECT_EQ(ts_offsets[0], 0) + << "Timestamp before append should resolve to offset 0"; + + // Timestamp after append should resolve to offset 3 + std::unordered_map<int32_t, int64_t> ts_after_offsets; + ASSERT_OK(adm.ListOffsets(table_path, {0}, fluss::OffsetSpec::Timestamp(after_append_ms), + ts_after_offsets)); + 
EXPECT_EQ(ts_after_offsets[0], 3) + << "Timestamp after append should resolve to offset 3"; + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, TestProject) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_project_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("col_a", fluss::DataType::Int()) + .AddColumn("col_b", fluss::DataType::String()) + .AddColumn("col_c", fluss::DataType::Int()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Append 3 records + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + { + auto col_a_builder = arrow::Int32Builder(); + col_a_builder.AppendValues({1, 2, 3}).ok(); + auto col_b_builder = arrow::StringBuilder(); + col_b_builder.AppendValues({"x", "y", "z"}).ok(); + auto col_c_builder = arrow::Int32Builder(); + col_c_builder.AppendValues({10, 20, 30}).ok(); + + auto batch = arrow::RecordBatch::Make( + arrow::schema({arrow::field("col_a", arrow::int32()), + arrow::field("col_b", arrow::utf8()), + arrow::field("col_c", arrow::int32())}), + 3, + {col_a_builder.Finish().ValueOrDie(), col_b_builder.Finish().ValueOrDie(), + col_c_builder.Finish().ValueOrDie()}); + + ASSERT_OK(append_writer.AppendArrowBatch(batch)); + } + ASSERT_OK(append_writer.Flush()); + + // Test project_by_name: select col_b and col_c only + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto scan = proj_table.NewScan(); + scan.ProjectByName({"col_b", "col_c"}); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateLogScanner(scanner)); + + ASSERT_OK(scanner.Subscribe(0, 0)); + + fluss::ScanRecords records; + 
ASSERT_OK(scanner.Poll(10000, records)); + + ASSERT_EQ(records.Size(), 3u) << "Should have 3 records with project_by_name"; + + std::vector<std::string> expected_col_b = {"x", "y", "z"}; + std::vector<int32_t> expected_col_c = {10, 20, 30}; + + // Collect and sort by col_c to get deterministic order + std::vector<std::pair<std::string, int32_t>> collected; + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); + } + std::sort(collected.begin(), collected.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i; + EXPECT_EQ(collected[i].second, expected_col_c[i]) << "col_c mismatch at index " << i; + } + } + + // Test project by column indices: select col_b (1) and col_a (0) in that order + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto scan = proj_table.NewScan(); + scan.ProjectByIndex({1, 0}); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateLogScanner(scanner)); + + ASSERT_OK(scanner.Subscribe(0, 0)); + + fluss::ScanRecords records; + ASSERT_OK(scanner.Poll(10000, records)); + + ASSERT_EQ(records.Size(), 3u); + + std::vector<std::string> expected_col_b = {"x", "y", "z"}; + std::vector<int32_t> expected_col_a = {1, 2, 3}; + + std::vector<std::pair<std::string, int32_t>> collected; + for (size_t i = 0; i < records.Size(); ++i) { + auto rec = records[i]; + collected.emplace_back(std::string(rec.row.GetString(0)), rec.row.GetInt32(1)); + } + std::sort(collected.begin(), collected.end(), + [](const auto& a, const auto& b) { return a.second < b.second; }); + + for (size_t i = 0; i < 3; ++i) { + EXPECT_EQ(collected[i].first, expected_col_b[i]) << "col_b mismatch at index " << i; + EXPECT_EQ(collected[i].second, expected_col_a[i]) << "col_a mismatch at index " << i; + } + } + + 
ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, TestPollBatches) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_poll_batches_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + std::this_thread::sleep_for(std::chrono::seconds(1)); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto scan = table.NewScan(); + fluss::LogScanner scanner; + ASSERT_OK(scan.CreateRecordBatchLogScanner(scanner)); + ASSERT_OK(scanner.Subscribe(0, 0)); + + // Test 1: Empty table should return empty result + { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(500, batches)); + ASSERT_TRUE(batches.Empty()); + } + + // Append data + auto table_append = table.NewAppend(); + fluss::AppendWriter writer; + ASSERT_OK(table_append.CreateWriter(writer)); + + auto make_batch = [](std::vector<int32_t> ids, std::vector<std::string> names) { + auto id_builder = arrow::Int32Builder(); + id_builder.AppendValues(ids).ok(); + auto name_builder = arrow::StringBuilder(); + name_builder.AppendValues(names).ok(); + return arrow::RecordBatch::Make( + arrow::schema( + {arrow::field("id", arrow::int32()), arrow::field("name", arrow::utf8())}), + static_cast<int64_t>(ids.size()), + {id_builder.Finish().ValueOrDie(), name_builder.Finish().ValueOrDie()}); + }; + + ASSERT_OK(writer.AppendArrowBatch(make_batch({1, 2}, {"a", "b"}))); + ASSERT_OK(writer.AppendArrowBatch(make_batch({3, 4}, {"c", "d"}))); + ASSERT_OK(writer.AppendArrowBatch(make_batch({5, 6}, {"e", "f"}))); + ASSERT_OK(writer.Flush()); + + // Extract ids from Arrow batches + auto extract_ids = [](const 
fluss::ArrowRecordBatches& batches) { + std::vector<int32_t> ids; + for (const auto& batch : batches) { + auto arr = + std::static_pointer_cast<arrow::Int32Array>(batch->GetArrowRecordBatch()->column(0)); + for (int64_t i = 0; i < arr->length(); ++i) { + ids.push_back(arr->Value(i)); + } + } + return ids; + }; + + // Test 2: Poll until we get all 6 records + std::vector<int32_t> all_ids; + auto deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (all_ids.size() < 6 && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(5000, batches)); + auto ids = extract_ids(batches); + all_ids.insert(all_ids.end(), ids.begin(), ids.end()); + } + ASSERT_EQ(all_ids, (std::vector<int32_t>{1, 2, 3, 4, 5, 6})); + + // Test 3: Append more and verify offset continuation (no duplicates) + ASSERT_OK(writer.AppendArrowBatch(make_batch({7, 8}, {"g", "h"}))); + ASSERT_OK(writer.Flush()); + + std::vector<int32_t> new_ids; + deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (new_ids.size() < 2 && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + ASSERT_OK(scanner.PollRecordBatch(5000, batches)); + auto ids = extract_ids(batches); + new_ids.insert(new_ids.end(), ids.begin(), ids.end()); + } + ASSERT_EQ(new_ids, (std::vector<int32_t>{7, 8})); + + // Test 4: Subscribing from mid-offset should truncate batch + { + fluss::Table trunc_table; + ASSERT_OK(conn.GetTable(table_path, trunc_table)); + auto trunc_scan = trunc_table.NewScan(); + fluss::LogScanner trunc_scanner; + ASSERT_OK(trunc_scan.CreateRecordBatchLogScanner(trunc_scanner)); + ASSERT_OK(trunc_scanner.Subscribe(0, 3)); + + std::vector<int32_t> trunc_ids; + deadline = std::chrono::steady_clock::now() + std::chrono::seconds(10); + while (trunc_ids.size() < 5 && std::chrono::steady_clock::now() < deadline) { + fluss::ArrowRecordBatches batches; + 
ASSERT_OK(trunc_scanner.PollRecordBatch(5000, batches)); + auto ids = extract_ids(batches); + trunc_ids.insert(trunc_ids.end(), ids.begin(), ids.end()); + } + ASSERT_EQ(trunc_ids, (std::vector<int32_t>{4, 5, 6, 7, 8})); + } + + // Test 5: Projection should only return requested columns + { + fluss::Table proj_table; + ASSERT_OK(conn.GetTable(table_path, proj_table)); + auto proj_scan = proj_table.NewScan(); + proj_scan.ProjectByName({"id"}); + fluss::LogScanner proj_scanner; + ASSERT_OK(proj_scan.CreateRecordBatchLogScanner(proj_scanner)); + ASSERT_OK(proj_scanner.Subscribe(0, 0)); + + fluss::ArrowRecordBatches proj_batches; + ASSERT_OK(proj_scanner.PollRecordBatch(10000, proj_batches)); + + ASSERT_FALSE(proj_batches.Empty()); + EXPECT_EQ(proj_batches[0]->GetArrowRecordBatch()->num_columns(), 1) + << "Projected batch should have 1 column (id), not 2"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(LogTableTest, AllSupportedDatatypes) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_log_all_datatypes_cpp"); + + // Create a log table with all supported datatypes + auto schema = + fluss::Schema::NewBuilder() + .AddColumn("col_tinyint", fluss::DataType::TinyInt()) + .AddColumn("col_smallint", fluss::DataType::SmallInt()) + .AddColumn("col_int", fluss::DataType::Int()) + .AddColumn("col_bigint", fluss::DataType::BigInt()) + .AddColumn("col_float", fluss::DataType::Float()) + .AddColumn("col_double", fluss::DataType::Double()) + .AddColumn("col_boolean", fluss::DataType::Boolean()) + .AddColumn("col_char", fluss::DataType::Char(10)) + .AddColumn("col_string", fluss::DataType::String()) + .AddColumn("col_decimal", fluss::DataType::Decimal(10, 2)) + .AddColumn("col_date", fluss::DataType::Date()) + .AddColumn("col_time", fluss::DataType::Time()) + .AddColumn("col_timestamp", fluss::DataType::Timestamp()) + .AddColumn("col_timestamp_ltz", fluss::DataType::TimestampLtz()) + .AddColumn("col_bytes", 
fluss::DataType::Bytes()) + .AddColumn("col_binary", fluss::DataType::Binary(4)) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + size_t field_count = table.GetTableInfo().schema.columns.size(); + + auto table_append = table.NewAppend(); + fluss::AppendWriter append_writer; + ASSERT_OK(table_append.CreateWriter(append_writer)); + + // Test data + int32_t col_tinyint = 127; + int32_t col_smallint = 32767; + int32_t col_int = 2147483647; + int64_t col_bigint = 9223372036854775807LL; + float col_float = 3.14f; + double col_double = 2.718281828459045; + bool col_boolean = true; + std::string col_char = "hello"; + std::string col_string = "world of fluss rust client"; + std::string col_decimal = "123.45"; + auto col_date = fluss::Date::FromDays(20476); // 2026-01-23 + auto col_time = fluss::Time::FromMillis(36827000); // 10:13:47 + auto col_timestamp = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); + auto col_timestamp_ltz = fluss::Timestamp::FromMillisNanos(1769163227123, 456000); + std::vector<uint8_t> col_bytes = {'b', 'i', 'n', 'a', 'r', 'y', ' ', 'd', 'a', 't', 'a'}; + std::vector<uint8_t> col_binary = {0xDE, 0xAD, 0xBE, 0xEF}; + + // Append a row with all datatypes + { + fluss::GenericRow row(field_count); + row.SetInt32(0, col_tinyint); + row.SetInt32(1, col_smallint); + row.SetInt32(2, col_int); + row.SetInt64(3, col_bigint); + row.SetFloat32(4, col_float); + row.SetFloat64(5, col_double); + row.SetBool(6, col_boolean); + row.SetString(7, col_char); + row.SetString(8, col_string); + row.SetDecimal(9, col_decimal); + row.SetDate(10, col_date); + row.SetTime(11, col_time); + row.SetTimestampNtz(12, col_timestamp); + row.SetTimestampLtz(13, col_timestamp_ltz); + row.SetBytes(14, col_bytes); + row.SetBytes(15, 
col_binary); + ASSERT_OK(append_writer.Append(row)); + } + + // Append a row with null values + { + fluss::GenericRow row_with_nulls(field_count); + for (size_t i = 0; i < field_count; ++i) { + row_with_nulls.SetNull(i); + } + ASSERT_OK(append_writer.Append(row_with_nulls)); + } + + ASSERT_OK(append_writer.Flush()); + + // Scan the records + fluss::Table scan_table; + ASSERT_OK(conn.GetTable(table_path, scan_table)); + auto table_scan = scan_table.NewScan(); + fluss::LogScanner log_scanner; + ASSERT_OK(table_scan.CreateLogScanner(log_scanner)); + ASSERT_OK(log_scanner.Subscribe(0, 0)); Review Comment: This particular part deals with data type testing, so bucket subscription testing should not matter. As for the other parts, agreed. The pattern was adopted from the initial Rust test; I'll update them. ########## bindings/cpp/test/test_kv_table.cpp: ########## @@ -0,0 +1,643 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include <gtest/gtest.h> + +#include "test_utils.h" + +class KvTableTest : public ::testing::Test { + protected: + fluss::Admin& admin() { return fluss_test::FlussTestEnvironment::Instance()->GetAdmin(); } + + fluss::Connection& connection() { + return fluss_test::FlussTestEnvironment::Instance()->GetConnection(); + } +}; + +TEST_F(KvTableTest, UpsertDeleteAndLookup) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_upsert_and_lookup_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Create upsert writer + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Upsert 3 rows (fire-and-forget, then flush) + struct TestData { + int32_t id; + std::string name; + int64_t age; + }; + std::vector<TestData> test_data = {{1, "Verso", 32}, {2, "Noco", 25}, {3, "Esquie", 35}}; + + for (const auto& d : test_data) { + fluss::GenericRow row(3); + row.SetInt32(0, d.id); + row.SetString(1, d.name); + row.SetInt64(2, d.age); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Verify lookup results + for (const auto& d : test_data) { + fluss::GenericRow key(3); + key.SetInt32(0, d.id); + + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()) << "Row with id=" << d.id << " should exist"; + + 
EXPECT_EQ(result.GetInt32(0), d.id) << "id mismatch"; + EXPECT_EQ(result.GetString(1), d.name) << "name mismatch"; + EXPECT_EQ(result.GetInt64(2), d.age) << "age mismatch"; + } + + // Update record with id=1 (await acknowledgment) + { + fluss::GenericRow updated_row(3); + updated_row.SetInt32(0, 1); + updated_row.SetString(1, "Verso"); + updated_row.SetInt64(2, 33); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(updated_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify the update + { + fluss::GenericRow key(3); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64(2), 33) << "Age should be updated"; + EXPECT_EQ(result.GetString(1), "Verso") << "Name should remain unchanged"; + } + + // Delete record with id=1 (await acknowledgment) + { + fluss::GenericRow delete_row(3); + delete_row.SetInt32(0, 1); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Delete(delete_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify deletion + { + fluss::GenericRow key(3); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Record 1 should not exist after delete"; + } + + // Verify other records still exist + for (int id : {2, 3}) { + fluss::GenericRow key(3); + key.SetInt32(0, id); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()) << "Record " << id + << " should still exist after deleting record 1"; + } + + // Lookup non-existent key + { + fluss::GenericRow key(3); + key.SetInt32(0, 999); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_FALSE(result.Found()) << "Non-existent key should return not found"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, CompositePrimaryKeys) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", 
"test_composite_pk_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("region", fluss::DataType::String()) + .AddColumn("score", fluss::DataType::BigInt()) + .AddColumn("user_id", fluss::DataType::Int()) + .SetPrimaryKeys({"region", "user_id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + // Insert records with composite keys + struct TestData { + std::string region; + int32_t user_id; + int64_t score; + }; + std::vector<TestData> test_data = { + {"US", 1, 100}, {"US", 2, 200}, {"EU", 1, 150}, {"EU", 2, 250}}; + + for (const auto& d : test_data) { + auto row = table.NewRow(); + row.Set("region", d.region); + row.Set("score", d.score); + row.Set("user_id", d.user_id); + ASSERT_OK(upsert_writer.Upsert(row)); + } + ASSERT_OK(upsert_writer.Flush()); + + // Create lookuper + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + // Lookup (US, 1) - should return score 100 + { + auto key = table.NewRow(); + key.Set("region", "US"); + key.Set("user_id", 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 100) << "Score for (US, 1) should be 100"; + } + + // Lookup (EU, 2) - should return score 250 + { + auto key = table.NewRow(); + key.Set("region", "EU"); + key.Set("user_id", 2); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 250) << "Score for (EU, 2) should be 250"; + } + + // Update (US, 1) score (await acknowledgment) + { + auto update_row = 
table.NewRow(); + update_row.Set("region", "US"); + update_row.Set("user_id", 1); + update_row.Set("score", static_cast<int64_t>(500)); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(update_row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify update + { + auto key = table.NewRow(); + key.Set("region", "US"); + key.Set("user_id", 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + EXPECT_EQ(result.GetInt64("score"), 500) << "Row score should be updated"; + } + + ASSERT_OK(adm.DropTable(table_path, false)); +} + +TEST_F(KvTableTest, PartialUpdate) { + auto& adm = admin(); + auto& conn = connection(); + + fluss::TablePath table_path("fluss", "test_partial_update_cpp"); + + auto schema = fluss::Schema::NewBuilder() + .AddColumn("id", fluss::DataType::Int()) + .AddColumn("name", fluss::DataType::String()) + .AddColumn("age", fluss::DataType::BigInt()) + .AddColumn("score", fluss::DataType::BigInt()) + .SetPrimaryKeys({"id"}) + .Build(); + + auto table_descriptor = fluss::TableDescriptor::NewBuilder() + .SetSchema(schema) + .SetProperty("table.replication.factor", "1") + .Build(); + + fluss_test::CreateTable(adm, table_path, table_descriptor); + + fluss::Table table; + ASSERT_OK(conn.GetTable(table_path, table)); + + // Insert initial record with all columns + auto table_upsert = table.NewUpsert(); + fluss::UpsertWriter upsert_writer; + ASSERT_OK(table_upsert.CreateWriter(upsert_writer)); + + { + fluss::GenericRow row(4); + row.SetInt32(0, 1); + row.SetString(1, "Verso"); + row.SetInt64(2, 32); + row.SetInt64(3, 6942); + fluss::WriteResult wr; + ASSERT_OK(upsert_writer.Upsert(row, wr)); + ASSERT_OK(wr.Wait()); + } + + // Verify initial record + fluss::Lookuper lookuper; + ASSERT_OK(table.NewLookup().CreateLookuper(lookuper)); + + { + fluss::GenericRow key(4); + key.SetInt32(0, 1); + fluss::LookupResult result; + ASSERT_OK(lookuper.Lookup(key, result)); + ASSERT_TRUE(result.Found()); + 
EXPECT_EQ(result.GetInt32(0), 1); + EXPECT_EQ(result.GetString(1), "Verso"); + EXPECT_EQ(result.GetInt64(2), 32); + EXPECT_EQ(result.GetInt64(3), 6942); + } + + // Create partial update writer to update only score column + auto partial_upsert = table.NewUpsert(); + partial_upsert.PartialUpdateByName({"id", "score"}); Review Comment: Addressed in latest push -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
