This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git

commit c44affb43f5bc4cfa6ee0256d8e41f72f402b3ff
Author: wangbo <wan...@apache.org>
AuthorDate: Mon May 27 11:42:47 2024 +0800

    Add downgrade scan thread num by column num (#35351)
---
 be/src/vec/exec/scan/scanner_context.cpp           | 22 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  | 10 ++++++++++
 gensrc/thrift/PaloInternalService.thrift           |  2 ++
 3 files changed, 34 insertions(+)

diff --git a/be/src/vec/exec/scan/scanner_context.cpp 
b/be/src/vec/exec/scan/scanner_context.cpp
index 81e4dacba57..c2de00830fc 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -80,6 +80,28 @@ ScannerContext::ScannerContext(RuntimeState* state, const 
TupleDescriptor* outpu
                                                       : 
state->query_parallel_instance_num());
     _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
     _max_thread_num = std::min(_max_thread_num, (int32_t)scanners.size());
+
+    // When the user does not specify scan_thread_num, we can try to downgrade 
_max_thread_num,
+    // because we found that in a table with 5k columns, column readers may occupy 
too much memory.
+    // See https://github.com/apache/doris/issues/35340 for details.
+    int32_t max_column_reader_num = 
state->query_options().max_column_reader_num;
+    if (_max_thread_num != 1 && max_column_reader_num > 0) {
+        int32_t scan_column_num = _output_tuple_desc->slots().size();
+        int32_t current_column_num = scan_column_num * _max_thread_num;
+        if (current_column_num > max_column_reader_num) {
+            int32_t new_max_thread_num = max_column_reader_num / 
scan_column_num;
+            new_max_thread_num = new_max_thread_num <= 0 ? 1 : 
new_max_thread_num;
+            if (new_max_thread_num < _max_thread_num) {
+                int32_t origin_max_thread_num = _max_thread_num;
+                _max_thread_num = new_max_thread_num;
+                LOG(INFO) << "downgrade query:" << print_id(state->query_id())
+                          << " scan's max_thread_num from " << 
origin_max_thread_num << " to "
+                          << _max_thread_num << ",column num: " << 
scan_column_num
+                          << ", max_column_reader_num: " << 
max_column_reader_num;
+            }
+        }
+    }
+
     // 1. Calculate max concurrency
     // For select * from table limit 10; should just use one thread.
     if ((_parent && _parent->should_run_serial()) ||
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index b17d6b3a417..d826af50a75 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -555,6 +555,8 @@ public class SessionVariable implements Serializable, 
Writable {
 
     public static final String BYPASS_WORKLOAD_GROUP = "bypass_workload_group";
 
+    public static final String MAX_COLUMN_READER_NUM = "max_column_reader_num";
+
     public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
             SKIP_DELETE_PREDICATE,
             SKIP_DELETE_BITMAP,
@@ -682,6 +684,9 @@ public class SessionVariable implements Serializable, 
Writable {
             "whether bypass workload group's limitation, currently only 
support bypass query queue"})
     public boolean bypassWorkloadGroup = false;
 
+    @VariableMgr.VarAttr(name = MAX_COLUMN_READER_NUM)
+    public int maxColumnReaderNum = 20000;
+
     @VariableMgr.VarAttr(name = RESOURCE_VARIABLE)
     public String resourceGroup = "";
 
@@ -2346,6 +2351,10 @@ public class SessionVariable implements Serializable, 
Writable {
         return this.bypassWorkloadGroup;
     }
 
+    public int getMaxColumnReaderNum() {
+        return this.maxColumnReaderNum;
+    }
+
     public String getResourceGroup() {
         return resourceGroup;
     }
@@ -3141,6 +3150,7 @@ public class SessionVariable implements Serializable, 
Writable {
         tResult.setScanQueueMemLimit(maxScanQueueMemByte);
         tResult.setNumScannerThreads(numScannerThreads);
         tResult.setScannerScaleUpRatio(scannerScaleUpRatio);
+        tResult.setMaxColumnReaderNum(maxColumnReaderNum);
 
         // TODO chenhao, reservation will be calculated by cost
         tResult.setMinReservation(0);
diff --git a/gensrc/thrift/PaloInternalService.thrift 
b/gensrc/thrift/PaloInternalService.thrift
index a9f6dbae190..dcfa72ee930 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -291,6 +291,8 @@ struct TQueryOptions {
   
   110: optional bool enable_parquet_filter_by_min_max = true
   111: optional bool enable_orc_filter_by_min_max = true
+
+  112: optional i32 max_column_reader_num = 0
   
   // For cloud, to control if the content would be written into file cache
   1000: optional bool disable_file_cache = false


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to