This is an automated email from the ASF dual-hosted git repository.

stigahuang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit a486305a922d672f77ff23b5f42e604a720597fd
Author: Csaba Ringhofer <[email protected]>
AuthorDate: Thu Jul 11 08:48:43 2024 +0200

    IMPALA-13209: Optimize ConvertRowBatchTime in ExchangeNode
    
    The patch optimizes the most common case when the src and dst
    RowBatches have the same number of tuples per row.
    
    ConvertRowBatchTime is decreased from >600ms to <100ms in a query
    with busy exchange node:
    set mt_dop=8;
    select straight_join count(*) from tpcds_parquet.store_sales s1
      join /*+broadcast*/ tpcds_parquet.store_sales16 s2
      on s1.ss_customer_sk = s2.ss_customer_sk;
    
    TPCDS-20 showed minor improvement (0.77%). The effect is likely to
    be larger if more nodes are involved.
    
    Testing:
    - passed core tests
    
    Change-Id: Iab94315364e8886da1ae01cf6af623812a2da9cb
    Reviewed-on: http://gerrit.cloudera.org:8080/21571
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/exchange-node.cc | 33 +++++++++++++++++----------------
 be/src/runtime/row-batch.cc  | 25 +++++++++++++++++++++++++
 be/src/runtime/row-batch.h   |  6 ++++++
 3 files changed, 48 insertions(+), 16 deletions(-)

diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index b605ba0a7..7faa02261 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -202,23 +202,24 @@ Status ExchangeNode::GetNext(RuntimeState* state, 
RowBatch* output_batch, bool*
       SCOPED_TIMER(convert_row_batch_timer_);
       RETURN_IF_CANCELLED(state);
       RETURN_IF_ERROR(QueryMaintenance(state));
-      // copy rows until we hit the limit/capacity or until we exhaust 
input_batch_
-      while (!ReachedLimit() && !output_batch->AtCapacity()
-          && input_batch_ != NULL && next_row_idx_ < input_batch_->num_rows()) 
{
-        TupleRow* src = input_batch_->GetRow(next_row_idx_);
-        ++next_row_idx_;
-        int j = output_batch->AddRow();
-        TupleRow* dest = output_batch->GetRow(j);
-        // if the input row is shorter than the output row, make sure not to 
leave
-        // uninitialized Tuple* around
-        output_batch->ClearRow(dest);
-        // this works as expected if rows from input_batch form a prefix of
-        // rows in output_batch
-        input_batch_->CopyRow(src, dest);
-        output_batch->CommitLastRow();
-        IncrementNumRowsReturned(1);
+      if (input_batch_ != nullptr) {
+        // copy rows until we hit the limit/capacity or until we exhaust 
input_batch_
+        int available_in_input = input_batch_->num_rows() - next_row_idx_;
+        int free_in_output = output_batch->capacity() - 
output_batch->num_rows();
+        int rows_to_copy = std::min(available_in_input, free_in_output);
+        if (limit_ != -1) {
+          rows_to_copy =
+              static_cast<int>(std::min<int64_t>(rows_to_copy, limit_ - 
rows_returned()));
+        }
+        if (rows_to_copy > 0) {
+          int dst_offset = output_batch->AddRows(rows_to_copy);
+          output_batch->CopyRows(input_batch_, rows_to_copy, next_row_idx_, 
dst_offset);
+          next_row_idx_ += rows_to_copy;
+          output_batch->CommitRows(rows_to_copy);
+          IncrementNumRowsReturned(rows_to_copy);
+          COUNTER_SET(rows_returned_counter_, rows_returned());
+        }
       }
-      COUNTER_SET(rows_returned_counter_, rows_returned());
 
       if (ReachedLimit()) {
         ReleaseRecvrResources(output_batch);
diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc
index a8b4a7730..848fcdca7 100644
--- a/be/src/runtime/row-batch.cc
+++ b/be/src/runtime/row-batch.cc
@@ -545,4 +545,29 @@ void RowBatch::VLogRows(const string& context) {
   }
 }
 
+void RowBatch::CopyRows(RowBatch* src, int num_rows, int src_offset, int 
dst_offset) {
+  DCHECK_GT(num_rows, 0);
+  DCHECK_GE(num_tuples_per_row_, src->num_tuples_per_row_);
+  DCHECK_GE(src_offset, 0);
+  DCHECK_GE(dst_offset, 0);
+  DCHECK_GE(capacity_, num_rows + dst_offset);
+  DCHECK_GE(src->num_rows_, num_rows + src_offset);
+  bool same_layout = num_tuples_per_row_ == src->num_tuples_per_row_;
+  if (same_layout) {
+    // Fast path, single copy.
+    TupleRow* dst_row = GetRow(dst_offset);
+    TupleRow* src_row = src->GetRow(src_offset);
+    memcpy(dst_row, src_row, num_rows * num_tuples_per_row_ * sizeof(Tuple*));
+    return;
+  }
+  // Slow path, null tuples and copy prefixes.
+  DCHECK_GT(num_tuples_per_row_, src->num_tuples_per_row_);
+  for (int i = 0; i < num_rows; i++) {
+    TupleRow* dst_row = GetRow(dst_offset + i);
+    TupleRow* src_row = src->GetRow(src_offset + i);
+    ClearRow(dst_row);
+    src->CopyRow(src_row, dst_row);
+  }
+}
+
 } // namespace impala
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 3626d7dce..76c580319 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -292,6 +292,12 @@ class RowBatch {
     memcpy(dest, src, num_tuples_per_row_ * sizeof(Tuple*));
   }
 
+  /// Copies tuple pointers from another row batch. It is allowed for 'src' to 
have
+  /// less tuples per row - in this case the prefix is copied and remaining 
tuples are
+  /// initialized to 0. The caller must ensure that src has enough rows / dst 
has enough
+  /// capacity.
+  void CopyRows(RowBatch* src, int num_rows, int src_offset, int dst_offset);
+
   /// Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for 
exec
   /// nodes that skip an offset and copied more than necessary.
   void CopyRows(int64_t dest, int64_t src, int64_t num_rows) {

Reply via email to