This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 42d3156881d8740ba0aa1bfa8a20a6fdc0a22846
Author: Zoltan Borok-Nagy <[email protected]>
AuthorDate: Fri Aug 23 16:42:03 2024 +0200

    IMPALA-13325: Use RowBatch::CopyRows in IcebergDeleteNode
    
    Typically there are many more data records than delete records in a
    healthy Iceberg table. This means it is suboptimal to copy probe rows
    one by one in the IcebergDeleteNode. With this patch we switch to
    RowBatch::CopyRows method to copy tuple rows in batches.
    
    We also switch to an iterator-based approach when we test the
    deleted rows, which seems to be more efficient than ContainsBulk().
    
    I measured the Avg Time of the DELETE EVENTS ICEBERG DELETE operator.
    
    Local Measurements
    +--------------+----------------+--------------------+--------------------+
    | Data records | Delete records | Old implementation | New implementation |
    +--------------+----------------+--------------------+--------------------+
    | 2 Billion    | 1 Billion      | 15.82s             | 14.73s             |
    | 1.2 Billion  | 70 Million     | 5.64s              | 2.4s               |
    +--------------+----------------+--------------------+--------------------+
    
    Large scale measurements
    1 Coordinator, 10 executors.
    +--------------+----------------+--------------------+--------------------+
    | Data records | Delete records | Old implementation | New implementation |
    +--------------+----------------+--------------------+--------------------+
    | 405 Billion  | 68.5 Billion   | 87.30s             | 54.76s             |
    | 301 Billion  | 18 Billion     | 67.38s             | 25.31s             |
    +--------------+----------------+--------------------+--------------------+
    
    1 Coordinator, 40 executors.
    +--------------+----------------+--------------------+--------------------+
    | Data records | Delete records | Old implementation | New implementation |
    +--------------+----------------+--------------------+--------------------+
    | 405 Billion  | 68.5 Billion   | 23.18s             | 14.72s             |
    | 301 Billion  | 18 Billion     | 16.52s             | 6.09s              |
    +--------------+----------------+--------------------+--------------------+
    
    Testing
     * added unit tests for the new methods of RoaringBitmap
    
    Change-Id: I46487fefa300027e9df6cd7fb36c78af01dd56c1
    Reviewed-on: http://gerrit.cloudera.org:8080/21718
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/iceberg-delete-node.cc | 119 ++++++++++++++++++-------------------
 be/src/exec/iceberg-delete-node.h  |  18 +++---
 be/src/runtime/row-batch.h         |  15 ++++-
 be/src/util/roaring-bitmap-test.cc | 117 +++++++++++++++++++++++++++++++++++-
 be/src/util/roaring-bitmap.h       | 108 +++++++++++++++++++++++++++++++++
 5 files changed, 303 insertions(+), 74 deletions(-)

diff --git a/be/src/exec/iceberg-delete-node.cc 
b/be/src/exec/iceberg-delete-node.cc
index 8aabbc133..d99c6f730 100644
--- a/be/src/exec/iceberg-delete-node.cc
+++ b/be/src/exec/iceberg-delete-node.cc
@@ -168,7 +168,6 @@ Status IcebergDeleteNode::NextProbeRowBatchFromChild(
       return Status::OK();
     }
     if (probe_side_eos_) {
-      current_probe_row_ = nullptr;
       probe_batch_pos_ = -1;
       *eos = true;
       return Status::OK();
@@ -218,8 +217,7 @@ Status IcebergDeleteNode::GetNext(RuntimeState* state, 
RowBatch* out_batch, bool
       case ProbeState::PROBING_IN_BATCH: {
         // Finish processing rows in the current probe batch.
         RETURN_IF_ERROR(ProcessProbeBatch(out_batch));
-        if (probe_batch_pos_ == probe_batch_->num_rows()
-            && current_probe_row_ == nullptr) {
+        if (probe_batch_pos_ == probe_batch_->num_rows()) {
           probe_state_ = ProbeState::PROBING_END_BATCH;
         }
         break;
@@ -290,89 +288,86 @@ string IcebergDeleteNode::NodeDebugString() const {
   return ss.str();
 }
 
-bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRow(
-    const RoaringBitmap64& deletes, RoaringBitmap64::BulkContext* context,
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
-  DCHECK(current_probe_row_ != nullptr);
-  uint64_t current_probe_pos = std::make_unsigned_t<int64_t>(
-      *current_probe_row_->GetTuple(0)->GetBigIntSlot(pos_offset_));
-  TupleRow* out_row = out_batch_iterator->Get();
-  if (!deletes.ContainsBulk(current_probe_pos, context)) {
-    out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
-    matched_probe_ = true;
-    --(*remaining_capacity);
-    if (*remaining_capacity == 0) return false;
-    out_row = out_batch_iterator->Next();
-  }
-  return true;
+uint64_t IR_ALWAYS_INLINE IcebergDeleteNode::ProbeFilePosition(
+    const RowBatch::Iterator& probe_it) const {
+  DCHECK(!probe_it.AtEnd());
+  const TupleRow* current_probe_row = probe_it.Get();
+  return *current_probe_row->GetTuple(0)->GetBigIntSlot(pos_offset_);
 }
 
-bool IR_ALWAYS_INLINE IcebergDeleteNode::ProcessProbeRowNoCheck(
-    RowBatch::Iterator* out_batch_iterator, int* remaining_capacity) {
-  DCHECK(current_probe_row_ != nullptr);
-  TupleRow* out_row = out_batch_iterator->Get();
-  out_batch_iterator->parent()->CopyRow(current_probe_row_, out_row);
-  matched_probe_ = true;
-  --(*remaining_capacity);
-  if (*remaining_capacity == 0) return false;
-  out_row = out_batch_iterator->Next();
-  return true;
+int IR_ALWAYS_INLINE IcebergDeleteNode::CountRowsToCopy(const RoaringBitmap64* 
deletes,
+    int remaining_capacity, RoaringBitmap64::Iterator* deletes_it,
+    RowBatch::Iterator* probe_it) {
+  DCHECK(!probe_it->AtEnd());
+  int rows_to_copy = 0;
+  int rows_to_copy_max = std::min(probe_it->RemainingRows(), 
remaining_capacity);
+  DCHECK_GT(rows_to_copy_max, 0);
+  if (deletes == nullptr) {
+    probe_it->Advance(rows_to_copy_max);
+    return rows_to_copy_max;
+  }
+
+  uint64_t current_probe_pos = ProbeFilePosition(*probe_it);
+  probe_it->Next();
+  uint64_t next_deleted_pos = deletes_it->GetEqualOrLarger(current_probe_pos);
+  while (current_probe_pos < next_deleted_pos) {
+    ++rows_to_copy;
+    if (rows_to_copy == rows_to_copy_max) break;
+    current_probe_pos = ProbeFilePosition(*probe_it);
+    probe_it->Next();
+    // If the rows are filtered in the SCAN then the position values may 
increase
+    // by more than one, so let's adjust 'next_deleted_pos'.
+    if (current_probe_pos > next_deleted_pos) {
+      next_deleted_pos = deletes_it->GetEqualOrLarger(current_probe_pos);
+    }
+  }
+  return rows_to_copy;
 }
 
 int IcebergDeleteNode::ProcessProbeBatch(
     TPrefetchMode::type prefetch_mode, RowBatch* out_batch) {
   DCHECK(!out_batch->AtCapacity());
   DCHECK_GE(probe_batch_pos_, 0);
-  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
   const int max_rows = out_batch->capacity() - out_batch->num_rows();
-  // Note that 'probe_batch_pos_' is the row no. of the row after 
'current_probe_row_'.
   RowBatch::Iterator probe_batch_iterator(probe_batch_.get(), 
probe_batch_pos_);
+  RowBatch::Iterator out_batch_iterator(out_batch, out_batch->AddRow());
   if (probe_batch_iterator.AtEnd()) return 0;
 
   int remaining_capacity = max_rows;
 
-  impala::StringValue* file_path =
+  StringValue* file_path =
       
probe_batch_iterator.Get()->GetTuple(0)->GetStringSlot(file_path_offset_);
   const IcebergDeleteBuilder::DeleteRowHashTable& delete_rows_table =
       builder_->deleted_rows();
-  auto it = delete_rows_table.find(*file_path);
 
-  bool needs_check;
-  if (it == delete_rows_table.end()) {
-    needs_check = false;
-  } else {
-    needs_check = NeedToCheckBatch(it->second);
+  const RoaringBitmap64* deletes = nullptr;
+  RoaringBitmap64::Iterator deletes_it;
+  auto it = delete_rows_table.find(*file_path);
+  if (it != delete_rows_table.end() && NeedToCheckBatch(it->second)) {
+    deletes = &it->second;
+    deletes_it.Init(*deletes);
   }
 
-  if (!needs_check) {
-    while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0) {
-      current_probe_row_ = probe_batch_iterator.Get();
-      if (!ProcessProbeRowNoCheck(&out_batch_iterator, &remaining_capacity)) {
-        DCHECK_EQ(remaining_capacity, 0);
-      }
-      probe_batch_iterator.Next();
-    }
-  } else {
-    const RoaringBitmap64& deletes = it->second;
-    RoaringBitmap64::BulkContext context;
-    while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0) {
-      current_probe_row_ = probe_batch_iterator.Get();
-      if (!ProcessProbeRow(deletes, &context, &out_batch_iterator, 
&remaining_capacity)) {
-        DCHECK_EQ(remaining_capacity, 0);
-      }
-      probe_batch_iterator.Next();
-    }
+  while (!probe_batch_iterator.AtEnd() && remaining_capacity > 0) {
+    DCHECK_EQ(*file_path,
+        
*probe_batch_iterator.Get()->GetTuple(0)->GetStringSlot(file_path_offset_));
+    int probe_offset = probe_batch_iterator.RowNum();
+    int rows_to_copy = CountRowsToCopy(
+        deletes, remaining_capacity, &deletes_it,
+        &probe_batch_iterator);
+    if (rows_to_copy == 0) continue;
+
+    int dst_offset = out_batch_iterator.RowNum();
+    out_batch->CopyRows(
+        probe_batch_.get(), rows_to_copy, probe_offset, dst_offset);
+    out_batch_iterator.Advance(rows_to_copy);
+    remaining_capacity -= rows_to_copy;
   }
+
   // Update where we are in the probe batch.
-  probe_batch_pos_ = (probe_batch_iterator.Get() - probe_batch_->GetRow(0));
+  probe_batch_pos_ = probe_batch_iterator.RowNum();
   int num_rows_added = max_rows - remaining_capacity;
 
-  // Clear state as ascending order of row ids are not guaranteed between 
probe row
-  // batches
-  if (probe_batch_iterator.AtEnd()) {
-    current_probe_row_ = nullptr;
-  }
-
   DCHECK_GE(probe_batch_pos_, 0);
   DCHECK_LE(probe_batch_pos_, probe_batch_->capacity());
   DCHECK_LE(num_rows_added, max_rows);
diff --git a/be/src/exec/iceberg-delete-node.h 
b/be/src/exec/iceberg-delete-node.h
index 388596651..0355a7f3f 100644
--- a/be/src/exec/iceberg-delete-node.h
+++ b/be/src/exec/iceberg-delete-node.h
@@ -114,16 +114,6 @@ class IcebergDeleteNode : public BlockingJoinNode {
     EOS,
   };
 
-  /// Probes 'current_probe_row_' against the hash tables and append outputs
-  /// to output batch.
-  bool inline ProcessProbeRow(const RoaringBitmap64& deletes,
-      RoaringBitmap64::BulkContext* context, RowBatch::Iterator* 
out_batch_iterator,
-      int* remaining_capacity) WARN_UNUSED_RESULT;
-
-  /// Append outputs to output batch.
-  bool inline ProcessProbeRowNoCheck(RowBatch::Iterator* out_batch_iterator,
-      int* remaining_capacity) WARN_UNUSED_RESULT;
-
   /// Process probe rows from probe_batch_. Returns either if out_batch is 
full or
   /// probe_batch_ is entirely consumed.
   /// Returns the number of rows added to out_batch; -1 on error (and *status 
will
@@ -153,6 +143,12 @@ class IcebergDeleteNode : public BlockingJoinNode {
 
   bool NeedToCheckBatch(const RoaringBitmap64& deletes);
 
+  /// Returns number of rows required to copy from the probe batch to the 
output batch.
+  /// 'probe_iterator' is modified during the call.
+  int CountRowsToCopy(const RoaringBitmap64* deletes, int remaining_capacity,
+      RoaringBitmap64::Iterator* deletes_iterator,
+      RowBatch::Iterator* probe_iterator);
+
   /// Prepares for probing the next batch. Called after populating 
'probe_batch_'
   /// with rows and entering 'probe_state_' PROBING_IN_BATCH.
   inline void ResetForProbe() {
@@ -161,6 +157,8 @@ class IcebergDeleteNode : public BlockingJoinNode {
     matched_probe_ = true;
   }
 
+  uint64_t ProbeFilePosition(const RowBatch::Iterator& probe_it) const;
+
   std::string NodeDebugString() const;
 
   RuntimeState* runtime_state_;
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 76c580319..13c91d5e6 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -191,6 +191,11 @@ class RowBatch {
     /// Return the current row pointed to by the row pointer.
     TupleRow* IR_ALWAYS_INLINE Get() { return 
reinterpret_cast<TupleRow*>(row_); }
 
+    /// Return the current row pointed to by the row pointer.
+    const TupleRow* IR_ALWAYS_INLINE Get() const {
+      return reinterpret_cast<const TupleRow*>(row_);
+    }
+
     /// Increment the row pointer and return the next row.
     TupleRow* IR_ALWAYS_INLINE Next() {
       row_ += num_tuples_per_row_;
@@ -198,14 +203,22 @@ class RowBatch {
       return Get();
     }
 
+    void IR_ALWAYS_INLINE Advance(int n) {
+      row_ += n * num_tuples_per_row_;
+      DCHECK_LE((row_ - parent_->tuple_ptrs_) / num_tuples_per_row_, 
parent_->capacity_);
+    }
+
     /// Returns the index in the RowBatch of the current row. This does an 
integer
     /// division and so should not be used in hot inner loops.
     int RowNum() { return (row_ - parent_->tuple_ptrs_) / num_tuples_per_row_; 
}
 
+    /// Returns number of rows remaining.
+    int RemainingRows() { return (row_batch_end_ - row_) / 
num_tuples_per_row_; }
+
     /// Returns true if the iterator is beyond the last row for read iterators.
     /// Useful for read iterators to determine the limit. Write iterators 
should use
     /// RowBatch::AtCapacity() instead.
-    bool IR_ALWAYS_INLINE AtEnd() { return row_ >= row_batch_end_; }
+    bool IR_ALWAYS_INLINE AtEnd() const { return row_ >= row_batch_end_; }
 
     /// Returns the row batch which this iterator is iterating through.
     RowBatch* parent() { return parent_; }
diff --git a/be/src/util/roaring-bitmap-test.cc 
b/be/src/util/roaring-bitmap-test.cc
index 195b270c4..255338332 100644
--- a/be/src/util/roaring-bitmap-test.cc
+++ b/be/src/util/roaring-bitmap-test.cc
@@ -27,12 +27,20 @@ using namespace std;
 
 namespace impala {
 
-TEST(RoaringBitmap64Test, Basic) {
+TEST(RoaringBitmap64Test, Empty) {
   RoaringBitmap64 rbm;
   RoaringBitmap64::BulkContext context;
   ASSERT_TRUE(rbm.IsEmpty());
   ASSERT_EQ(rbm.Max(), numeric_limits<uint64_t>::min());
   ASSERT_EQ(rbm.Min(), numeric_limits<uint64_t>::max());
+  RoaringBitmap64::Iterator it(rbm);
+  ASSERT_EQ(it.GetEqualOrLarger(0), numeric_limits<uint64_t>::max());
+  ASSERT_EQ(it.GetEqualOrLarger(1000000), numeric_limits<uint64_t>::max());
+}
+
+TEST(RoaringBitmap64Test, Basic) {
+  RoaringBitmap64 rbm;
+  RoaringBitmap64::BulkContext context;
   rbm.AddElements({11, 29});
   ASSERT_FALSE(rbm.IsEmpty());
   for (int i = 0; i < 100; ++i) {
@@ -67,8 +75,115 @@ TEST(RoaringBitmap64Test, AddMany) {
       ASSERT_FALSE(rbm.ContainsBulk(i, &context));
     }
   }
+  RoaringBitmap64::Iterator it(rbm);
+  ASSERT_EQ(it.GetEqualOrLarger(0), 3);
+  ASSERT_EQ(it.GetEqualOrLarger(3), 3);
+  ASSERT_EQ(it.GetEqualOrLarger(13), 13);
+  // Querying the same element repeatedly shouldn't be an issue, as
+  // the iterator should remain intact.
+  ASSERT_EQ(it.GetEqualOrLarger(13), 13);
+  ASSERT_EQ(it.GetEqualOrLarger(13), 13);
+  ASSERT_EQ(it.GetEqualOrLarger(14), 15);
+  ASSERT_EQ(it.GetEqualOrLarger(14), 15);
+  ASSERT_EQ(it.GetEqualOrLarger(3000000), 3000000);
+  ASSERT_EQ(it.GetEqualOrLarger(3000001), 
std::numeric_limits<uint64_t>::max());
   ASSERT_EQ(rbm.Max(), 3000000);
   ASSERT_EQ(rbm.Min(), 3);
+
+  // Iterate with larger jumps
+  RoaringBitmap64::Iterator jump_it(rbm);
+  ASSERT_EQ(jump_it.GetEqualOrLarger(99), 100);
+  ASSERT_EQ(jump_it.GetEqualOrLarger(3000001), 
std::numeric_limits<uint64_t>::max());
+}
+
+TEST(RoaringBitmap64Test, DenseIterationOverDenseMap) {
+  constexpr int SIZE = 100000;
+  RoaringBitmap64 rbm;
+  vector<uint64_t> elements_to_add;
+  elements_to_add.reserve(SIZE);
+  for (int i = 0; i < SIZE; ++i) {
+    if (i % 4 != 0) {
+      elements_to_add.push_back(i);
+    }
+  }
+  rbm.AddElements(elements_to_add);
+  RoaringBitmap64::Iterator it(rbm);
+  for (int i = 0; i < SIZE; ++i) {
+    if (i % 4 != 0) {
+      ASSERT_EQ(it.GetEqualOrLarger(i), i);
+    } else {
+      ASSERT_EQ(it.GetEqualOrLarger(i), i+1);
+    }
+  }
+}
+
+TEST(RoaringBitmap64Test, SparseIterationOverDenseMap) {
+  constexpr int SIZE = 100000;
+  RoaringBitmap64 rbm;
+  vector<uint64_t> elements_to_add;
+  elements_to_add.reserve(SIZE);
+  for (int i = 0; i < SIZE; ++i) {
+    if (i % 4 != 0) {
+      elements_to_add.push_back(i);
+    }
+  }
+  rbm.AddElements(elements_to_add);
+  RoaringBitmap64::Iterator it(rbm);
+  for (int i = 0; i < SIZE; i += 51) {
+    if (i % 4 != 0) {
+      ASSERT_EQ(it.GetEqualOrLarger(i), i);
+    } else {
+      ASSERT_EQ(it.GetEqualOrLarger(i), i+1);
+    }
+  }
+}
+
+TEST(RoaringBitmap64Test, DenseIterationOverSparseMap) {
+  constexpr int SIZE = 1000000;
+  constexpr int DIV = 97;
+  RoaringBitmap64 rbm;
+  vector<uint64_t> elements_to_add;
+  elements_to_add.reserve(SIZE);
+  for (int i = 0; i < SIZE; i += DIV) {
+    elements_to_add.push_back(i);
+  }
+  rbm.AddElements(elements_to_add);
+  RoaringBitmap64::Iterator it(rbm);
+  for (int i = 0; i < SIZE; ++i) {
+    if (i % DIV == 0) {
+      ASSERT_EQ(it.GetEqualOrLarger(i), i);
+    } else if (i - (i % DIV) + DIV >= SIZE) {
+      // The next number divisible by DIV is not in the bitmap.
+      ASSERT_EQ(it.GetEqualOrLarger(i), std::numeric_limits<uint64_t>::max());
+    } else {
+      // This could be used in the first branch, but it's a bit clearer what 
we check.
+      ASSERT_EQ(it.GetEqualOrLarger(i), i - (i % DIV) + DIV);
+    }
+  }
+}
+
+TEST(RoaringBitmap64Test, SparseIterationOverSparseMap) {
+  constexpr int SIZE = 1000000;
+  constexpr int DIV = 83;
+  RoaringBitmap64 rbm;
+  vector<uint64_t> elements_to_add;
+  elements_to_add.reserve(SIZE);
+  for (int i = 0; i < SIZE; i += DIV) {
+    elements_to_add.push_back(i);
+  }
+  rbm.AddElements(elements_to_add);
+  RoaringBitmap64::Iterator it(rbm);
+  for (int i = 0; i < SIZE; i += 41) {
+    if (i % DIV == 0) {
+      ASSERT_EQ(it.GetEqualOrLarger(i), i);
+    } else if (i - (i % DIV) + DIV >= SIZE) {
+      // The next number divisible by DIV is not in the bitmap.
+      ASSERT_EQ(it.GetEqualOrLarger(i), std::numeric_limits<uint64_t>::max());
+    } else {
+      // This could be used in the first branch, but it's a bit clearer what 
we check.
+      ASSERT_EQ(it.GetEqualOrLarger(i), i - (i % DIV) + DIV);
+    }
+  }
 }
 
 } // namespace impala
diff --git a/be/src/util/roaring-bitmap.h b/be/src/util/roaring-bitmap.h
index 5eca10a39..fce4c482b 100644
--- a/be/src/util/roaring-bitmap.h
+++ b/be/src/util/roaring-bitmap.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include <limits>
+#include "common/logging.h"
+
 #include "thirdparty/roaring/roaring.h"
 
 namespace impala {
@@ -43,6 +46,111 @@ public:
       roaring64_bulk_context_t context_ = {};
   };
 
+  class Iterator {
+    public:
+      Iterator() {};
+
+      // No copy, no move, no cry.
+      Iterator(const Iterator&) = delete;
+      Iterator &operator=(const Iterator&) = delete;
+      Iterator(Iterator&& other) noexcept = delete;
+      Iterator &operator=(Iterator&& other) noexcept = delete;
+
+      Iterator(const RoaringBitmap64& roaring_bitmap) {
+        Init(roaring_bitmap);
+      }
+
+      void Init(const RoaringBitmap64& roaring_bitmap) {
+        DCHECK(it_ == nullptr);
+        it_ = roaring64_iterator_create(roaring_bitmap.rbitmap_);
+      }
+
+      // Returns true if the iterator currently points to a value in the 
bitmap.
+      // New iterators point to the first (minimum) element.
+      // It returns false when we iterated over all elements (or bitmap is 
empty).
+      bool HasValue() const {
+        return roaring64_iterator_has_value(it_);
+      }
+
+      uint64_t Value() const {
+        DCHECK(HasValue());
+        return roaring64_iterator_value(it_);
+      }
+
+      void Advance() {
+        DCHECK(HasValue());
+        roaring64_iterator_advance(it_);
+      }
+
+      // Sets iterator to the next element that is equal or larger to 'x', and
+      // returns the element. If no such element is found then return 
INT64_MAX.
+      // Incoming 'x' values must be in ascending order.
+      uint64_t GetEqualOrLarger(uint64_t x) {
+        // MOVE_THRESHOLD was chosen empirically. If 10% of the data is 
deleted and
+        // if the distance is 30 then AdvanceAndGetEqualOrLarger() should find 
the next
+        // element in ~3 iterations. The point of it is we shouldn't use
+        // AdvanceAndGetEqualOrLarger() when the distance is large and the 
bitmap is
+        // dense. The distance is typically large when we start processing a 
new
+        // probe batch (we are in the middle of a file, but we have a new 
iterator).
+        constexpr int MOVE_THRESHOLD = 30;
+        // We need to choose between MoveAndGetEqualOrLarger() and
+        // AdvanceAndGetEqualOrLarger(). MoveAndGetEqualOrLarger() is more 
efficient
+        // when the distance between 'x' and iterator's current element is 
large.
+        // AdvanceAndGetEqualOrLarger() is more efficient when the distance is 
small.
+        // There are 5 possible cases:
+        // - We start a new probe batch:
+        //   => For the first probe batch we likely choose 
AdvanceAndGetEqualOrLarger()
+        //      (unless it starts with a high file position), but we choose
+        //      MoveAndGetEqualOrLarger() for subsequent probe batches. Both 
should be
+        //      efficient.
+        // - Incoming x values are sparse, Bitmap is sparse:
+        //   => MoveAndGetEqualOrLarger() gets chosen most of the time. This 
sould give
+        //      us OK performance, also this method won't be invoked that much 
in this
+        //      case.
+        // - Incoming x values are dense, Bitmap is sparse:
+        //   => AdvanceAndGetEqualOrLarger() gets chosen most of the time 
which should
+        //      find the next element in 1-2 iterations (as input is dense, 
bitmap is
+        //      sparse). Also, this won't be invoked that much since the 
bitmap is sparse.
+        // - Incoming x values are sparse, Bitmap is dense
+        //   => MoveAndGetEqualOrLarger() gets chosen which is the efficient 
one for
+        //      this case.
+        // - Incoming x values are dense, Bitmap is dense
+        //   => AdvanceAndGetEqualOrLarger() gets chosen which is the optimal 
choice
+        //      for this case.
+        if (HasValue() && x - Value() > MOVE_THRESHOLD) {
+          return MoveAndGetEqualOrLarger(x);
+        } else {
+          return AdvanceAndGetEqualOrLarger(x);
+        }
+      }
+
+      ~Iterator() {
+        roaring64_iterator_free(it_);
+      }
+
+    private:
+      static constexpr uint64_t MAX_VALUE = 
std::numeric_limits<uint64_t>::max();
+
+      // Moves 'this' to an element that is equal or larger to 'x'.
+      uint64_t MoveAndGetEqualOrLarger(uint64_t x) {
+        if (roaring64_iterator_move_equalorlarger(it_, x)) {
+          return Value();
+        } else {
+          return MAX_VALUE;
+        }
+      }
+
+      // Advances 'this' until we find an element which is equal or larger to 
'x'.
+      uint64_t AdvanceAndGetEqualOrLarger(uint64_t x) {
+        while (HasValue() && Value() < x) {
+          Advance();
+        }
+        return HasValue() ? Value() : MAX_VALUE;
+      }
+
+      roaring64_iterator_t *it_ = nullptr;
+  };
+
   RoaringBitmap64() = default;
   ~RoaringBitmap64() { roaring64_bitmap_free(rbitmap_); }
 

Reply via email to