This is an automated email from the ASF dual-hosted git repository.

joemcdonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


The following commit(s) were added to refs/heads/master by this push:
     new 88e0e4e8b IMPALA-13334: Fix test_sort.py DCHECK hit when 
max_sort_run_size>0
88e0e4e8b is described below

commit 88e0e4e8baa97f7fded12230b14232dc85cf6d79
Author: Noemi Pap-Takacs <[email protected]>
AuthorDate: Mon Sep 9 16:18:56 2024 +0200

    IMPALA-13334: Fix test_sort.py DCHECK hit when max_sort_run_size>0
    
    test_sort.py declared 'max_sort_run_size' query option, but it
    silently did not exercise it. Fixing the query option declaration
    in IMPALA-13349 using helper function add_exec_option_dimension()
    revealed a DCHECK failure in sorter.cc. In some cases the length
    of an in-memory run could exceed 'max_sort_run_size' by 1 page.
    
    This patch fixed the DCHECK failure by strictly enforcing the
    'max_sort_run_size' limit.
    Memory limits were also adjusted in test_sort.py according to
    the memory usage of the different sort run sizes.
    
    Additionally, the 'MAX_SORT_RUN_SIZE' query option's valid range was
    relaxed. Instead of throwing an error, negative values also disable
    the run size limitation, just as the default: '0'.
    
    Testing:
    - E2E tests in sort.py
    - set test
    
    Change-Id: I943d8edcc87df168448a174d6c9c6b46fe960eae
    Reviewed-on: http://gerrit.cloudera.org:8080/21777
    Reviewed-by: Impala Public Jenkins <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/runtime/sorter-internal.h                   |  6 +++
 be/src/runtime/sorter.cc                           | 28 ++++++++---
 be/src/service/query-options-test.cc               | 10 ++++
 be/src/service/query-options.cc                    |  6 +--
 common/thrift/ImpalaService.thrift                 | 14 +++---
 .../functional-query/queries/QueryTest/set.test    |  5 ++
 .../sort-reservation-usage-single-node.test        |  4 +-
 tests/query_test/test_sort.py                      | 56 ++++++++++++++--------
 8 files changed, 91 insertions(+), 38 deletions(-)

diff --git a/be/src/runtime/sorter-internal.h b/be/src/runtime/sorter-internal.h
index dba50f109..f746b5bf7 100644
--- a/be/src/runtime/sorter-internal.h
+++ b/be/src/runtime/sorter-internal.h
@@ -335,6 +335,12 @@ class Sorter::Run {
   bool ConvertOffsetsForCollectionChildren(const CollectionValue& cv,
       const SlotDescriptor& slot_desc) WARN_UNUSED_RESULT;
 
+  /// Only initial in-memory runs' size can be limited by the 
'MAX_SORT_RUN_SIZE' query
+  /// option. Returns true if the initial in-memory run reached its maximum 
capacity in
+  /// pages (fixed-len + var-len pages).
+  template <bool INITIAL_RUN>
+  bool IR_ALWAYS_INLINE MaxSortRunSizeReached();
+
   int NumOpenPages(const vector<Page>& pages);
 
   /// Close all open pages and clear vector.
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index 07aecc66a..dee4c749d 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -125,8 +125,8 @@ Sorter::Run::Run(Sorter* parent, TupleDescriptor* 
sort_tuple_desc, bool initial_
     max_num_of_pages_(initial_run ? parent->inmem_run_max_pages_ : 0) {}
 
 Status Sorter::Run::Init() {
-  int num_to_create = 1 + has_var_len_slots_ + (has_var_len_slots_ && 
initial_run_ &&
-      (sorter_->enable_spilling_ && max_num_of_pages_ == 0));
+  int num_to_create = 1 + has_var_len_slots_
+      + (has_var_len_slots_ && initial_run_ && sorter_->enable_spilling_);
   int64_t required_mem = num_to_create * sorter_->page_len_;
   if (!sorter_->buffer_pool_client_->IncreaseReservationToFit(required_mem)) {
     return Status(Substitute(
@@ -243,8 +243,14 @@ Status Sorter::Run::AddBatchInternal(
         DCHECK_GT(var_len_pages_.size(), 0);
         Page* cur_var_len_page = &var_len_pages_.back();
         if (cur_var_len_page->BytesRemaining() < total_var_len) {
-          bool added;
-          RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
+          bool added = false;
+          if (MaxSortRunSizeReached<INITIAL_RUN>()) {
+            cur_fixed_len_page->FreeBytes(sort_tuple_size_);
+            *allocation_failed = false;
+            return Status::OK();
+          } else {
+            RETURN_IF_ERROR(TryAddPage(add_mode, &var_len_pages_, &added));
+          }
           if (added) {
             cur_var_len_page = &var_len_pages_.back();
           } else {
@@ -272,7 +278,7 @@ Status Sorter::Run::AddBatchInternal(
 
     // If there are still rows left to process, get a new page for the 
fixed-length
     // tuples. If the run is already too long, return.
-    if (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() >= 
max_num_of_pages_){
+    if (MaxSortRunSizeReached<INITIAL_RUN>()) {
       *allocation_failed = false;
       return Status::OK();
     }
@@ -290,6 +296,13 @@ Status Sorter::Run::AddBatchInternal(
   return Status::OK();
 }
 
+template <bool INITIAL_RUN>
+bool Sorter::Run::MaxSortRunSizeReached() {
+  DCHECK_EQ(INITIAL_RUN, initial_run_);
+  DCHECK(!INITIAL_RUN || max_num_of_pages_ == 0 || run_size() <= 
max_num_of_pages_);
+  return (INITIAL_RUN && max_num_of_pages_ > 0 && run_size() == 
max_num_of_pages_);
+}
+
 bool IsValidStructInSortingTuple(const ColumnType& struct_type) {
   DCHECK(struct_type.IsStructType());
   for (const ColumnType& child_type : struct_type.children) {
@@ -1016,8 +1029,7 @@ Sorter::Sorter(const TupleRowComparatorConfig& 
tuple_row_comparator_config,
     in_mem_sort_timer_(nullptr),
     in_mem_merge_timer_(nullptr),
     sorted_data_size_(nullptr),
-    run_sizes_(nullptr),
-    inmem_run_max_pages_(state->query_options().max_sort_run_size) {
+    run_sizes_(nullptr) {
   switch (tuple_row_comparator_config.sorting_order_) {
     case TSortingOrder::LEXICAL:
       compare_less_than_.reset(
@@ -1029,6 +1041,8 @@ Sorter::Sorter(const TupleRowComparatorConfig& 
tuple_row_comparator_config,
     default:
       DCHECK(false);
   }
+  int max_sort_run_size = state->query_options().max_sort_run_size;
+  inmem_run_max_pages_ = max_sort_run_size >= 2 ? max_sort_run_size : 0;
   if (estimated_input_size > 0) ComputeSpillEstimate(estimated_input_size);
 }
 
diff --git a/be/src/service/query-options-test.cc 
b/be/src/service/query-options-test.cc
index ca2072a45..3be966ed8 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -446,6 +446,16 @@ TEST(QueryOptions, SetSpecialOptions) {
     TestError("-0.1");
     TestError("Not a number!");
   }
+  // MAX_SORT_RUN_SIZE should not be 1.
+  {
+    OptionDef<int32_t> key_def = MAKE_OPTIONDEF(max_sort_run_size);
+    auto TestOk = MakeTestOkFn(options, key_def);
+    auto TestError = MakeTestErrFn(options, key_def);
+    TestOk("-1", -1);
+    TestOk("0", 0);
+    TestError("1");
+    TestOk("2", 2);
+  }
 }
 
 void VerifyFilterTypes(const set<TRuntimeFilterType::type>& types,
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index beb104893..72b4f39c3 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1168,10 +1168,8 @@ Status impala::SetQueryOption(TImpalaQueryOptions::type 
option, const string& va
       }
       case TImpalaQueryOptions::MAX_SORT_RUN_SIZE: {
         int32_t int32_t_val = 0;
-        RETURN_IF_ERROR(QueryOptionParser::ParseAndCheckNonNegative<int32_t>(
-            option, value, &int32_t_val));
-        RETURN_IF_ERROR(
-            QueryOptionValidator<int32_t>::NotEquals(option, int32_t_val, 1));
+        RETURN_IF_ERROR(QueryOptionParser::Parse<int32_t>(option, value, 
&int32_t_val));
+        RETURN_IF_ERROR(QueryOptionValidator<int32_t>::NotEquals(option, 
int32_t_val, 1));
         query_options->__set_max_sort_run_size(int32_t_val);
         break;
       }
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 5c70d1019..ca7b7f25b 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -797,13 +797,13 @@ enum TImpalaQueryOptions {
   // Valid values are in [1, 128]. Default to 128.
   MAX_FRAGMENT_INSTANCES_PER_NODE = 156
 
-  // Configures the in-memory sort algorithm used in the sorter. Determines the
-  // maximum number of pages in an initial in-memory run (fixed + variable 
length).
-  // 0 means unlimited, which will create 1 big run with no in-memory merge 
phase.
-  // Setting any other other value can create multiple miniruns which leads to 
an
-  // in-memory merge phase. The minimum value in that case is 2.
-  // Generally, with larger workloads the recommended value is 10 or more to 
avoid
-  // high fragmentation of variable length data.
+  // Configures the in-memory sort algorithm used in the sorter. Determines 
the maximum
+  // number of pages in an initial in-memory run (fixed + variable length).
+  // Maximizing the sort run size can help mitigate back-pressure in the 
sorter. It
+  // creates multiple miniruns and merges them in-memory. The run size must be 
at least 2,
+  // but 10 or more are recommended to avoid high fragmentation of variable 
length data.
+  // Setting 0 or a negative value disables the run size limitation.
+  // Defaults to 0 (disabled).
   MAX_SORT_RUN_SIZE = 157;
 
   // Allowing implicit casts with loss of precision, adds the capability to use
diff --git a/testdata/workloads/functional-query/queries/QueryTest/set.test 
b/testdata/workloads/functional-query/queries/QueryTest/set.test
index b042bd937..68d6941af 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/set.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/set.test
@@ -305,3 +305,8 @@ set PARQUET_LATE_MATERIALIZATION_THRESHOLD=0;
 ---- CATCH
 Invalid value for query option PARQUET_LATE_MATERIALIZATION_THRESHOLD: Value 
can't be 0
 ====
+---- QUERY
+set MAX_SORT_RUN_SIZE=1;
+---- CATCH
+Invalid value for query option MAX_SORT_RUN_SIZE: Value can't be 1, actual 
value: 1
+====
\ No newline at end of file
diff --git 
a/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test 
b/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
index 167278eaa..19100d7ee 100644
--- a/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
+++ b/testdata/workloads/tpch/queries/sort-reservation-usage-single-node.test
@@ -5,8 +5,10 @@
 # does not give up memory to the second sort. Scans the text formatted file so 
that
 # the scan uses less reservation.
 # num_nodes = 1 is set for this file by the python test.
+set mt_dop=1;
+set max_scan_range_length=2MB;
 set scratch_limit=0;
-set buffer_pool_limit=35m;
+set buffer_pool_limit=$BUFFER_POOL_LIMIT;
 set default_spillable_buffer_size=64kb;
 SELECT *
 FROM   (SELECT
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index 000329d1b..b6df5087c 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -22,13 +22,20 @@ from copy import copy
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.skip import SkipIfNotHdfsMinicluster
 from tests.common.test_dimensions import (
-    # TODO: uncomment once IMPALA-13334 resolved.
-    # add_exec_option_dimension,
+    add_exec_option_dimension,
     create_exec_option_dimension)
 
-# Run sizes (number of pages per run) in sorter
-# TODO: uncomment once IMPALA-13334 resolved.
-# MAX_SORT_RUN_SIZE = [0, 2, 20]
+"""Run sizes (number of pages per run) in sorter.
+Values:
+  0: there is no limit on the size of an in-memory run. The sorter will 
allocate memory
+    to fit the data until it encounters some memory limit.
+  2: an in-memory run can be at most 2 pages. This is the smallest possible 
size of a run:
+    at least 1 page for fix-len data and 1 page for var-len data.
+  20: an in-memory run can be at most 20 pages.
+Too small in-memory runs with var-len data can cause memory fragmentation, 
therefore
+different memory or spilling limits are needed to trigger the same scenarios 
in some test
+cases."""
+MAX_SORT_RUN_SIZE = [0, 2, 20]
 
 
 def split_result_rows(result):
@@ -65,8 +72,7 @@ class TestQueryFullSort(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestQueryFullSort, cls).add_test_dimensions()
     
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
-    # TODO: uncomment once IMPALA-13334 resolved.
-    # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
 
     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -104,9 +110,11 @@ class TestQueryFullSort(ImpalaTestSuite):
        a query is, the more sort runs are likely to be produced and spilled.
        Case 1 : 0 SpilledRuns, because all rows fit within the maximum 
reservation.
                 sort_run_bytes_limit is not enforced.
-       Case 2 : 4 SpilledRuns, because sort node estimate that spill is 
inevitable.
+       Case 2 : 4 or 5 SpilledRuns, because sort node estimates that spill is 
inevitable.
                 So all runs are capped to 130m, including the first one."""
-    options = [('2g', '100m', '0'), ('400m', '130m', '4')]
+    # max_sort_run_size > 0 will spill more in Case 2.
+    options = [('2g', '100m', '0'),
+           ('400m', '130m', ('5' if exec_option['max_sort_run_size'] > 0 else 
'4'))]
     for (mem_limit, sort_run_bytes_limit, spilled_runs) in options:
       exec_option['mem_limit'] = mem_limit
       exec_option['sort_run_bytes_limit'] = sort_run_bytes_limit
@@ -156,7 +164,12 @@ class TestQueryFullSort(ImpalaTestSuite):
 
     exec_option = copy(vector.get_value('exec_option'))
     exec_option['disable_outermost_topn'] = 1
-    exec_option['mem_limit'] = "134m"
+    # With max_sort_run_size=2 (2 pages per run) the varlen data is more 
fragmented and
+    # requires a higher limit to maintain "TotalMergesPerformed: 1" assertion.
+    if exec_option['max_sort_run_size'] == 2:
+      exec_option['mem_limit'] = "144m"
+    else:
+      exec_option['mem_limit'] = "134m"
     table_format = vector.get_value('table_format')
 
     query_result = self.execute_query(query, exec_option, 
table_format=table_format)
@@ -228,8 +241,15 @@ class TestQueryFullSort(ImpalaTestSuite):
   @SkipIfNotHdfsMinicluster.tuned_for_minicluster
   def test_sort_reservation_usage(self, vector):
     """Tests for sorter reservation usage.
-    Run with num_nodes=1 to make execution more deterministic."""
-    self.run_test_case('sort-reservation-usage-single-node', vector)
+    If max_sort_run_size > 0, the larger the run size, the sooner the sorter 
can give up
+    memory to the next node."""
+    if vector.get_value('exec_option')['max_sort_run_size'] == 2:
+      # Increase buffer_limit to maintain such that query never spill.
+      buffer_pool_limit = '27m'
+    else:
+      buffer_pool_limit = '14m'
+    self.run_test_case('sort-reservation-usage-single-node', vector,
+                       test_file_vars={'$BUFFER_POOL_LIMIT': 
buffer_pool_limit})
 
 
 class TestRandomSort(ImpalaTestSuite):
@@ -240,8 +260,7 @@ class TestRandomSort(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestRandomSort, cls).add_test_dimensions()
-    # TODO: uncomment once IMPALA-13334 resolved.
-    # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
 
     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -296,8 +315,7 @@ class TestPartialSort(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestPartialSort, cls).add_test_dimensions()
-    # TODO: uncomment once IMPALA-13334 resolved.
-    # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
 
     if cls.exploration_strategy() == 'core':
       cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -354,8 +372,7 @@ class TestArraySort(ImpalaTestSuite):
   def add_test_dimensions(cls):
     super(TestArraySort, cls).add_test_dimensions()
     
cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(cluster_sizes=[1]))
-    # TODO: uncomment once IMPALA-13334 resolved.
-    # add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
+    add_exec_option_dimension(cls, 'max_sort_run_size', MAX_SORT_RUN_SIZE)
 
     # The table we use is a parquet table.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
@@ -384,7 +401,8 @@ class TestArraySort(ImpalaTestSuite):
     table_format = vector.get_value('table_format')
 
     query_result = self.execute_query(query, exec_option, 
table_format=table_format)
-    assert "SpilledRuns: 3" in query_result.runtime_profile
+    # Check that spilling was successful.
+    assert re.search(r'\s+\- SpilledRuns: [1-9]', query_result.runtime_profile)
 
     # Split result rows (strings) into columns.
     result = split_result_rows(query_result.data)

Reply via email to