This is an automated email from the ASF dual-hosted git repository.

lixueclaire pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-graphar.git


The following commit(s) were added to refs/heads/main by this push:
     new b86304a7 feat(c++): label filtering API, benchmarks, and examples 
(#654)
b86304a7 is described below

commit b86304a7c1f3a5bee01910dd2b196dec54303ad6
Author: Elssky <43638383+els...@users.noreply.github.com>
AuthorDate: Mon Nov 11 09:45:36 2024 +0800

    feat(c++): label filtering API, benchmarks, and examples (#654)
---
 cpp/CMakeLists.txt                         |   1 +
 cpp/benchmarks/benchmark_util.h            |   6 +
 cpp/benchmarks/label_filter_benchmark.cc   | 136 ++++++++++++++
 cpp/examples/label_filtering_example.cc    |  95 ++++++++++
 cpp/src/graphar/high-level/graph_reader.cc | 292 ++++++++++++++++++++++++++++-
 cpp/src/graphar/high-level/graph_reader.h  | 188 +++++++++++++++++--
 cpp/src/graphar/label.cc                   | 113 +++++++++++
 cpp/src/graphar/label.h                    |  62 ++++++
 8 files changed, 874 insertions(+), 19 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index cd12d486..730c46c0 100644
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -477,6 +477,7 @@ if (BUILD_BENCHMARKS)
         target_link_libraries(${target} PRIVATE benchmark::benchmark_main 
graphar ${CMAKE_DL_LIBS})
     endmacro()
     add_benchmark(arrow_chunk_reader_benchmark SRCS 
benchmarks/arrow_chunk_reader_benchmark.cc)
+    add_benchmark(label_filter_benchmark SRCS 
benchmarks/label_filter_benchmark.cc)
     add_benchmark(graph_info_benchmark SRCS benchmarks/graph_info_benchmark.cc)
 endif()
 
diff --git a/cpp/benchmarks/benchmark_util.h b/cpp/benchmarks/benchmark_util.h
index 19cd4737..6c0494ff 100644
--- a/cpp/benchmarks/benchmark_util.h
+++ b/cpp/benchmarks/benchmark_util.h
@@ -41,6 +41,10 @@ class BenchmarkFixture : public ::benchmark::Fixture {
     path_ = std::string(c_root) + "/ldbc_sample/parquet/ldbc_sample.graph.yml";
     auto maybe_graph_info = GraphInfo::Load(path_);
     graph_info_ = maybe_graph_info.value();
+
+    second_path_ = std::string(c_root) + "/ldbc/parquet/ldbc.graph.yml";
+    auto second_maybe_graph_info = GraphInfo::Load(second_path_);
+    second_graph_info_ = second_maybe_graph_info.value();
   }
 
   void TearDown(const ::benchmark::State& state) override {}
@@ -48,5 +52,7 @@ class BenchmarkFixture : public ::benchmark::Fixture {
  protected:
   std::string path_;
   std::shared_ptr<GraphInfo> graph_info_;
+  std::string second_path_;
+  std::shared_ptr<GraphInfo> second_graph_info_;
 };
 }  // namespace graphar
diff --git a/cpp/benchmarks/label_filter_benchmark.cc 
b/cpp/benchmarks/label_filter_benchmark.cc
new file mode 100644
index 00000000..d575a075
--- /dev/null
+++ b/cpp/benchmarks/label_filter_benchmark.cc
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "benchmark/benchmark.h"
+
+#include "./benchmark_util.h"
+#include "graphar/api/high_level_reader.h"
+#include "graphar/api/info.h"
+
+namespace graphar {
+
+std::shared_ptr<graphar::VerticesCollection> SingleLabelFilter(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  std::string type = "organisation";
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  std::string filter_label = "university";
+  auto maybe_filter_vertices_collection =
+      VerticesCollection::verticesWithLabel(filter_label, graph_info, type);
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+  return filter_vertices;
+}
+
+void SingleLabelFilterbyAcero(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  std::string type = "organisation";
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  std::string filter_label = "university";
+  auto maybe_filter_vertices_collection =
+      VerticesCollection::verticesWithLabelbyAcero(filter_label, graph_info,
+                                                   type);
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+}
+
+void MultiLabelFilter(const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  std::string type = "organisation";
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  std::vector<std::string> filter_label = {"university", "company"};
+  auto maybe_filter_vertices_collection =
+      VerticesCollection::verticesWithMultipleLabels(filter_label, graph_info,
+                                                     type);
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+}
+
+void MultiLabelFilterbyAcero(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  std::string type = "organisation";
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  std::vector<std::string> filter_label = {"university", "company"};
+  auto maybe_filter_vertices_collection =
+      VerticesCollection::verticesWithMultipleLabelsbyAcero(filter_label,
+                                                            graph_info, type);
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+}
+
+std::shared_ptr<graphar::VerticesCollection> LabelFilterFromSet(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  std::string type = "organisation";
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  std::vector<std::string> filter_label = {"company", "public"};
+  auto maybe_filter_vertices_collection =
+      VerticesCollection::verticesWithMultipleLabels(filter_label,
+                                                     vertices_collection);
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+  return filter_vertices;
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilter)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    SingleLabelFilter(second_graph_info_);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, SingleLabelFilterbyAcero)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    SingleLabelFilterbyAcero(second_graph_info_);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilter)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    MultiLabelFilter(second_graph_info_);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, MultiLabelFilterbyAcero)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    MultiLabelFilterbyAcero(second_graph_info_);
+  }
+}
+
+BENCHMARK_DEFINE_F(BenchmarkFixture, LabelFilterFromSet)
+(::benchmark::State& state) {  // NOLINT
+  for (auto _ : state) {
+    state.PauseTiming();
+    auto vertices_collection = SingleLabelFilter(second_graph_info_);
+    auto vertices_collection_2 =
+        LabelFilterFromSet(second_graph_info_, vertices_collection);
+    state.ResumeTiming();
+    LabelFilterFromSet(second_graph_info_, vertices_collection_2);
+  }
+}
+
+BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilter)->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, SingleLabelFilterbyAcero)
+    ->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, MultiLabelFilter)->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, 
MultiLabelFilterbyAcero)->Iterations(10);
+BENCHMARK_REGISTER_F(BenchmarkFixture, LabelFilterFromSet)->Iterations(10);
+
+}  // namespace graphar
diff --git a/cpp/examples/label_filtering_example.cc 
b/cpp/examples/label_filtering_example.cc
new file mode 100644
index 00000000..e519bdda
--- /dev/null
+++ b/cpp/examples/label_filtering_example.cc
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <iostream>
+
+#include "arrow/api.h"
+#include "arrow/filesystem/api.h"
+
+#include "./config.h"
+#include "graphar/api/arrow_reader.h"
+#include "graphar/api/high_level_reader.h"
+
+void vertices_collection(
+    const std::shared_ptr<graphar::GraphInfo>& graph_info) {
+  std::string type = "organisation";
+  auto vertex_info = graph_info->GetVertexInfo("organisation");
+  auto labels = vertex_info->GetLabels();
+
+  std::cout << "Query vertices with a specific label" << std::endl;
+  std::cout << "--------------------------------------" << std::endl;
+
+  auto maybe_filter_vertices_collection =
+      graphar::VerticesCollection::verticesWithLabel(std::string("company"),
+                                                     graph_info, type);
+
+  ASSERT(!maybe_filter_vertices_collection.has_error());
+  auto filter_vertices = maybe_filter_vertices_collection.value();
+  std::cout << "valid vertices num: " << filter_vertices->size() << std::endl;
+
+  std::cout << std::endl;
+  std::cout << "Query vertices with specific label in a filtered vertices set"
+            << std::endl;
+  std::cout << "--------------------------------------" << std::endl;
+
+  auto maybe_filter_vertices_collection_2 =
+      graphar::VerticesCollection::verticesWithLabel(std::string("public"),
+                                                     filter_vertices);
+  ASSERT(!maybe_filter_vertices_collection_2.has_error());
+  auto filter_vertices_2 = maybe_filter_vertices_collection_2.value();
+  std::cout << "valid vertices num: " << filter_vertices_2->size() << 
std::endl;
+
+  std::cout << std::endl;
+  std::cout << "Test vertices with multi labels" << std::endl;
+  std::cout << "--------------------------------------" << std::endl;
+  auto maybe_filter_vertices_collection_3 =
+      graphar::VerticesCollection::verticesWithMultipleLabels(
+          {"company", "public"}, graph_info, type);
+  ASSERT(!maybe_filter_vertices_collection_3.has_error());
+  auto filter_vertices_3 = maybe_filter_vertices_collection_3.value();
+  std::cout << "valid vertices num: " << filter_vertices_3->size() << 
std::endl;
+
+  for (auto it = filter_vertices_3->begin(); it != filter_vertices_3->end();
+       ++it) {
+    // get a node's all labels
+    auto label_result = it.label();
+    std::cout << "id: " << it.id() << " ";
+    if (!label_result.has_error()) {
+      for (auto label : label_result.value()) {
+        std::cout << label << " ";
+      }
+    }
+    std::cout << "name: ";
+    auto property = it.property<std::string>("name").value();
+    std::cout << property << " ";
+    std::cout << std::endl;
+  }
+}
+
+int main(int argc, char* argv[]) {
+  // read file and construct graph info
+  std::string path = GetTestingResourceRoot() + "/ldbc/parquet/ldbc.graph.yml";
+  auto graph_info = graphar::GraphInfo::Load(path).value();
+
+  // vertices collection
+  std::cout << "Vertices collection" << std::endl;
+  std::cout << "-------------------" << std::endl;
+  vertices_collection(graph_info);
+  std::cout << std::endl;
+}
diff --git a/cpp/src/graphar/high-level/graph_reader.cc 
b/cpp/src/graphar/high-level/graph_reader.cc
index 9d9857d3..2cfe5b36 100644
--- a/cpp/src/graphar/high-level/graph_reader.cc
+++ b/cpp/src/graphar/high-level/graph_reader.cc
@@ -17,8 +17,14 @@
  * under the License.
  */
 
-#include "graphar/high-level/graph_reader.h"
+#include <algorithm>
+#include <unordered_set>
+
+#include "arrow/array.h"
+#include "graphar/api/arrow_reader.h"
 #include "graphar/convert_to_arrow_type.h"
+#include "graphar/high-level/graph_reader.h"
+#include "graphar/label.h"
 #include "graphar/types.h"
 
 namespace graphar {
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::hasLabel(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  auto filtered_ids =
+      new_vertices_collection
+          ->filter({filter_label}, &new_vertices_collection->valid_chunk_)
+          .value();
+  if (vertices_collection->is_filtered_) {
+    std::unordered_set<IdType> origin_set(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    std::unordered_set<int> intersection;
+    for (int num : filtered_ids) {
+      if (origin_set.count(num)) {
+        intersection.insert(num);
+      }
+    }
+    filtered_ids =
+        std::vector<IdType>(intersection.begin(), intersection.end());
+
+    new_vertices_collection->is_filtered_ = true;
+  }
+  new_vertices_collection->filtered_ids_ = filtered_ids;
+
+  return new_vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter(filter_labels).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabelsbyAcero(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero(filter_labels).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  auto filtered_ids =
+      vertices_collection
+          ->filter(filter_labels, &new_vertices_collection->valid_chunk_)
+          .value();
+  if (vertices_collection->is_filtered_) {
+    std::unordered_set<IdType> origin_set(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    std::unordered_set<int> intersection;
+    for (int num : filtered_ids) {
+      if (origin_set.count(num)) {
+        intersection.insert(num);
+      }
+    }
+    filtered_ids =
+        std::vector<IdType>(intersection.begin(), intersection.end());
+
+    new_vertices_collection->is_filtered_ = true;
+  }
+  new_vertices_collection->filtered_ids_ = filtered_ids;
+
+  return new_vertices_collection;
+}
+
 template <typename T>
 Result<T> Vertex::property(const std::string& property) const {
   if constexpr (std::is_final<T>::value) {
diff --git a/cpp/src/graphar/high-level/graph_reader.h 
b/cpp/src/graphar/high-level/graph_reader.h
index 20e7d8f8..19c8f716 100644
--- a/cpp/src/graphar/high-level/graph_reader.h
+++ b/cpp/src/graphar/high-level/graph_reader.h
@@ -74,6 +74,13 @@ class Vertex {
   template <typename T>
   Result<T> property(const std::string& property) const;
 
+  /**
+   * @brief Get the label of the vertex.
+   * @return Result: The label of the vertex.
+   */
+  template <typename T>
+  Result<T> label() const;
+
   /**
    * @brief Return true if value at the property is valid (not null).
    *
@@ -172,40 +179,80 @@ class VertexIter {
    * @param offset The current offset of the readers.
    */
   explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
-                      const std::string& prefix, IdType offset) noexcept {
+                      const std::string& prefix, IdType offset,
+                      const std::vector<std::string>& labels,
+                      const bool& is_filtered = false,
+                      const std::vector<IdType>& filtered_ids = {}) noexcept {
+    if (!labels.empty()) {
+      labels_ = labels;
+      label_reader_ =
+          VertexPropertyArrowChunkReader(vertex_info, labels, prefix);
+    }
     for (const auto& pg : vertex_info->GetPropertyGroups()) {
       readers_.emplace_back(vertex_info, pg, prefix);
     }
+    is_filtered_ = is_filtered;
+    filtered_ids_ = filtered_ids;
     cur_offset_ = offset;
   }
 
   /** Copy constructor. */
   VertexIter(const VertexIter& other)
-      : readers_(other.readers_), cur_offset_(other.cur_offset_) {}
+      : readers_(other.readers_),
+        cur_offset_(other.cur_offset_),
+        labels_(other.labels_),
+        label_reader_(other.label_reader_),
+        is_filtered_(other.is_filtered_),
+        filtered_ids_(other.filtered_ids_) {}
 
   /** Construct and return the vertex of the current offset. */
   Vertex operator*() noexcept {
-    for (auto& reader : readers_) {
-      reader.seek(cur_offset_);
+    if (is_filtered_) {
+      for (auto& reader : readers_) {
+        reader.seek(filtered_ids_[cur_offset_]);
+      }
+    } else {
+      for (auto& reader : readers_) {
+        reader.seek(cur_offset_);
+      }
     }
+
     return Vertex(cur_offset_, readers_);
   }
 
   /** Get the vertex id of the current offset. */
-  IdType id() { return cur_offset_; }
+  IdType id() {
+    if (is_filtered_) {
+      return filtered_ids_[cur_offset_];
+    } else {
+      return cur_offset_;
+    }
+  }
 
   /** Get the value for a property of the current vertex. */
   template <typename T>
   Result<T> property(const std::string& property) noexcept {
     std::shared_ptr<arrow::ChunkedArray> column(nullptr);
-    for (auto& reader : readers_) {
-      reader.seek(cur_offset_);
-      GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
-      column = util::GetArrowColumnByName(chunk_table, property);
-      if (column != nullptr) {
-        break;
+    if (is_filtered_) {
+      for (auto& reader : readers_) {
+        reader.seek(filtered_ids_[cur_offset_]);
+        GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
+        column = util::GetArrowColumnByName(chunk_table, property);
+        if (column != nullptr) {
+          break;
+        }
+      }
+    } else {
+      for (auto& reader : readers_) {
+        reader.seek(cur_offset_);
+        GAR_ASSIGN_OR_RAISE(auto chunk_table, reader.GetChunk());
+        column = util::GetArrowColumnByName(chunk_table, property);
+        if (column != nullptr) {
+          break;
+        }
       }
     }
+
     if (column != nullptr) {
       auto array = util::GetArrowArrayByChunkIndex(column, 0);
       GAR_ASSIGN_OR_RAISE(auto data, util::GetArrowArrayData(array));
@@ -215,6 +262,12 @@ class VertexIter {
                             " does not exist in the vertex.");
   }
 
+  /** Determine whether a vertex has the input label. */
+  Result<bool> hasLabel(const std::string& label) noexcept;
+
+  /** Get the labels of the current vertex. */
+  Result<std::vector<std::string>> label() noexcept;
+
   /** The prefix increment operator. */
   VertexIter& operator++() noexcept {
     ++cur_offset_;
@@ -253,7 +306,11 @@ class VertexIter {
 
  private:
   std::vector<VertexPropertyArrowChunkReader> readers_;
+  VertexPropertyArrowChunkReader label_reader_;
+  std::vector<std::string> labels_;
   IdType cur_offset_;
+  bool is_filtered_;
+  std::vector<IdType> filtered_ids_;
 };
 
 /**
@@ -266,11 +323,18 @@ class VerticesCollection {
    * @brief Initialize the VerticesCollection.
    *
    * @param vertex_info The vertex info that describes the vertex type.
+   * @param labels The labels of the vertex.
    * @param prefix The absolute prefix.
    */
   explicit VerticesCollection(const std::shared_ptr<VertexInfo>& vertex_info,
-                              const std::string& prefix)
-      : vertex_info_(std::move(vertex_info)), prefix_(prefix) {
+                              const std::string& prefix,
+                              const bool is_filtered = false,
+                              const std::vector<IdType> filtered_ids = {})
+      : vertex_info_(std::move(vertex_info)),
+        prefix_(prefix),
+        labels_(vertex_info->GetLabels()),
+        is_filtered_(is_filtered),
+        filtered_ids_(filtered_ids) {
     // get the vertex num
     std::string base_dir;
     GAR_ASSIGN_OR_RAISE_ERROR(auto fs,
@@ -283,21 +347,104 @@ class VerticesCollection {
   }
 
   /** The iterator pointing to the first vertex. */
-  VertexIter begin() noexcept { return VertexIter(vertex_info_, prefix_, 0); }
+  VertexIter begin() noexcept {
+    return VertexIter(vertex_info_, prefix_, 0, labels_, is_filtered_,
+                      filtered_ids_);
+  }
 
   /** The iterator pointing to the past-the-end element. */
   VertexIter end() noexcept {
-    return VertexIter(vertex_info_, prefix_, vertex_num_);
+    if (is_filtered_)
+      return VertexIter(vertex_info_, prefix_, filtered_ids_.size(), labels_,
+                        is_filtered_, filtered_ids_);
+    return VertexIter(vertex_info_, prefix_, vertex_num_, labels_, 
is_filtered_,
+                      filtered_ids_);
   }
 
   /** The iterator pointing to the vertex with specific id. */
-  VertexIter find(IdType id) { return VertexIter(vertex_info_, prefix_, id); }
+  VertexIter find(IdType id) {
+    return VertexIter(vertex_info_, prefix_, id, labels_);
+  }
 
   /** Get the number of vertices in the collection. */
-  size_t size() const noexcept { return vertex_num_; }
+  size_t size() const noexcept {
+    if (is_filtered_)
+      return filtered_ids_.size();
+    else
+      return vertex_num_;
+  }
+
+  /** The vertex id list that satisfies the label filter condition. */
+  Result<std::vector<IdType>> filter(
+      std::vector<std::string> filter_labels,
+      std::vector<IdType>* new_valid_chunk = nullptr);
+
+  Result<std::vector<IdType>> filter_by_acero(
+      std::vector<std::string> filter_labels) const;
+
+  /**
+   * @brief Query vertices with a specific label
+   *
+   * @param filter_label The label to query vertices by
+   * @param graph_info A smart pointer to GraphInfo that contains details about
+   * the graph
+   * @param type The type of vertices to query
+   * @return A VerticesCollection containing all vertices that have the
+   * specified label
+   */
+  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabel(
+      const std::string& filter_label,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabelbyAcero(
+      const std::string& filter_label,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  /**
+   * @brief Query vertices with a specific label within a given collection
+   *
+   * @param filter_label The label to query vertices by
+   * @param vertices_collection The collection of vertices to search within
+   * @return A VerticesCollection containing all vertices from the specified
+   * collection that have the specified label
+   */
+  static Result<std::shared_ptr<VerticesCollection>> verticesWithLabel(
+      const std::string& filter_label,
+      const std::shared_ptr<VerticesCollection>& vertices_collection);
+
+  /**
+   * @brief Query vertices with multiple labels
+   *
+   * @param filter_labels A vector of labels to query vertices by
+   * @param graph_info A smart pointer to GraphInfo that contains details about
+   * the graph
+   * @param type The type of vertices to query
+   * @return A VerticesCollection containing all vertices that have all of the
+   * specified labels
+   */
+  static Result<std::shared_ptr<VerticesCollection>> 
verticesWithMultipleLabels(
+      const std::vector<std::string>& filter_labels,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  static Result<std::shared_ptr<VerticesCollection>>
+  verticesWithMultipleLabelsbyAcero(
+      const std::vector<std::string>& filter_labels,
+      const std::shared_ptr<GraphInfo>& graph_info, const std::string& type);
+
+  /**
+   * @brief Query vertices with multiple labels within a given collection
+   *
+   * @param filter_labels A vector of labels to query vertices by
+   * @param vertices_collection The collection of vertices to search within
+   * @return A VerticesCollection containing all vertices from the specified
+   * collection that have all of the specified labels
+   */
+  static Result<std::shared_ptr<VerticesCollection>> 
verticesWithMultipleLabels(
+      const std::vector<std::string>& filter_labels,
+      const std::shared_ptr<VerticesCollection>& vertices_collection);
 
   /**
-   * @brief Construct a VerticesCollection from graph info and vertex type.
+   * @brief Construct a VerticesCollection from graph info and vertex label.
    *
    * @param graph_info The graph info.
    * @param type The vertex type.
@@ -305,6 +452,7 @@ class VerticesCollection {
   static Result<std::shared_ptr<VerticesCollection>> Make(
       const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
     auto vertex_info = graph_info->GetVertexInfo(type);
+    auto labels = vertex_info->GetLabels();
     if (!vertex_info) {
       return Status::KeyError("The vertex ", type, " doesn't exist.");
     }
@@ -315,6 +463,10 @@ class VerticesCollection {
  private:
   std::shared_ptr<VertexInfo> vertex_info_;
   std::string prefix_;
+  std::vector<std::string> labels_;
+  bool is_filtered_;
+  std::vector<IdType> filtered_ids_;
+  std::vector<IdType> valid_chunk_;
   IdType vertex_num_;
 };
 
diff --git a/cpp/src/graphar/label.cc b/cpp/src/graphar/label.cc
new file mode 100644
index 00000000..e8b23797
--- /dev/null
+++ b/cpp/src/graphar/label.cc
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "graphar/label.h"
+
+#include <cassert>
+#include <cstring>
+#include <fstream>
+#include <iostream>
+#include <memory>
+#include <set>
+
+/// Read a parquet file by ParquetReader & get valid indices
+/// The first column_num labels are concerned.
+int read_parquet_file_and_get_valid_indices(
+    const char* parquet_filename, const int row_num, const int tot_label_num,
+    const int tested_label_num, std::vector<int> tested_label_ids,
+    const std::function<bool(bool*, int)>& IsValid, int chunk_idx,
+    int chunk_size, std::vector<int>* indices, uint64_t* bitmap,
+    const QUERY_TYPE query_type) {
+  // Create a ParquetReader instance
+  std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
+      parquet::ParquetFileReader::OpenFile(
+          parquet_filename + std::to_string(chunk_idx), false);
+
+  // Get the File MetaData
+  std::shared_ptr<parquet::FileMetaData> file_metadata =
+      parquet_reader->metadata();
+  int row_group_count = file_metadata->num_row_groups();
+  int num_columns = file_metadata->num_columns();
+
+  // Initialize the column row counts
+  std::vector<int> col_row_counts(num_columns, 0);
+  bool** value = new bool*[num_columns];
+  for (int i = 0; i < num_columns; i++) {
+    value[i] = new bool[row_num];
+  }
+
+  // Iterate over all the RowGroups in the file
+  for (int rg = 0; rg < row_group_count; ++rg) {
+    // Get the RowGroup Reader
+    std::shared_ptr<parquet::RowGroupReader> row_group_reader =
+        parquet_reader->RowGroup(rg);
+
+    int64_t values_read = 0;
+    int64_t rows_read = 0;
+    std::shared_ptr<parquet::ColumnReader> column_reader;
+
+    ARROW_UNUSED(rows_read);  // prevent warning in release build
+
+    // Read the label columns
+    for (int k = 0; k < tested_label_num; k++) {
+      int col_id = tested_label_ids[k];
+      // Get the Column Reader for the Bool column
+      column_reader = row_group_reader->Column(col_id);
+      parquet::BoolReader* bool_reader =
+          static_cast<parquet::BoolReader*>(column_reader.get());
+      // Read all the rows in the column
+      while (bool_reader->HasNext()) {
+        // Read BATCH_SIZE values at a time. The number of rows read is
+        // returned. values_read contains the number of non-null rows
+
+        rows_read = bool_reader->ReadBatch(BATCH_SIZE, nullptr, nullptr,
+                                           value[k] + col_row_counts[col_id],
+                                           &values_read);
+
+        // There are no NULL values in the rows written
+        col_row_counts[col_id] += rows_read;
+      }
+    }
+  }
+  const int kTotLabelNum = tot_label_num;
+  bool state[kTotLabelNum];
+  int count = 0;
+  int offset = chunk_idx * chunk_size;
+  for (int i = 0; i < row_num; i++) {
+    for (int j = 0; j < tested_label_num; j++) {
+      state[j] = value[j][i];
+    }
+    if (IsValid(state, tested_label_num)) {
+      count++;
+      if (query_type == QUERY_TYPE::INDEX)
+
+        indices->push_back(i + offset);
+      else if (query_type == QUERY_TYPE::BITMAP)
+        SetBitmap(bitmap, i);
+    }
+  }
+
+  // destroy the allocated space
+  for (int i = 0; i < num_columns; i++) {
+    delete[] value[i];
+  }
+  delete[] value;
+
+  return count;
+}
diff --git a/cpp/src/graphar/label.h b/cpp/src/graphar/label.h
new file mode 100644
index 00000000..145312e4
--- /dev/null
+++ b/cpp/src/graphar/label.h
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef CPP_SRC_GRAPHAR_LABEL_H_
+#define CPP_SRC_GRAPHAR_LABEL_H_
+
+#include <arrow/io/file.h>
+#include <arrow/util/logging.h>
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+#include <parquet/properties.h>
+
+#include <iostream>
+#include <set>
+#include <vector>
+
+using parquet::ConvertedType;
+using parquet::Encoding;
+using parquet::Repetition;
+using parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+constexpr int BATCH_SIZE = 1024;  // the batch size
+
+/// The query type
+enum QUERY_TYPE {
+  COUNT,    // return the number of valid vertices
+  INDEX,    // return the indices of valid vertices
+  BITMAP,   // return the bitmap of valid vertices
+  ADAPTIVE  // adaptively return indices or bitmap
+};
+
+/// Set bit
+static inline void SetBitmap(uint64_t* bitmap, const int index) {
+  bitmap[index >> 6] |= (1ULL << (index & 63));
+}
+
+int read_parquet_file_and_get_valid_indices(
+    const char* parquet_filename, const int row_num, const int tot_label_num,
+    const int tested_label_num, std::vector<int> tested_label_ids,
+    const std::function<bool(bool*, int)>& IsValid, int chunk_idx,
+    int chunk_size, std::vector<int>* indices = nullptr,
+    uint64_t* bitmap = nullptr, const QUERY_TYPE query_type = COUNT);
+
+#endif  // CPP_SRC_GRAPHAR_LABEL_H_


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@graphar.apache.org
For additional commands, e-mail: commits-h...@graphar.apache.org


Reply via email to