This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 4be5fd8896dcd445a6379bdcda4bdcf318f24511
Author: Yida Wu <[email protected]>
AuthorDate: Fri Mar 29 08:41:23 2024 -0700

    IMPALA-12960: Fix Incorrect RowsPassedThrough Metric in Streaming 
Aggregation
    
    This patch fixes a bug in the RowsPassedThrough metric within the
    query profile while using Streaming Aggregation. The issue is from
    the AddBatchStreaming() function's logic, where the number of rows
    in the output batch isn't necessarily initialized to 0, while the
    function uses num_rows() of the output batch directly to be the
    actual number of rows returned and passed through of this specific
    aggregator. This discrepancy can significantly impact the accuracy
    of the returned and passed through numbers, as well as the
    calculation of reduction rates during hash table expansion in
    Streaming Aggregation. Huge differences can be observed especially
    when using the rollup function.
    
    The solution is to calculate the actual number of rows added
    to the output batch within each round of the AddBatchStreaming()
    function.
    
    Tests:
    Passed exhaustive tests.
    Added a corresponding case in tpch-passthrough-aggregations.test.
    
    Change-Id: I59205a4b06824ee1607a25e906db1f96dc4eda9f
    Reviewed-on: http://gerrit.cloudera.org:8080/21235
    Reviewed-by: Wenzhe Zhou <[email protected]>
    Reviewed-by: Riza Suminto <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/grouping-aggregator.cc                 |  5 +++--
 .../queries/tpch-passthrough-aggregations.test     | 24 ++++++++++++++++++++++
 2 files changed, 27 insertions(+), 2 deletions(-)

diff --git a/be/src/exec/grouping-aggregator.cc 
b/be/src/exec/grouping-aggregator.cc
index 074167dc0..61ef2d948 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -505,6 +505,7 @@ Status GroupingAggregator::AddBatchStreaming(
   }
 
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
+  int64_t num_row_out_batch_old = out_batch->num_rows();
   GroupingAggregatorConfig::AddBatchStreamingImplFn fn
       = add_batch_streaming_impl_fn_.load();
   if (fn != nullptr) {
@@ -515,8 +516,8 @@ Status GroupingAggregator::AddBatchStreaming(
         child_batch, out_batch, ht_ctx_.get(), remaining_capacity));
   }
   *eos = (streaming_idx_ == 0);
-
-  num_rows_returned_ += out_batch->num_rows();
+  DCHECK_GE(out_batch->num_rows(), num_row_out_batch_old);
+  num_rows_returned_ += out_batch->num_rows() - num_row_out_batch_old;
   COUNTER_SET(num_passthrough_rows_, num_rows_returned_);
   return Status::OK();
 }
diff --git a/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test 
b/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test
index 5b9b1c5c9..8c25faf47 100644
--- a/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test
+++ b/testdata/workloads/tpch/queries/tpch-passthrough-aggregations.test
@@ -50,6 +50,30 @@ bigint,bigint,bigint,string
 row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
+# IMPALA-12960: Test the case of rollup.
+select l_orderkey, l_partkey, count(*)
+from tpch.lineitem
+group by rollup(1, 2)
+order by 3 desc, 1, 2
+limit 10;
+---- RESULTS
+NULL,NULL,6001215
+7,NULL,7
+68,NULL,7
+129,NULL,7
+164,NULL,7
+194,NULL,7
+225,NULL,7
+226,NULL,7
+322,NULL,7
+326,NULL,7
+---- TYPES
+bigint,bigint,bigint
+---- RUNTIME_PROFILE
+# Verify that at least one passthrough number should be 0 for rollup.
+row_regex: .*RowsPassedThrough: 0
+====
+---- QUERY
 # Test for preaggregation passthrough optimization: two-phase count distinct 
aggregation.
 select count(distinct p_comment)
 from part

Reply via email to