Elssky commented on code in PR #654:
URL: https://github.com/apache/incubator-graphar/pull/654#discussion_r1831964768


##########
cpp/benchmarks/benchmark_util.h:
##########


Review Comment:
   use ldbc graph for benchmark label filtering efficiency



##########
cpp/benchmarks/label_filter_benchmark.cc:
##########


Review Comment:
   5 benchmark method: 
   1. single label filtering [1. graphar 2. Acero]
   2. multi label filtering [1. graphar 2. Acero]
   3. label filtering based on another filtering result
   
   



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(

Review Comment:
   Read each chunk to find all vertices that meet the label filter 
conditions,read_parquet_file_and_get_valid_indices() is called internally



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  auto filtered_ids =
+      new_vertices_collection
+          ->filter({filter_label}, &new_vertices_collection->valid_chunk_)
+          .value();
+  if (vertices_collection->is_filtered_) {
+    std::unordered_set<IdType> origin_set(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    std::unordered_set<int> intersection;
+    for (int num : filtered_ids) {
+      if (origin_set.count(num)) {
+        intersection.insert(num);
+      }
+    }
+    filtered_ids =
+        std::vector<IdType>(intersection.begin(), intersection.end());
+
+    new_vertices_collection->is_filtered_ = true;
+  }
+  new_vertices_collection->filtered_ids_ = filtered_ids;
+
+  return new_vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(

Review Comment:
   Multi label filtering API



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(

Review Comment:
   Read each chunk to find all vertices that meet the label filter conditions,
   Acero filter() is called (Method used for comparison)



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(

Review Comment:
   Single label filtering API(Acero)



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  auto filtered_ids =
+      new_vertices_collection
+          ->filter({filter_label}, &new_vertices_collection->valid_chunk_)
+          .value();
+  if (vertices_collection->is_filtered_) {
+    std::unordered_set<IdType> origin_set(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    std::unordered_set<int> intersection;
+    for (int num : filtered_ids) {
+      if (origin_set.count(num)) {
+        intersection.insert(num);
+      }
+    }
+    filtered_ids =
+        std::vector<IdType>(intersection.begin(), intersection.end());
+
+    new_vertices_collection->is_filtered_ = true;
+  }
+  new_vertices_collection->filtered_ids_ = filtered_ids;
+
+  return new_vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter(filter_labels).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabelsbyAcero(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero(filter_labels).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(

Review Comment:
   Multi label filtering API, input is VerticesCollection



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {

Review Comment:
   get all labels of this vertex



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {

Review Comment:
    determine whether this vertex have the specfic label



##########
cpp/examples/label_filtering_example.cc:
##########


Review Comment:
   this example shows how to use label filtering and correctness.



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(

Review Comment:
   Single label filtering API



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>

Review Comment:
   Single label filtering API, input is VerticesCollection ( a result from 
another filtering)



##########
cpp/src/graphar/high-level/graph_reader.cc:
##########
@@ -94,6 +100,290 @@ Vertex::Vertex(IdType id,
   }
 }
 
+Result<bool> VertexIter::label(const std::string& label) noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  column = util::GetArrowColumnByName(chunk_table, label);
+  if (column != nullptr) {
+    auto array = util::GetArrowArrayByChunkIndex(column, 0);
+    auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+    return bool_array->Value(0);
+  }
+  return Status::KeyError("label with name ", label,
+                          " does not exist in the vertex.");
+}
+
+Result<std::vector<std::string>> VertexIter::label() noexcept {
+  std::shared_ptr<arrow::ChunkedArray> column(nullptr);
+  std::vector<std::string> vertex_label;
+  if (is_filtered_)
+    label_reader_.seek(filtered_ids_[cur_offset_]);
+  else
+    label_reader_.seek(cur_offset_);
+  GAR_ASSIGN_OR_RAISE(auto chunk_table, label_reader_.GetLabelChunk());
+  for (auto label : labels_) {
+    column = util::GetArrowColumnByName(chunk_table, label);
+    if (column != nullptr) {
+      auto array = util::GetArrowArrayByChunkIndex(column, 0);
+      auto bool_array = std::dynamic_pointer_cast<arrow::BooleanArray>(array);
+      if (bool_array->Value(0)) {
+        vertex_label.push_back(label);
+      }
+    }
+  }
+  return vertex_label;
+}
+
+static inline bool IsValid(bool* state, int column_number) {
+  for (int i = 0; i < column_number; ++i) {
+    // AND case
+    if (!state[i])
+      return false;
+    // OR case
+    // if (state[i]) return true;
+  }
+  // AND case
+  return true;
+  // OR case
+  // return false;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter(
+    std::vector<std::string> filter_labels,
+    std::vector<IdType>* new_valid_chunk) {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+  const int TOT_LABEL_NUM = labels_.size();
+  const int TESTED_LABEL_NUM = filter_labels.size();
+  std::vector<int> tested_label_ids;
+
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  if (tested_label_ids.empty())
+    return Status::KeyError(
+        "query label"
+        " does not exist in the vertex.");
+
+  uint64_t* bitmap = new uint64_t[TOT_ROWS_NUM / 64 + 1];
+  memset(bitmap, 0, sizeof(uint64_t) * (TOT_ROWS_NUM / 64 + 1));
+  int total_count = 0;
+  int row_num;
+
+  if (is_filtered_) {
+    for (int chunk_idx : valid_chunk_) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0 && new_valid_chunk != nullptr)
+        new_valid_chunk->emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  } else {
+    for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM;
+         ++chunk_idx) {
+      row_num = std::min(CHUNK_SIZE, TOT_ROWS_NUM - chunk_idx * CHUNK_SIZE);
+      std::string new_filename =
+          prefix_ + vertex_info_->GetPrefix() + "labels/chunk";
+      int count = read_parquet_file_and_get_valid_indices(
+          new_filename.c_str(), row_num, TOT_LABEL_NUM, TESTED_LABEL_NUM,
+          tested_label_ids, IsValid, chunk_idx, CHUNK_SIZE, &indices, bitmap,
+          QUERY_TYPE::INDEX);
+      if (count != 0)
+        valid_chunk_.emplace_back(static_cast<IdType>(chunk_idx));
+    }
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  delete[] bitmap;
+
+  return indices64;
+}
+
+Result<std::vector<IdType>> VerticesCollection::filter_by_acero(
+    std::vector<std::string> filter_labels) const {
+  std::vector<int> indices;
+  const int TOT_ROWS_NUM = vertex_num_;
+  const int CHUNK_SIZE = vertex_info_->GetChunkSize();
+
+  std::vector<int> tested_label_ids;
+  for (const auto& filter_label : filter_labels) {
+    auto it = std::find(labels_.begin(), labels_.end(), filter_label);
+    if (it != labels_.end()) {
+      tested_label_ids.push_back(std::distance(labels_.begin(), it));
+    }
+  }
+  int total_count = 0;
+  int row_num;
+  std::vector<std::shared_ptr<Expression>> filters;
+  std::shared_ptr<Expression> combined_filter = nullptr;
+
+  for (const auto& label : filter_labels) {
+    filters.emplace_back(
+        graphar::_Equal(graphar::_Property(label), graphar::_Literal(true)));
+  }
+
+  for (const auto& filter : filters) {
+    if (!combined_filter) {
+      combined_filter = graphar::_And(filter, filter);
+    } else {
+      combined_filter = graphar::_And(combined_filter, filter);
+    }
+  }
+
+  auto maybe_filter_reader = graphar::VertexPropertyArrowChunkReader::Make(
+      vertex_info_, labels_, prefix_, {});
+  auto filter_reader = maybe_filter_reader.value();
+  filter_reader->Filter(combined_filter);
+  for (int chunk_idx = 0; chunk_idx * CHUNK_SIZE < TOT_ROWS_NUM; ++chunk_idx) {
+    auto filter_result = filter_reader->GetLabelChunk();
+    auto filter_table = filter_result.value();
+    total_count += filter_table->num_rows();
+    filter_reader->next_chunk();
+  }
+  // std::cout << "Total valid count: " << total_count << std::endl;
+  std::vector<int64_t> indices64;
+
+  for (int value : indices) {
+    indices64.push_back(static_cast<int64_t>(value));
+  }
+
+  return indices64;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabelbyAcero(
+    const std::string& filter_label,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter_by_acero({filter_label}).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithLabel(
+    const std::string& filter_label,
+    const std::shared_ptr<VerticesCollection>& vertices_collection) {
+  auto new_vertices_collection = std::make_shared<VerticesCollection>(
+      vertices_collection->vertex_info_, vertices_collection->prefix_);
+  auto filtered_ids =
+      new_vertices_collection
+          ->filter({filter_label}, &new_vertices_collection->valid_chunk_)
+          .value();
+  if (vertices_collection->is_filtered_) {
+    std::unordered_set<IdType> origin_set(
+        vertices_collection->filtered_ids_.begin(),
+        vertices_collection->filtered_ids_.end());
+    std::unordered_set<int> intersection;
+    for (int num : filtered_ids) {
+      if (origin_set.count(num)) {
+        intersection.insert(num);
+      }
+    }
+    filtered_ids =
+        std::vector<IdType>(intersection.begin(), intersection.end());
+
+    new_vertices_collection->is_filtered_ = true;
+  }
+  new_vertices_collection->filtered_ids_ = filtered_ids;
+
+  return new_vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabels(
+    const std::vector<std::string>& filter_labels,
+    const std::shared_ptr<GraphInfo>& graph_info, const std::string& type) {
+  auto prefix = graph_info->GetPrefix();
+  auto vertex_info = graph_info->GetVertexInfo(type);
+  auto labels = vertex_info->GetLabels();
+  auto vertices_collection =
+      std::make_shared<VerticesCollection>(vertex_info, prefix);
+  vertices_collection->filtered_ids_ =
+      vertices_collection->filter(filter_labels).value();
+  vertices_collection->is_filtered_ = true;
+  return vertices_collection;
+}
+
+Result<std::shared_ptr<VerticesCollection>>
+VerticesCollection::verticesWithMultipleLabelsbyAcero(

Review Comment:
   Multi label filtering API (Acero)



##########
cpp/src/graphar/high-level/graph_reader.h:
##########
@@ -172,40 +179,80 @@ class VertexIter {
    * @param offset The current offset of the readers.
    */
   explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
-                      const std::string& prefix, IdType offset) noexcept {
+                      const std::string& prefix, IdType offset,
+                      const std::vector<std::string>& labels,
+                      const bool& is_filtered = false,
+                      const std::vector<IdType>& filtered_ids = {}) noexcept {
+    if (!labels.empty()) {
+      labels_ = labels;
+      label_reader_ =
+          VertexPropertyArrowChunkReader(vertex_info, labels, prefix);
+    }
     for (const auto& pg : vertex_info->GetPropertyGroups()) {
       readers_.emplace_back(vertex_info, pg, prefix);
     }
+    is_filtered_ = is_filtered;
+    filtered_ids_ = filtered_ids;

Review Comment:
   the vector of vertex ids after filtering



##########
cpp/src/graphar/high-level/graph_reader.h:
##########
@@ -172,40 +179,80 @@ class VertexIter {
    * @param offset The current offset of the readers.
    */
   explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
-                      const std::string& prefix, IdType offset) noexcept {
+                      const std::string& prefix, IdType offset,
+                      const std::vector<std::string>& labels,
+                      const bool& is_filtered = false,
+                      const std::vector<IdType>& filtered_ids = {}) noexcept {
+    if (!labels.empty()) {

Review Comment:
   label_reader is used for reading label chunks



##########
cpp/src/graphar/high-level/graph_reader.h:
##########
@@ -172,40 +179,80 @@ class VertexIter {
    * @param offset The current offset of the readers.
    */
   explicit VertexIter(const std::shared_ptr<VertexInfo>& vertex_info,
-                      const std::string& prefix, IdType offset) noexcept {
+                      const std::string& prefix, IdType offset,
+                      const std::vector<std::string>& labels,
+                      const bool& is_filtered = false,
+                      const std::vector<IdType>& filtered_ids = {}) noexcept {
+    if (!labels.empty()) {
+      labels_ = labels;
+      label_reader_ =
+          VertexPropertyArrowChunkReader(vertex_info, labels, prefix);
+    }
     for (const auto& pg : vertex_info->GetPropertyGroups()) {
       readers_.emplace_back(vertex_info, pg, prefix);
     }
+    is_filtered_ = is_filtered;

Review Comment:
   is_filtered_ is used for mark the collection/iter is or not a result from 
filtering



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@graphar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@graphar.apache.org
For additional commands, e-mail: commits-h...@graphar.apache.org

Reply via email to