This is an automated email from the ASF dual-hosted git repository. michaelsmith pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/impala.git
commit fd0d88d8dcd0beca668af2c89c77ab1a30db79d0 Author: Riza Suminto <[email protected]> AuthorDate: Wed Jan 5 19:30:40 2022 -0800 IMPALA-11068: Add query option to reduce scanner thread launch. Under a heavy decompression workload, Impala running with scanner thread parallelism (MT_DOP=0) can still hit an OOM error due to launching too many threads too soon. We have logic in ScannerMemLimiter to limit the number of scanner threads by calculating the thread's memory requirement and estimating the memory growth rate of all threads. However, it does not prevent a scanner node from quickly launching many threads and immediately reaching the memtracker's spare capacity. Even after ScannerMemLimiter rejects a new thread launch, some existing threads might continue increasing their non-reserved memory for decompression work until the memory limit is exceeded. IMPALA-7096 added the hdfs_scanner_thread_max_estimated_bytes flag as a heuristic to account for non-reserved memory growth. Increasing this flag value can help reduce the thread count, but might severely regress other queries that do not have heavy decompression characteristics. The same is true of lowering the NUM_SCANNER_THREADS query option. This patch adds one more query option, HDFS_SCANNER_NON_RESERVED_BYTES, as an alternative way to mitigate OOM. This option is intended to offer the same control as hdfs_scanner_thread_max_estimated_bytes, but as a query option, so that tuning can be done at per-query granularity. If this query option is not set, or is set to 0 or a negative value, the backend falls back to the value of the hdfs_scanner_thread_max_estimated_bytes flag. Testing: - Add test cases in query-options-test.cc and TestScanMemLimit::test_hdfs_scanner_thread_mem_scaling. 
Change-Id: I03cadf1230eed00d69f2890c82476c6861e37466 Reviewed-on: http://gerrit.cloudera.org:8080/18126 Reviewed-by: Csaba Ringhofer <[email protected]> Tested-by: Impala Public Jenkins <[email protected]> --- be/src/exec/hdfs-scan-node.cc | 13 ++++-- be/src/exec/hdfs-scan-node.h | 2 +- be/src/service/query-options-test.cc | 1 + be/src/service/query-options.cc | 7 +++ be/src/service/query-options.h | 7 ++- common/thrift/ImpalaService.thrift | 9 ++++ common/thrift/Query.thrift | 3 ++ .../QueryTest/hdfs-scanner-thread-mem-scaling.test | 53 ++++++++++++++++++++++ 8 files changed, 89 insertions(+), 6 deletions(-) diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc index 055d1080a..0c23ecbe4 100644 --- a/be/src/exec/hdfs-scan-node.cc +++ b/be/src/exec/hdfs-scan-node.cc @@ -150,7 +150,7 @@ Status HdfsScanNode::GetNextInternal( Status HdfsScanNode::Prepare(RuntimeState* state) { SCOPED_TIMER(runtime_profile_->total_time_counter()); RETURN_IF_ERROR(HdfsScanNodeBase::Prepare(state)); - thread_state_.Prepare(this, EstimateScannerThreadMemConsumption()); + thread_state_.Prepare(this, EstimateScannerThreadMemConsumption(state)); scanner_thread_reservations_denied_counter_ = ADD_COUNTER(runtime_profile(), "NumScannerThreadReservationsDenied", TUnit::UNIT); scanner_thread_workless_loops_counter_ = @@ -210,17 +210,24 @@ Status HdfsScanNode::AddDiskIoRanges(const vector<ScanRange*>& ranges, return Status::OK(); } -int64_t HdfsScanNode::EstimateScannerThreadMemConsumption() const { +int64_t HdfsScanNode::EstimateScannerThreadMemConsumption(RuntimeState* state) const { // Start with the minimum I/O buffer requirement. int64_t est_total_bytes = resource_profile_.min_reservation; // Next add in the other memory that we estimate the scanner thread will use, // e.g. decompression buffers, tuple buffers, etc. // For compressed text, we estimate this based on the file size (since the whole file - // will need to be decompressed at once). 
For all other formats, we use a constant. + // will need to be decompressed at once). For all other formats, we use a constant from + // either of the HDFS_SCANNER_NON_RESERVED_BYTES query option or the + // hdfs_scanner_thread_max_estimated_bytes flag, with the query option taking + // precedence over the flag if the query option is set to a positive value. // Note: this is crude and we could try to refine it by factoring in the number of // columns, etc, but it is unclear how beneficial this would be. int64_t est_non_reserved_bytes = FLAGS_hdfs_scanner_thread_max_estimated_bytes; + if (state->query_options().__isset.hdfs_scanner_non_reserved_bytes + && state->query_options().hdfs_scanner_non_reserved_bytes > 0) { + est_non_reserved_bytes = state->query_options().hdfs_scanner_non_reserved_bytes; + } auto it = shared_state_->per_type_files().find(THdfsFileFormat::TEXT); if (it != shared_state_->per_type_files().end()) { for (HdfsFileDesc* file : it->second) { diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h index db6a8f663..d35c09407 100644 --- a/be/src/exec/hdfs-scan-node.h +++ b/be/src/exec/hdfs-scan-node.h @@ -155,7 +155,7 @@ class HdfsScanNode : public HdfsScanNodeBase { /// Compute the estimated memory consumption of a scanner thread in bytes for the /// purposes of deciding whether to start a new scanner thread. - int64_t EstimateScannerThreadMemConsumption() const; + int64_t EstimateScannerThreadMemConsumption(RuntimeState* state) const; /// Tries to spin up as many scanner threads as the quota allows. Called explicitly /// (e.g., when adding new ranges) or when threads are available for this scan node. 
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc index e1065789e..0e57c33e0 100644 --- a/be/src/service/query-options-test.cc +++ b/be/src/service/query-options-test.cc @@ -161,6 +161,7 @@ TEST(QueryOptions, SetByteOptions) { {MAKE_OPTIONDEF(max_spilled_result_spooling_mem), {-1, I64_MAX}}, {MAKE_OPTIONDEF(large_agg_mem_threshold), {-1, I64_MAX}}, {MAKE_OPTIONDEF(mem_limit_coordinators), {-1, I64_MAX}}, + {MAKE_OPTIONDEF(hdfs_scanner_non_reserved_bytes), {-1, I64_MAX}}, }; vector<pair<OptionDef<int32_t>, Range<int32_t>>> case_set_i32{ {MAKE_OPTIONDEF(runtime_filter_min_size), diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc index 901acddbb..9f54083a9 100644 --- a/be/src/service/query-options.cc +++ b/be/src/service/query-options.cc @@ -1144,11 +1144,18 @@ Status impala::SetQueryOption(const string& key, const string& value, MemSpec mem_spec_val{}; RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val)); query_options->__set_mem_limit_coordinators(mem_spec_val.value); + break; } case TImpalaQueryOptions::ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING: { query_options->__set_iceberg_predicate_pushdown_subsetting(IsTrue(value)); break; } + case TImpalaQueryOptions::HDFS_SCANNER_NON_RESERVED_BYTES: { + MemSpec mem_spec_val{}; + RETURN_IF_ERROR(QueryOptionParser::Parse<MemSpec>(option, value, &mem_spec_val)); + query_options->__set_hdfs_scanner_non_reserved_bytes(mem_spec_val.value); + break; + }; default: if (IsRemovedQueryOption(key)) { LOG(WARNING) << "Ignoring attempt to set removed query option '" << key << "'"; diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h index 654d5199e..5ac047b63 100644 --- a/be/src/service/query-options.h +++ b/be/src/service/query-options.h @@ -50,7 +50,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> // time we add or remove a query option to/from the enum TImpalaQueryOptions. 
#define QUERY_OPTS_TABLE \ DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(), \ - TImpalaQueryOptions::ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING + 1); \ + TImpalaQueryOptions::HDFS_SCANNER_NON_RESERVED_BYTES + 1); \ REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED) \ QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR) \ REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS) \ @@ -307,7 +307,10 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type> QUERY_OPT_FN(mem_limit_coordinators, MEM_LIMIT_COORDINATORS, \ TQueryOptionLevel::ADVANCED) \ QUERY_OPT_FN(iceberg_predicate_pushdown_subsetting, \ - ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING, TQueryOptionLevel::DEVELOPMENT); + ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING, TQueryOptionLevel::DEVELOPMENT) \ + QUERY_OPT_FN(hdfs_scanner_non_reserved_bytes, HDFS_SCANNER_NON_RESERVED_BYTES, \ + TQueryOptionLevel::ADVANCED) \ + ; /// Enforce practical limits on some query options to avoid undesired query state. static const int64_t SPILLABLE_BUFFER_LIMIT = 1LL << 40; // 1 TB diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift index 660501513..6323d2510 100644 --- a/common/thrift/ImpalaService.thrift +++ b/common/thrift/ImpalaService.thrift @@ -845,6 +845,15 @@ enum TImpalaQueryOptions { // Enables predicate subsetting for Iceberg plan nodes. If enabled, expressions // evaluated by Iceberg are not pushed down the scanner node. ICEBERG_PREDICATE_PUSHDOWN_SUBSETTING = 165; + + // Amount of memory that we approximate a scanner thread will use not including I/O + // buffers. The memory used does not vary considerably between file formats (just a + // couple of MBs). This amount of memory is not reserved by the planner and only + // considered in the old multi-threaded scanner mode (non-MT_DOP) for 2nd and + // subsequent additional scanner threads. 
If this option is not set to a positive + // value, the value of flag hdfs_scanner_thread_max_estimated_bytes will be used + // (which defaults to 32MB). The default value of this option is -1 (not set). + HDFS_SCANNER_NON_RESERVED_BYTES = 166 } // The summary of a DML statement. diff --git a/common/thrift/Query.thrift b/common/thrift/Query.thrift index edd827765..bc51123ab 100644 --- a/common/thrift/Query.thrift +++ b/common/thrift/Query.thrift @@ -663,6 +663,9 @@ struct TQueryOptions { // See comment in ImpalaService.thrift 166: optional bool iceberg_predicate_pushdown_subsetting = true; + + // See comment in ImpalaService.thrift + 167: optional i64 hdfs_scanner_non_reserved_bytes = -1 } // Impala currently has three types of sessions: Beeswax, HiveServer2 and external diff --git a/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test b/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test index 72ec20473..b55d848b2 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test +++ b/testdata/workloads/functional-query/queries/QueryTest/hdfs-scanner-thread-mem-scaling.test @@ -33,3 +33,56 @@ BIGINT,STRING ---- RUNTIME_PROFILE aggregation(SUM, NumScannerThreadsStarted): 1 ==== +---- QUERY +# IMPALA-11068: without tuning hdfs_scanner_non_reserved_bytes, this query can launch +# up to 3 threads (compressed_text_est_bytes ~ 211MB). +set num_nodes=1; +set mem_limit=750m; +select l_orderkey, l_shipmode from tpch_text_gzip.lineitem +where l_comment = 'telets. quickly '; +---- TYPES +BIGINT,STRING +---- RESULTS: VERIFY_IS_EQUAL_SORTED +49824,'RAIL' +1380737,'AIR' +2981252,'TRUCK' +3415170,'MAIL' +---- RUNTIME_PROFILE +aggregation(SUM, NumScannerThreadsStarted): 3 +==== +---- QUERY +# IMPALA-11068: raising hdfs_scanner_non_reserved_bytes above compressed_text_est_bytes +# will reduce NumScannerThreadsStarted. 
+set num_nodes=1; +set mem_limit=750m; +set hdfs_scanner_non_reserved_bytes=320m; +select l_orderkey, l_shipmode from tpch_text_gzip.lineitem +where l_comment = 'telets. quickly '; +---- TYPES +BIGINT,STRING +---- RESULTS: VERIFY_IS_EQUAL_SORTED +49824,'RAIL' +1380737,'AIR' +2981252,'TRUCK' +3415170,'MAIL' +---- RUNTIME_PROFILE +aggregation(SUM, NumScannerThreadsStarted): 2 +==== +---- QUERY +# IMPALA-11068: high hdfs_scanner_non_reserved_bytes does not impact the first scanner +# thread. +set num_nodes=1; +set mem_limit=750m; +set hdfs_scanner_non_reserved_bytes=2g; +select l_orderkey, l_shipmode from tpch_text_gzip.lineitem +where l_comment = 'telets. quickly '; +---- TYPES +BIGINT,STRING +---- RESULTS: VERIFY_IS_EQUAL_SORTED +49824,'RAIL' +1380737,'AIR' +2981252,'TRUCK' +3415170,'MAIL' +---- RUNTIME_PROFILE +aggregation(SUM, NumScannerThreadsStarted): 1 +====
