This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit c44affb43f5bc4cfa6ee0256d8e41f72f402b3ff Author: wangbo <wan...@apache.org> AuthorDate: Mon May 27 11:42:47 2024 +0800 Add downgrade scan thread num by column num (#35351) --- be/src/vec/exec/scan/scanner_context.cpp | 22 ++++++++++++++++++++++ .../java/org/apache/doris/qe/SessionVariable.java | 10 ++++++++++ gensrc/thrift/PaloInternalService.thrift | 2 ++ 3 files changed, 34 insertions(+) diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp index 81e4dacba57..c2de00830fc 100644 --- a/be/src/vec/exec/scan/scanner_context.cpp +++ b/be/src/vec/exec/scan/scanner_context.cpp @@ -80,6 +80,28 @@ ScannerContext::ScannerContext(RuntimeState* state, const TupleDescriptor* outpu : state->query_parallel_instance_num()); _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num; _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size()); + + // When the user does not specify scan_thread_num, we can try to downgrade _max_thread_num, + // because we found that in a table with 5k columns, column readers may occupy too much memory. + // Refer to https://github.com/apache/doris/issues/35340 for details. + int32_t max_column_reader_num = state->query_options().max_column_reader_num; + if (_max_thread_num != 1 && max_column_reader_num > 0) { + int32_t scan_column_num = _output_tuple_desc->slots().size(); + int32_t current_column_num = scan_column_num * _max_thread_num; + if (current_column_num > max_column_reader_num) { + int32_t new_max_thread_num = max_column_reader_num / scan_column_num; + new_max_thread_num = new_max_thread_num <= 0 ? 
1 : new_max_thread_num; + if (new_max_thread_num < _max_thread_num) { + int32_t origin_max_thread_num = _max_thread_num; + _max_thread_num = new_max_thread_num; + LOG(INFO) << "downgrade query:" << print_id(state->query_id()) + << " scan's max_thread_num from " << origin_max_thread_num << " to " + << _max_thread_num << ",column num: " << scan_column_num + << ", max_column_reader_num: " << max_column_reader_num; + } + } + } + // 1. Calculate max concurrency // For select * from table limit 10; should just use one thread. if ((_parent && _parent->should_run_serial()) || diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index b17d6b3a417..d826af50a75 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -555,6 +555,8 @@ public class SessionVariable implements Serializable, Writable { public static final String BYPASS_WORKLOAD_GROUP = "bypass_workload_group"; + public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num"; + public static final List<String> DEBUG_VARIABLES = ImmutableList.of( SKIP_DELETE_PREDICATE, SKIP_DELETE_BITMAP, @@ -682,6 +684,9 @@ public class SessionVariable implements Serializable, Writable { "whether bypass workload group's limitation, currently only support bypass query queue"}) public boolean bypassWorkloadGroup = false; + @VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM) + public int maxColumnReaderNum = 20000; + @VariableMgr.VarAttr(name = RESOURCE_VARIABLE) public String resourceGroup = ""; @@ -2346,6 +2351,10 @@ public class SessionVariable implements Serializable, Writable { return this.bypassWorkloadGroup; } + public int getMaxColumnReaderNum() { + return this.maxColumnReaderNum; + } + public String getResourceGroup() { return resourceGroup; } @@ -3141,6 +3150,7 @@ public class SessionVariable implements Serializable, Writable 
{ tResult.setScanQueueMemLimit(maxScanQueueMemByte); tResult.setNumScannerThreads(numScannerThreads); tResult.setScannerScaleUpRatio(scannerScaleUpRatio); + tResult.setMaxColumnReaderNum(maxColumnReaderNum); // TODO chenhao, reservation will be calculated by cost tResult.setMinReservation(0); diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index a9f6dbae190..dcfa72ee930 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -291,6 +291,8 @@ struct TQueryOptions { 110: optional bool enable_parquet_filter_by_min_max = true 111: optional bool enable_orc_filter_by_min_max = true + + 112: optional i32 max_column_reader_num = 0 // For cloud, to control if the content would be written into file cache 1000: optional bool disable_file_cache = false --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org