This is an automated email from the ASF dual-hosted git repository. yecol pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git
The following commit(s) were added to refs/heads/main by this push: new 294e90b3 feat(c++): add function and example for read and write label chunk (#650) 294e90b3 is described below commit 294e90b3bdf8ae9c2f6f9d01667235b17507896a Author: Elssky <43638383+els...@users.noreply.github.com> AuthorDate: Tue Oct 29 16:13:35 2024 +0800 feat(c++): add function and example for read and write label chunk (#650) --- cpp/CMakeLists.txt | 1 + cpp/src/graphar/arrow/chunk_reader.cc | 76 +++++++++++++++++++++ cpp/src/graphar/arrow/chunk_reader.h | 49 +++++++++++++- cpp/src/graphar/arrow/chunk_writer.cc | 123 +++++++++++++++++++++++++++++++++- cpp/src/graphar/arrow/chunk_writer.h | 44 ++++++++++++ cpp/src/graphar/filesystem.cc | 19 +++++- cpp/src/graphar/filesystem.h | 9 +++ cpp/src/graphar/general_params.h | 2 +- cpp/test/test_multi_label.cc | 96 ++++++++++++++++++++++++++ 9 files changed, 415 insertions(+), 4 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index d162392b..cd12d486 100644 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -458,6 +458,7 @@ if (BUILD_TESTS) add_test(test_chunk_info_reader SRCS test/test_chunk_info_reader.cc) add_test(test_arrow_chunk_reader SRCS test/test_arrow_chunk_reader.cc) add_test(test_graph SRCS test/test_graph.cc) + add_test(test_multi_label SRCS test/test_multi_label.cc) # enable_testing() endif() diff --git a/cpp/src/graphar/arrow/chunk_reader.cc b/cpp/src/graphar/arrow/chunk_reader.cc index f4add8a4..ff6adab4 100644 --- a/cpp/src/graphar/arrow/chunk_reader.cc +++ b/cpp/src/graphar/arrow/chunk_reader.cc @@ -51,6 +51,18 @@ Result<std::shared_ptr<arrow::Schema>> PropertyGroupToSchema( return arrow::schema(fields); } +Result<std::shared_ptr<arrow::Schema>> LabelToSchema( + std::vector<std::string> labels, bool contain_index_column = false) { + std::vector<std::shared_ptr<arrow::Field>> fields; + if (contain_index_column) { + fields.push_back(std::make_shared<arrow::Field>( + GeneralParams::kVertexIndexCol, arrow::int64())); + } + for (const auto& lab : labels) { + fields.push_back(std::make_shared<arrow::Field>(lab, arrow::boolean())); + } + return arrow::schema(fields); +} Status GeneralCast(const std::shared_ptr<arrow::Array>& in, const std::shared_ptr<arrow::DataType>& to_type, std::shared_ptr<arrow::Array>* out) { @@ -148,6 +160,29 @@ VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( PropertyGroupToSchema(property_group_, true)); } +// initialize for labels +VertexPropertyArrowChunkReader::VertexPropertyArrowChunkReader( + const std::shared_ptr<VertexInfo>& vertex_info, + const std::vector<std::string>& labels, const std::string& prefix, + const util::FilterOptions& options) + : vertex_info_(std::move(vertex_info)), + labels_(labels), + chunk_index_(0), + seek_id_(0), + schema_(nullptr), + chunk_table_(nullptr), + filter_options_(options) { + GAR_ASSIGN_OR_RAISE_ERROR(fs_, FileSystemFromUriOrPath(prefix, &prefix_)); + + std::string base_dir = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" + + std::to_string(chunk_index_); + GAR_ASSIGN_OR_RAISE_ERROR(chunk_num_, + util::GetVertexChunkNum(prefix_, vertex_info)); + GAR_ASSIGN_OR_RAISE_ERROR(vertex_num_, + util::GetVertexNum(prefix_, vertex_info_)); + GAR_ASSIGN_OR_RAISE_ERROR(schema_, LabelToSchema(labels)); +} + Status VertexPropertyArrowChunkReader::seek(IdType id) { seek_id_ = id; IdType pre_chunk_index = chunk_index_; @@ -186,6 +221,24 @@ VertexPropertyArrowChunkReader::GetChunk() { return chunk_table_->Slice(row_offset); } +Result<std::shared_ptr<arrow::Table>> +VertexPropertyArrowChunkReader::GetLabelChunk() { + FileType filetype = FileType::PARQUET; + if (chunk_table_ == nullptr) { + std::string path = prefix_ + vertex_info_->GetPrefix() + "labels/chunk" + + std::to_string(chunk_index_); + GAR_ASSIGN_OR_RAISE(chunk_table_, + fs_->ReadFileToTable(path, filetype, filter_options_)); + // TODO(acezen): filter pushdown doesn't support cast schema now + // if (schema_ != nullptr && filter_options_.filter == nullptr) { + // GAR_RETURN_NOT_OK( + // CastTableWithSchema(chunk_table_, schema_, &chunk_table_)); + // } + } + IdType row_offset = seek_id_ - chunk_index_ * vertex_info_->GetChunkSize(); + return chunk_table_->Slice(row_offset); +} + Status VertexPropertyArrowChunkReader::next_chunk() { if (++chunk_index_ >= chunk_num_) { return Status::IndexError( @@ -247,6 +300,29 @@ VertexPropertyArrowChunkReader::Make( return Make(vertex_info, property_group, graph_info->GetPrefix(), options); } +Result<std::shared_ptr<VertexPropertyArrowChunkReader>> +VertexPropertyArrowChunkReader::Make( + const std::shared_ptr<VertexInfo>& vertex_info, + const std::vector<std::string>& labels, const std::string& prefix, + const util::FilterOptions& options) { + return std::make_shared<VertexPropertyArrowChunkReader>(vertex_info, labels, + prefix, options); +} + +Result<std::shared_ptr<VertexPropertyArrowChunkReader>> +VertexPropertyArrowChunkReader::Make( + const std::shared_ptr<GraphInfo>& graph_info, const std::string& type, + const std::vector<std::string>& labels, + const util::FilterOptions& options) { + auto vertex_info = graph_info->GetVertexInfo(type); + if (!vertex_info) { + return Status::KeyError("The vertex type ", type, + " doesn't exist in graph ", graph_info->GetName(), + "."); + } + return Make(vertex_info, labels, graph_info->GetPrefix(), options); +} + AdjListArrowChunkReader::AdjListArrowChunkReader( const std::shared_ptr<EdgeInfo>& edge_info, AdjListType adj_list_type, const std::string& prefix) diff --git a/cpp/src/graphar/arrow/chunk_reader.h b/cpp/src/graphar/arrow/chunk_reader.h index b204753b..739995ec 100644 --- a/cpp/src/graphar/arrow/chunk_reader.h +++ b/cpp/src/graphar/arrow/chunk_reader.h @@ -53,6 +53,19 @@ class VertexPropertyArrowChunkReader { const std::shared_ptr<PropertyGroup>& property_group, const std::string& prefix, const util::FilterOptions& options = {}); + VertexPropertyArrowChunkReader() : vertex_info_(nullptr), prefix_("") {} + + /** + * @brief Initialize the VertexPropertyArrowChunkReader. + * + * @param vertex_info The vertex info that describes the vertex type. + * @param labels The labels of the vertex type. + * @param prefix The absolute prefix. + */ + VertexPropertyArrowChunkReader(const std::shared_ptr<VertexInfo>& vertex_info, + const std::vector<std::string>& labels, + const std::string& prefix, + const util::FilterOptions& options = {}); /** * @brief Sets chunk position indicator for reader by internal vertex id. * If internal vertex id is not found, will return Status::IndexError @@ -67,7 +80,11 @@ class VertexPropertyArrowChunkReader { * @brief Return the current arrow chunk table of chunk position indicator. */ Result<std::shared_ptr<arrow::Table>> GetChunk(); - + /** + * @brief Return the current arrow label chunk table of chunk position + * indicator. + */ + Result<std::shared_ptr<arrow::Table>> GetLabelChunk(); /** * @brief Sets chunk position indicator to next chunk. * @@ -138,10 +155,40 @@ class VertexPropertyArrowChunkReader { const std::string& property_name, const util::FilterOptions& options = {}); + /** + * @brief Create a VertexPropertyArrowChunkReader instance from vertex info + * for labels. + * + * @param vertex_info The vertex info. + * @param labels The name of labels you want to read. + * @param prefix The absolute prefix of the graph. + * @param options The filter options, default is empty. + */ + static Result<std::shared_ptr<VertexPropertyArrowChunkReader>> Make( + const std::shared_ptr<VertexInfo>& vertex_info, + const std::vector<std::string>& labels, const std::string& prefix, + const util::FilterOptions& options = {}); + + /** + * @brief Create a VertexPropertyArrowChunkReader instance from graph info + * for labels. + * + * @param graph_info The graph info. + * @param type The vertex type. + * @param labels The name of labels you want to read. + * @param prefix The absolute prefix of the graph. + * @param options The filter options, default is empty. + */ + static Result<std::shared_ptr<VertexPropertyArrowChunkReader>> Make( + const std::shared_ptr<GraphInfo>& graph_info, const std::string& type, + const std::vector<std::string>& labels, + const util::FilterOptions& options = {}); + private: std::shared_ptr<VertexInfo> vertex_info_; std::shared_ptr<PropertyGroup> property_group_; std::string prefix_; + std::vector<std::string> labels_; IdType chunk_index_; IdType seek_id_; IdType chunk_num_; diff --git a/cpp/src/graphar/arrow/chunk_writer.cc b/cpp/src/graphar/arrow/chunk_writer.cc index 2e3e7353..5be9d5e2 100644 --- a/cpp/src/graphar/arrow/chunk_writer.cc +++ b/cpp/src/graphar/arrow/chunk_writer.cc @@ -17,7 +17,7 @@ * under the License. */ -#include <iostream> +#include <unordered_map> #include <utility> #include "arrow/api.h" @@ -251,6 +251,23 @@ Status VertexPropertyWriter::WriteChunk( return Status::OK(); } +Status VertexPropertyWriter::WriteLabelChunk( + const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index, + FileType file_type, ValidateLevel validate_level) const { + auto schema = input_table->schema(); + std::vector<int> indices; + for (int i = 0; i < schema->num_fields(); i++) { + indices.push_back(i); + } + + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto in_table, + input_table->SelectColumns(indices)); + std::string suffix = + vertex_info_->GetPrefix() + "labels/chunk" + std::to_string(chunk_index); + std::string path = prefix_ + suffix; + return fs_->WriteLabelTableToFile(input_table, path); +} + Status VertexPropertyWriter::WriteTable( const std::shared_ptr<arrow::Table>& input_table, const std::shared_ptr<PropertyGroup>& property_group, @@ -287,9 +304,113 @@ Status VertexPropertyWriter::WriteTable( GAR_RETURN_NOT_OK(WriteTable(table_with_index, property_group, start_chunk_index, validate_level)); } + auto labels = vertex_info_->GetLabels(); + if (!labels.empty()) { + GAR_ASSIGN_OR_RAISE(auto label_table, GetLabelTable(input_table, labels)) + GAR_RETURN_NOT_OK(WriteLabelTable(label_table, start_chunk_index, + FileType::PARQUET, validate_level)); + } + + return Status::OK(); +} + +// Helper function to split a string by a delimiter +std::vector<std::string> SplitString(const std::string& str, char delimiter) { + std::vector<std::string> tokens; + std::string token; + std::istringstream tokenStream(str); + while (std::getline(tokenStream, token, delimiter)) { + tokens.push_back(token); + } + return tokens; +} + +Status VertexPropertyWriter::WriteLabelTable( + const std::shared_ptr<arrow::Table>& input_table, IdType start_chunk_index, + FileType file_type, ValidateLevel validate_level) const { + auto schema = input_table->schema(); + int indice = schema->GetFieldIndex(GeneralParams::kVertexIndexCol); + IdType chunk_size = vertex_info_->GetChunkSize(); + int64_t length = input_table->num_rows(); + IdType chunk_index = start_chunk_index; + for (int64_t offset = 0; offset < length; + offset += chunk_size, chunk_index++) { + auto in_chunk = input_table->Slice(offset, chunk_size); + GAR_RETURN_NOT_OK( + WriteLabelChunk(in_chunk, chunk_index, file_type, validate_level)); + } return Status::OK(); } +Result<std::shared_ptr<arrow::Table>> VertexPropertyWriter::GetLabelTable( + const std::shared_ptr<arrow::Table>& input_table, + const std::vector<std::string>& labels) const { + // Find the label column index + auto label_col_idx = + input_table->schema()->GetFieldIndex(GeneralParams::kLabelCol); + if (label_col_idx == -1) { + return Status::KeyError("label column not found in the input table."); + } + + // Create a matrix of booleans with dimensions [number of rows, number of + // labels] + std::vector<std::vector<bool>> bool_matrix( + input_table->num_rows(), std::vector<bool>(labels.size(), false)); + + // Create a map for labels to column indices + std::unordered_map<std::string, int> label_to_index; + for (size_t i = 0; i < labels.size(); ++i) { + label_to_index[labels[i]] = i; + } + + int row_offset = 0; // Offset for where to fill the bool_matrix + // Iterate through each chunk of the :LABEL column + for (int64_t chunk_idx = 0; + chunk_idx < input_table->column(label_col_idx)->num_chunks(); + ++chunk_idx) { + auto chunk = input_table->column(label_col_idx)->chunk(chunk_idx); + auto label_column = std::static_pointer_cast<arrow::StringArray>(chunk); + + // Populate the matrix based on :LABEL column values + for (int64_t row = 0; row < label_column->length(); ++row) { + if (label_column->IsValid(row)) { + std::string labels_string = label_column->GetString(row); + auto row_labels = SplitString(labels_string, ';'); + for (const auto& lbl : row_labels) { + if (label_to_index.find(lbl) != label_to_index.end()) { + bool_matrix[row_offset + row][label_to_index[lbl]] = true; + } + } + } + } + + row_offset += + label_column->length(); // Update the row offset for the next chunk + } + + // Create Arrow arrays for each label column + arrow::FieldVector fields; + arrow::ArrayVector arrays; + + for (const auto& label : labels) { + arrow::BooleanBuilder builder; + for (const auto& row : bool_matrix) { + builder.Append(row[label_to_index[label]]); + } + + std::shared_ptr<arrow::Array> array; + builder.Finish(&array); + fields.push_back(arrow::field(label, arrow::boolean())); + arrays.push_back(array); + } + + // Create the Arrow Table with the boolean columns + auto schema = std::make_shared<arrow::Schema>(fields); + auto result_table = arrow::Table::Make(schema, arrays); + + return result_table; +} + Result<std::shared_ptr<VertexPropertyWriter>> VertexPropertyWriter::Make( const std::shared_ptr<VertexInfo>& vertex_info, const std::string& prefix, const ValidateLevel& validate_level) { diff --git a/cpp/src/graphar/arrow/chunk_writer.h b/cpp/src/graphar/arrow/chunk_writer.h index 65a8813e..23c7f415 100644 --- a/cpp/src/graphar/arrow/chunk_writer.h +++ b/cpp/src/graphar/arrow/chunk_writer.h @@ -117,6 +117,21 @@ class VertexPropertyWriter { const std::shared_ptr<PropertyGroup>& property_group, IdType chunk_index, ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** + * @brief Write all labels of a single vertex chunk + * to corresponding files. + * + * @param input_table The table containing data. + * @param chunk_index The index of the vertex chunk. + * @param validate_level The validate level for this operation, + * which is the writer's validate level by default. + * @return Status: ok or error. + */ + Status WriteLabelChunk( + const std::shared_ptr<arrow::Table>& input_table, IdType chunk_index, + FileType file_type, + ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** * @brief Write all property groups of a single vertex chunk * to corresponding files. @@ -163,6 +178,35 @@ class VertexPropertyWriter { IdType start_chunk_index, ValidateLevel validate_level = ValidateLevel::default_validate) const; + /** + * @brief Write all labels for multiple vertex chunks + * to corresponding files. + * + * @param input_table The table containing data. + * @param start_chunk_index The start index of the vertex chunks. + * @param validate_level The validate level for this operation, + * which is the writer's validate level by default. + * @return Status: ok or error. + */ + Status WriteLabelTable( + const std::shared_ptr<arrow::Table>& input_table, + IdType start_chunk_index, FileType file_type, + ValidateLevel validate_level = ValidateLevel::default_validate) const; + + /** + * @brief Get label column from table to formulate label table + * @param input_table The table containing data. + * @param labels The labels. + * @return The table only containing label columns + * */ + Result<std::shared_ptr<arrow::Table>> GetLabelTable( + const std::shared_ptr<arrow::Table>& input_table, + const std::vector<std::string>& labels) const; + + Result<std::shared_ptr<arrow::Table>> GetLabelTableAndRandomlyAddLabels( + const std::shared_ptr<arrow::Table>& input_table, + const std::vector<std::string>& labels) const; + /** * @brief Construct a VertexPropertyWriter from vertex info. * diff --git a/cpp/src/graphar/filesystem.cc b/cpp/src/graphar/filesystem.cc index 92bd55c3..dfbbcb1d 100644 --- a/cpp/src/graphar/filesystem.cc +++ b/cpp/src/graphar/filesystem.cc @@ -238,7 +238,6 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table, auto column_num = schema->num_fields(); parquet::WriterProperties::Builder builder; builder.compression(arrow::Compression::type::ZSTD); // enable compression - builder.encoding(graphar::GeneralParams::kLabelCol, parquet::Encoding::RLE); RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable( *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024, builder.build(), parquet::default_arrow_writer_properties())); @@ -263,6 +262,24 @@ Status FileSystem::WriteTableToFile(const std::shared_ptr<arrow::Table>& table, return Status::OK(); } +Status FileSystem::WriteLabelTableToFile( + const std::shared_ptr<arrow::Table>& table, const std::string& path) const + noexcept { + // try to create the directory, oss filesystem may not support this, ignore + ARROW_UNUSED(arrow_fs_->CreateDir(path.substr(0, path.find_last_of("/")))); + GAR_RETURN_ON_ARROW_ERROR_AND_ASSIGN(auto output_stream, + arrow_fs_->OpenOutputStream(path)); + auto schema = table->schema(); + auto column_num = schema->num_fields(); + parquet::WriterProperties::Builder builder; + builder.compression(arrow::Compression::type::ZSTD); // enable compression + builder.encoding(parquet::Encoding::RLE); + RETURN_NOT_ARROW_OK(parquet::arrow::WriteTable( + *table, arrow::default_memory_pool(), output_stream, 64 * 1024 * 1024, + builder.build(), parquet::default_arrow_writer_properties())); + return Status::OK(); +} + Status FileSystem::CopyFile(const std::string& src_path, const std::string& dst_path) const noexcept { // try to create the directory, oss filesystem may not support this, ignore diff --git a/cpp/src/graphar/filesystem.h b/cpp/src/graphar/filesystem.h index 309bb564..939b73c6 100644 --- a/cpp/src/graphar/filesystem.h +++ b/cpp/src/graphar/filesystem.h @@ -114,6 +114,15 @@ class FileSystem { FileType file_type, const std::string& path) const noexcept; + /** + * @brief Write a label table to a file with parquet type. + * @param input_table The label table to write. + * @param path The path of the output file. + * @return A Status indicating OK if successful, or an error if unsuccessful. + */ + Status WriteLabelTableToFile(const std::shared_ptr<arrow::Table>& table, + const std::string& path) const noexcept; + /** * Copy a file. * diff --git a/cpp/src/graphar/general_params.h b/cpp/src/graphar/general_params.h index 71572193..da9ed7ac 100644 --- a/cpp/src/graphar/general_params.h +++ b/cpp/src/graphar/general_params.h @@ -27,7 +27,7 @@ struct GeneralParams { static constexpr const char* kDstIndexCol = "_graphArDstIndex"; static constexpr const char* kOffsetCol = "_graphArOffset"; static constexpr const char* kPrimaryCol = "_graphArPrimary"; - static constexpr const char* kLabelCol = "_graphArLabel"; + static constexpr const char* kLabelCol = ":LABEL"; }; } // namespace graphar diff --git a/cpp/test/test_multi_label.cc b/cpp/test/test_multi_label.cc new file mode 100644 index 00000000..0f02299d --- /dev/null +++ b/cpp/test/test_multi_label.cc @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <fstream> +#include <iostream> +#include <unordered_map> +#include "arrow/api.h" +#include "arrow/csv/api.h" +#include "arrow/filesystem/api.h" +#include "arrow/io/api.h" +#include "parquet/arrow/writer.h" + +#include "./util.h" +#include "graphar/api/arrow_reader.h" +#include "graphar/api/high_level_writer.h" + +#include <catch2/catch_test_macros.hpp> + +std::shared_ptr<arrow::Table> read_csv_to_table(const std::string& filename) { + arrow::csv::ReadOptions read_options{}; + arrow::csv::ParseOptions parse_options{}; + arrow::csv::ConvertOptions convert_options{}; + + parse_options.delimiter = '|'; + + auto input = + arrow::io::ReadableFile::Open(filename, arrow::default_memory_pool()) + .ValueOrDie(); + + auto reader = arrow::csv::TableReader::Make(arrow::io::default_io_context(), + input, read_options, + parse_options, convert_options) + .ValueOrDie(); + + std::shared_ptr<arrow::Table> table; + table = reader->Read().ValueOrDie(); + + return table; +} + +namespace graphar { +TEST_CASE_METHOD(GlobalFixture, "test_multi_label_builder") { + std::cout << "Test multi label builder" << std::endl; + + // construct graph information from file + std::string path = test_data_dir + "/ldbc/parquet/" + "ldbc.graph.yml"; + auto graph_info = graphar::GraphInfo::Load(path).value(); + auto vertex_info = graph_info->GetVertexInfo("organisation"); + + auto labels = vertex_info->GetLabels(); + + std::unordered_map<std::string, size_t> code; + + std::vector<std::vector<bool>> label_column_data; + + // read labels csv file as arrow table + auto table = read_csv_to_table(test_data_dir + "/ldbc/organisation_0_0.csv"); + std::string table_message = table->ToString(); + + auto schema = table->schema(); + std::cout << schema->ToString() << std::endl; + + // write arrow table as parquet chunk + auto maybe_writer = + VertexPropertyWriter::Make(vertex_info, test_data_dir + "/ldbc/parquet/"); + REQUIRE(!maybe_writer.has_error()); + auto writer = maybe_writer.value(); + REQUIRE(writer->WriteTable(table, 0).ok()); + REQUIRE(writer->WriteVerticesNum(table->num_rows()).ok()); + + // read label chunk as arrow table + auto maybe_reader = + VertexPropertyArrowChunkReader::Make(graph_info, "organisation", labels); + assert(maybe_reader.status().ok()); + auto reader = maybe_reader.value(); + assert(reader->seek(0).ok()); + assert(reader->GetLabelChunk().status().ok()); + assert(reader->next_chunk().ok()); +} +} // namespace graphar --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@graphar.apache.org For additional commands, e-mail: commits-h...@graphar.apache.org