This is an automated email from the ASF dual-hosted git repository.

michaelsmith pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit fd0d88d8dcd0beca668af2c89c77ab1a30db79d0
Author: Riza Suminto <[email protected]>
AuthorDate: Wed Jan 5 19:30:40 2022 -0800

    IMPALA-11068: Add query option to reduce scanner thread launch.
    
    Under heavy decompression workload, Impala running with scanner thread
    parallelism (MT_DOP=0) can still hit OOM error due to launching too many
    threads too soon. We have logic in ScannerMemLimiter to limit the number
    of scanner threads by calculating the thread's memory requirement and
    estimating the memory growth rate of all threads. However, it does not
    prevent a scanner node from quickly launching many threads and
    immediately reaching the memtracker's spare capacity. Even after
    ScannerMemLimiter rejects a new thread launch, some existing threads
    might continue increasing their non-reserved memory for decompression
    work until the memory limit is exceeded.
    
    IMPALA-7096 adds hdfs_scanner_thread_max_estimated_bytes flag as a
    heuristic to account for non-reserved memory growth. Increasing this flag
    value can help reduce thread count, but might severely regress other
    queries that do not have heavy decompression characteristics. The same
    applies to lowering the NUM_SCANNER_THREADS query option.
    
    This patch adds one more query option as an alternative to mitigate OOM
    called HDFS_SCANNER_NON_RESERVED_BYTES. This option is intended to offer
    the same control as hdfs_scanner_thread_max_estimated_bytes, but as a
    query option so that tuning can be done at per-query granularity. If
    this query option is not set, or is set to 0 or a negative value, the
    backend falls back to the value of the
    hdfs_scanner_thread_max_estimated_bytes flag.
    
    Testing:
    - Add test case in query-options-test.cc and
      TestScanMemLimit::test_hdfs_scanner_thread_mem_scaling.
    
    Change-Id: I03cadf1230eed00d69f2890c82476c6861e37466
    Reviewed-on: http://gerrit.cloudera.org:8080/18126
    Reviewed-by: Csaba Ringhofer <[email protected]>
    Tested-by: Impala Public Jenkins <[email protected]>
---
 be/src/exec/hdfs-scan-node.cc                      | 13 ++++--
 be/src/exec/hdfs-scan-node.h                       |  2 +-
 be/src/service/query-options-test.cc               |  1 +
 be/src/service/query-options.cc                    |  7 +++
 be/src/service/query-options.h                     |  7 ++-
 common/thrift/ImpalaService.thrift                 |  9 ++++
 common/thrift/Query.thrift                         |  3 ++
 .../QueryTest/hdfs-scanner-thread-mem-scaling.test | 53 ++++++++++++++++++++++
 8 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 055d1080a..0c23ecbe4 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -150,7 +150,7 @@ Status HdfsScanNode::GetNextInternal(
 Status HdfsScanNode::Prepare(RuntimeState* state) {
   SCOPED_TIMER(runtime_profile_->total_time_counter());
   RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state));
-  thread_state_.Prepare(this, EstimateScannerThreadMemConsumption());
+  thread_state_.Prepare(this, EstimateScannerThreadMemConsumption(state));
   scanner_thread_reservations_denied_counter_ =
       ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", 
TUnit::UNIT);
   scanner_thread_workless_loops_counter_ =
@@ -210,17 +210,24 @@ Status HdfsScanNode::AddDiskIoRanges(const 
vector<ScanRange*>& ranges,
   return Status::OK();
 }
 
-int64_t HdfsScanNode::EstimateScannerThreadMemConsumption() const {
+int64_t HdfsScanNode::EstimateScannerThreadMemConsumption(RuntimeState* state) 
const {
   // Start with the minimum I/O buffer requirement.
   int64_t est_total_bytes = resource_profile_.min_reservation;
 
   // Next add in the other memory that we estimate the scanner thread will use,
   // e.g. decompression buffers, tuple buffers, etc.
   // For compressed text, we estimate this based on the file size (since the 
whole file
-  // will need to be decompressed at once). For all other formats, we use a 
constant.
+  // will need to be decompressed at once). For all other formats, we use a 
constant from
+  // either of the HDFS_SCANNER_NON_RESERVED_BYTES query option or the
+  // hdfs_scanner_thread_max_estimated_bytes flag, with the query option taking
+  // precedence over the flag if the query option is set to a positive value.
   // Note: this is crude and we could try to refine it by factoring in the 
number of
   // columns, etc, but it is unclear how beneficial this would be.
   int64_t est_non_reserved_bytes = 
FLAGS_hdfs_scanner_thread_max_estimated_bytes;
+  if (state->query_options().__isset.hdfs_scanner_non_reserved_bytes
+      && state->query_options().hdfs_scanner_non_reserved_bytes > 0) {
+    est_non_reserved_bytes = 
state->query_options().hdfs_scanner_non_reserved_bytes;
+  }
   auto it = shared_state_->per_type_files().find(THdfsFileFormat::TEXT);
   if (it != shared_state_->per_type_files().end()) {
     for (HdfsFileDesc* file : it->second) {
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index db6a8f663..d35c09407 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -155,7 +155,7 @@ class HdfsScanNode : public HdfsScanNodeBase {
 
   /// Compute the estimated memory consumption of a scanner thread in bytes 
for the
   /// purposes of deciding whether to start a new scanner thread.
-  int64_t EstimateScannerThreadMemConsumption() const;
+  int64_t EstimateScannerThreadMemConsumption(RuntimeState* state) const;
 
   /// Tries to spin up as many scanner threads as the quota allows. Called 
explicitly
   /// (e.g., when adding new ranges) or when threads are available for this 
scan node.
diff --git a/be/src/service/query-options-test.cc 
b/be/src/service/query-options-test.cc
index e1065789e..0e57c33e0 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -161,6 +161,7 @@ TEST(QueryOptions, SetByteOptions) {
       {MAKE_OPTIONDEF(max_spilled_result_spooling_mem), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(large_agg_mem_threshold), {-1, I64_MAX}},
       {MAKE_OPTIONDEF(mem_limit_coordinators), {-1, I64_MAX}},
+      {MAKE_OPTIONDEF(hdfs_scanner_non_reserved_bytes), {-1, I64_MAX}},
   };
   vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{
       {MAKE_OPTIONDEF(runtime_filter_min_size),
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 901acddbb..9f54083a9 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -1144,11 +1144,18 @@ Status impala::SetQueryOption(const string& key, const 
string& value,
         MemSpec mem_spec_val{};
         RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, 
&mem_spec_val));
         query_options->__set_mem_limit_coordinators(mem_spec_val.value);
+        break;
       }
       case TImpalaQueryOptions::ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING: {
         
query_options->__set_iceberg_predicate_pushdown_subsetting(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::HDFS_SCANNER_NON_RESERVED_BYTES: {
+        MemSpec mem_spec_val{};
+        RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, 
&mem_spec_val));
+        
query_options->__set_hdfs_scanner_non_reserved_bytes(mem_spec_val.value);
+        break;
+      };
       default:
         if (IsRemovedQueryOption(key)) {
           LOG(WARNING) << "Ignoring attempt to set removed query option '" << 
key << "'";
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index 654d5199e..5ac047b63 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -50,7 +50,7 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE                                                       
          \
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),                       
          \
-      TImpalaQueryOptions::ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING + 1);         
          \
+      TImpalaQueryOptions::HDFS_SCANNER_NON_RESERVED_BYTES + 1);               
          \
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, 
ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)     
          \
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)   
          \
@@ -307,7 +307,10 @@ typedef std::unordered_map<string, 
beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(mem_limit_coordinators, MEM_LIMIT_COORDINATORS,                 
          \
       TQueryOptionLevel::ADVANCED)                                             
          \
   QUERY_OPT_FN(iceberg_predicate_pushdown_subsetting,                          
          \
-      ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING, TQueryOptionLevel::DEVELOPMENT);
+      ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING, TQueryOptionLevel::DEVELOPMENT)   
          \
+  QUERY_OPT_FN(hdfs_scanner_non_reserved_bytes, 
HDFS_SCANNER_NON_RESERVED_BYTES,         \
+      TQueryOptionLevel::ADVANCED)                                             
          \
+  ;
 
 /// Enforce practical limits on some query options to avoid undesired query 
state.
 static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB
diff --git a/common/thrift/ImpalaService.thrift 
b/common/thrift/ImpalaService.thrift
index 660501513..6323d2510 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -845,6 +845,15 @@ enum TImpalaQueryOptions {
   // Enables predicate subsetting for Iceberg plan nodes. If enabled, 
expressions
   // evaluated by Iceberg are not pushed down the scanner node.
   ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING = 165;
+
+  // Amount of memory that we approximate a scanner thread will use not 
including I/O
+  // buffers. The memory used does not vary considerably between file formats 
(just a
+  // couple of MBs). This amount of memory is not reserved by the planner and 
only
+  // considered in the old multi-threaded scanner mode (non-MT_DOP) for 2nd and
+  // subsequent additional scanner threads. If this option is not set to a 
positive
+  // value, the value of flag hdfs_scanner_thread_max_estimated_bytes will be 
used
+  // (which defaults to 32MB). The default value of this option is -1 (not 
set).
+  HDFS_SCANNER_NON_RESERVED_BYTES = 166
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift
index edd827765..bc51123ab 100644
--- a/common/thrift/Query.thrift
+++ b/common/thrift/Query.thrift
@@ -663,6 +663,9 @@ struct TQueryOptions {
 
   // See comment in ImpalaService.thrift
   166: optional bool iceberg_predicate_pushdown_subsetting = true;
+
+  // See comment in ImpalaService.thrift
+  167: optional i64 hdfs_scanner_non_reserved_bytes = -1
 }
 
 // Impala currently has three types of sessions: Beeswax, HiveServer2 and 
external
diff --git 
a/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
 
b/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
index 72ec20473..b55d848b2 100644
--- 
a/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
+++ 
b/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test
@@ -33,3 +33,56 @@ BIGINT,STRING
 ---- RUNTIME_PROFILE
 aggregation(SUM, NumScannerThreadsStarted): 1
 ====
+---- QUERY
+# IMPALA-11068: without tuning hdfs_scanner_non_reserved_bytes, this query can 
launch
+# up to 3 threads (compressed_text_est_bytes ~ 211MB).
+set num_nodes=1;
+set mem_limit=750m;
+select l_orderkey, l_shipmode from tpch_text_gzip.lineitem
+where l_comment = 'telets. quickly ';
+---- TYPES
+BIGINT,STRING
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+49824,'RAIL'
+1380737,'AIR'
+2981252,'TRUCK'
+3415170,'MAIL'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumScannerThreadsStarted): 3
+====
+---- QUERY
+# IMPALA-11068: raising hdfs_scanner_non_reserved_bytes above 
compressed_text_est_bytes
+# will reduce NumScannerThreadsStarted.
+set num_nodes=1;
+set mem_limit=750m;
+set hdfs_scanner_non_reserved_bytes=320m;
+select l_orderkey, l_shipmode from tpch_text_gzip.lineitem
+where l_comment = 'telets. quickly ';
+---- TYPES
+BIGINT,STRING
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+49824,'RAIL'
+1380737,'AIR'
+2981252,'TRUCK'
+3415170,'MAIL'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumScannerThreadsStarted): 2
+====
+---- QUERY
+# IMPALA-11068: high hdfs_scanner_non_reserved_bytes does not impact the first 
scanner
+# thread.
+set num_nodes=1;
+set mem_limit=750m;
+set hdfs_scanner_non_reserved_bytes=2g;
+select l_orderkey, l_shipmode from tpch_text_gzip.lineitem
+where l_comment = 'telets. quickly ';
+---- TYPES
+BIGINT,STRING
+---- RESULTS: VERIFY_IS_EQUAL_SORTED
+49824,'RAIL'
+1380737,'AIR'
+2981252,'TRUCK'
+3415170,'MAIL'
+---- RUNTIME_PROFILE
+aggregation(SUM, NumScannerThreadsStarted): 1
+====

Reply via email to