This is an automated email from the ASF dual-hosted git repository. stigahuang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit a486305a922d672f77ff23b5f42e604a720597fd Author: Csaba Ringhofer <[email protected]> AuthorDate: Thu Jul 11 08:48:43 2024 +0200 IMPALA-13209: Optimize ConvertRowBatchTime in ExchangeNode The patch optimizes the most common case when the src and dst RowBatches have the same number of tuples per row. ConvertRowBatchTime is decreased from >600ms to <100ms in a query with a busy exchange node: set mt_dop=8; select straight_join count(*) from tpcds_parquet.store_sales s1 join /*+broadcast*/ tpcds_parquet.store_sales16 s2 on s1.ss_customer_sk = s2.ss_customer_sk; TPCDS-20 showed minor improvement (0.77%). The effect is likely to be larger if more nodes are involved. Testing: - passed core tests Change-Id: Iab94315364e8886da1ae01cf6af623812a2da9cb Reviewed-on: http://gerrit.cloudera.org:8080/21571 Reviewed-by: Impala Public Jenkins <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/exchange-node.cc | 33 +++++++++++++++++---------------- be/src/runtime/row-batch.cc | 25 +++++++++++++++++++++++++ be/src/runtime/row-batch.h | 6 ++++++ 3 files changed, 48 insertions(+), 16 deletions(-) diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc index b605ba0a7..7faa02261 100644 --- a/be/src/exec/exchange-node.cc +++ b/be/src/exec/exchange-node.cc @@ -202,23 +202,24 @@ Status ExchangeNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* SCOPED_TIMER(convert_row_batch_timer_); RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); - // copy rows until we hit the limit/capacity or until we exhaust input_batch_ - while (!ReachedLimit() && !output_batch->AtCapacity() - && input_batch_ != NULL && next_row_idx_ < input_batch_->num_rows()) { - TupleRow* src = input_batch_->GetRow(next_row_idx_); - ++next_row_idx_; - int j = output_batch->AddRow(); - TupleRow* dest = output_batch->GetRow(j); - // if the input row is shorter than the output row, make sure not to leave - // uninitialized Tuple* around - 
output_batch->ClearRow(dest); - // this works as expected if rows from input_batch form a prefix of - // rows in output_batch - input_batch_->CopyRow(src, dest); - output_batch->CommitLastRow(); - IncrementNumRowsReturned(1); + if (input_batch_ != nullptr) { + // copy rows until we hit the limit/capacity or until we exhaust input_batch_ + int available_in_input = input_batch_->num_rows() - next_row_idx_; + int free_in_output = output_batch->capacity() - output_batch->num_rows(); + int rows_to_copy = std::min(available_in_input, free_in_output); + if (limit_ != -1) { + rows_to_copy = + static_cast<int>(std::min<int64_t>(rows_to_copy, limit_ - rows_returned())); + } + if (rows_to_copy > 0) { + int dst_offset = output_batch->AddRows(rows_to_copy); + output_batch->CopyRows(input_batch_, rows_to_copy, next_row_idx_, dst_offset); + next_row_idx_ += rows_to_copy; + output_batch->CommitRows(rows_to_copy); + IncrementNumRowsReturned(rows_to_copy); + COUNTER_SET(rows_returned_counter_, rows_returned()); + } } - COUNTER_SET(rows_returned_counter_, rows_returned()); if (ReachedLimit()) { ReleaseRecvrResources(output_batch); diff --git a/be/src/runtime/row-batch.cc b/be/src/runtime/row-batch.cc index a8b4a7730..848fcdca7 100644 --- a/be/src/runtime/row-batch.cc +++ b/be/src/runtime/row-batch.cc @@ -545,4 +545,29 @@ void RowBatch::VLogRows(const string& context) { } } +void RowBatch::CopyRows(RowBatch* src, int num_rows, int src_offset, int dst_offset) { + DCHECK_GT(num_rows, 0); + DCHECK_GE(num_tuples_per_row_, src->num_tuples_per_row_); + DCHECK_GE(src_offset, 0); + DCHECK_GE(dst_offset, 0); + DCHECK_GE(capacity_, num_rows + dst_offset); + DCHECK_GE(src->num_rows_, num_rows + src_offset); + bool same_layout = num_tuples_per_row_ == src->num_tuples_per_row_; + if (same_layout) { + // Fast path, single copy. 
+ TupleRow* dst_row = GetRow(dst_offset); + TupleRow* src_row = src->GetRow(src_offset); + memcpy(dst_row, src_row, num_rows * num_tuples_per_row_ * sizeof(Tuple*)); + return; + } + // Slow path, null tuples and copy prefixes. + DCHECK_GT(num_tuples_per_row_, src->num_tuples_per_row_); + for (int i = 0; i < num_rows; i++) { + TupleRow* dst_row = GetRow(dst_offset + i); + TupleRow* src_row = src->GetRow(src_offset + i); + ClearRow(dst_row); + src->CopyRow(src_row, dst_row); + } +} + } // namespace impala diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h index 3626d7dce..76c580319 100644 --- a/be/src/runtime/row-batch.h +++ b/be/src/runtime/row-batch.h @@ -292,6 +292,12 @@ class RowBatch { memcpy(dest, src, num_tuples_per_row_ * sizeof(Tuple*)); } + /// Copies tuple pointers from another row batch. It is allowed for 'src' to have + /// less tuples per row - in this case the prefix is copied and remaining tuples are + /// initialized to 0. The caller must ensure that src has enough rows / dst has enough + /// capacity. + void CopyRows(RowBatch* src, int num_rows, int src_offset, int dst_offset); + /// Copy 'num_rows' rows from 'src' to 'dest' within the batch. Useful for exec /// nodes that skip an offset and copied more than necessary. void CopyRows(int64_t dest, int64_t src, int64_t num_rows) {
