This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 60dd322aba [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)
60dd322aba is described below

commit 60dd322abade619e0adaaf487dd01295ac64198b
Author: Ashin Gau <ashin...@users.noreply.github.com>
AuthorDate: Mon Jul 18 20:50:34 2022 +0800

    [feature-wip](multi-catalog) Optimize threads and thrift interface of FileScanNode (#10942)

    FileScanNode in BE launches as many threads as there are splits.
    The thrift interface of FileScanNode is excessively redundant.
---
 be/src/vec/exec/file_arrow_scanner.cpp             |  2 +-
 be/src/vec/exec/file_scan_node.cpp                 | 65 +++++++++++++++++++---
 be/src/vec/exec/file_scan_node.h                   |  1 -
 be/src/vec/exec/file_text_scanner.cpp              |  8 +--
 .../planner/external/ExternalFileScanNode.java     | 16 +++---
 gensrc/thrift/PlanNodes.thrift                     | 23 ++++----
 6 files changed, 82 insertions(+), 33 deletions(-)

diff --git a/be/src/vec/exec/file_arrow_scanner.cpp b/be/src/vec/exec/file_arrow_scanner.cpp
index 59a94399db..79c6037c36 100644
--- a/be/src/vec/exec/file_arrow_scanner.cpp
+++ b/be/src/vec/exec/file_arrow_scanner.cpp
@@ -55,7 +55,7 @@ Status FileArrowScanner::_open_next_reader() {
         const TFileRangeDesc& range = _ranges[_next_range++];
         std::unique_ptr<FileReader> file_reader;
         FileReader* hdfs_reader = nullptr;
-        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+        RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
                                                         range.start_offset, &hdfs_reader));
         file_reader.reset(new BufferedReader(_profile, hdfs_reader));
         RETURN_IF_ERROR(file_reader->open());
diff --git a/be/src/vec/exec/file_scan_node.cpp b/be/src/vec/exec/file_scan_node.cpp
index ff4989a033..ad79cc0536 100644
--- a/be/src/vec/exec/file_scan_node.cpp
+++ b/be/src/vec/exec/file_scan_node.cpp
@@ -17,12 +17,14 @@

 #include "vec/exec/file_scan_node.h"

+#include "common/config.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
 #include "runtime/tuple.h"
 #include "runtime/tuple_row.h"
+#include "util/priority_thread_pool.hpp"
 #include "util/runtime_profile.h"
 #include "util/thread.h"
 #include "util/types.h"
@@ -100,9 +102,37 @@ Status FileScanNode::start_scanners() {
     }

     _scanners_status.resize(_scan_ranges.size());
-    for (int i = 0; i < _scan_ranges.size(); i++) {
-        _scanner_threads.emplace_back(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
-                                      std::ref(_scanners_status[i]));
+    ThreadPoolToken* thread_token = _runtime_state->get_query_fragments_ctx()->get_token();
+    PriorityThreadPool* thread_pool = _runtime_state->exec_env()->scan_thread_pool();
+    for (int i = 0; i < _scan_ranges.size(); ++i) {
+        Status submit_status = Status::OK();
+        if (thread_token != nullptr) {
+            submit_status = thread_token->submit_func(std::bind(&FileScanNode::scanner_worker, this,
+                                                                i, _scan_ranges.size(),
+                                                                std::ref(_scanners_status[i])));
+        } else {
+            PriorityThreadPool::WorkFunction task =
+                    std::bind(&FileScanNode::scanner_worker, this, i, _scan_ranges.size(),
+                              std::ref(_scanners_status[i]));
+            if (!thread_pool->offer(task)) {
+                submit_status = Status::Cancelled("Failed to submit scan task");
+            }
+        }
+        if (!submit_status.ok()) {
+            LOG(WARNING) << "Failed to assign file scanner task to thread pool! "
+                         << submit_status.get_error_msg();
+            _scanners_status[i].set_value(submit_status);
+            for (int j = i + 1; j < _scan_ranges.size(); ++j) {
+                _scanners_status[j].set_value(Status::Cancelled("Cancelled"));
+            }
+            {
+                std::lock_guard<std::mutex> l(_batch_queue_lock);
+                update_status(submit_status);
+                _num_running_scanners -= _scan_ranges.size() - i;
+            }
+            _queue_writer_cond.notify_all();
+            break;
+        }
     }
     return Status::OK();
 }
@@ -205,8 +235,9 @@ Status FileScanNode::close(RuntimeState* state) {
     _scan_finished.store(true);
     _queue_writer_cond.notify_all();
     _queue_reader_cond.notify_all();
-    for (int i = 0; i < _scanner_threads.size(); ++i) {
-        _scanner_threads[i].join();
+    {
+        std::unique_lock<std::mutex> l(_batch_queue_lock);
+        _queue_reader_cond.wait(l, [this] { return _num_running_scanners == 0; });
     }
     for (int i = 0; i < _scanners_status.size(); i++) {
         std::future<Status> f = _scanners_status[i].get_future();
@@ -308,7 +339,7 @@ void FileScanNode::scanner_worker(int start_idx, int length, std::promise<Status
 std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange& scan_range,
                                                           ScannerCounter* counter) {
     FileScanner* scan = nullptr;
-    switch (scan_range.ranges[0].format_type) {
+    switch (scan_range.params.format_type) {
     case TFileFormatType::FORMAT_PARQUET:
         scan = new VFileParquetScanner(_runtime_state, runtime_profile(), scan_range.params,
                                        scan_range.ranges, _pre_filter_texprs, counter);
@@ -329,7 +360,27 @@ std::unique_ptr<FileScanner> FileScanNode::create_scanner(const TFileScanRange&

 // This function is called after plan node has been prepared.
 Status FileScanNode::set_scan_ranges(const std::vector<TScanRangeParams>& scan_ranges) {
-    _scan_ranges = scan_ranges;
+    int max_scanners = config::doris_scanner_thread_pool_thread_num;
+    if (scan_ranges.size() <= max_scanners) {
+        _scan_ranges = scan_ranges;
+    } else {
+        // There is no need for the number of scanners to exceed the number of threads in thread pool.
+        _scan_ranges.clear();
+        auto range_iter = scan_ranges.begin();
+        for (int i = 0; i < max_scanners && range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            _scan_ranges.push_back(*range_iter);
+        }
+        for (int i = 0; range_iter != scan_ranges.end(); ++i, ++range_iter) {
+            if (i == max_scanners) {
+                i = 0;
+            }
+            auto& ranges = _scan_ranges[i].scan_range.ext_scan_range.file_scan_range.ranges;
+            auto& merged_ranges = range_iter->scan_range.ext_scan_range.file_scan_range.ranges;
+            ranges.insert(ranges.end(), merged_ranges.begin(), merged_ranges.end());
+        }
+        _scan_ranges.shrink_to_fit();
+        LOG(INFO) << "Merge " << scan_ranges.size() << " scan ranges to " << _scan_ranges.size();
+    }
     return Status::OK();
 }
diff --git a/be/src/vec/exec/file_scan_node.h b/be/src/vec/exec/file_scan_node.h
index 2d7deb9626..93a8916c0e 100644
--- a/be/src/vec/exec/file_scan_node.h
+++ b/be/src/vec/exec/file_scan_node.h
@@ -105,7 +105,6 @@ private:

     Status _process_status;

-    std::vector<std::thread> _scanner_threads;
     std::vector<std::promise<Status>> _scanners_status;

     int _max_buffered_batches;
diff --git a/be/src/vec/exec/file_text_scanner.cpp b/be/src/vec/exec/file_text_scanner.cpp
index d541768a2f..883cbd5040 100644
--- a/be/src/vec/exec/file_text_scanner.cpp
+++ b/be/src/vec/exec/file_text_scanner.cpp
@@ -156,7 +156,7 @@ Status FileTextScanner::_open_file_reader() {
     const TFileRangeDesc& range = _ranges[_next_range];

     FileReader* hdfs_reader = nullptr;
-    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(range.hdfs_params, range.path,
+    RETURN_IF_ERROR(HdfsReaderWriter::create_reader(_params.hdfs_params, range.path,
                                                     range.start_offset, &hdfs_reader));
     _cur_file_reader.reset(new BufferedReader(_profile, hdfs_reader));
     return _cur_file_reader->open();
@@ -171,7 +171,7 @@ Status FileTextScanner::_open_line_reader() {
     const TFileRangeDesc& range = _ranges[_next_range];
     int64_t size = range.size;
     if (range.start_offset != 0) {
-        if (range.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
+        if (_params.format_type != TFileFormatType::FORMAT_CSV_PLAIN) {
             std::stringstream ss;
             ss << "For now we do not support split compressed file";
             return Status::InternalError(ss.str());
@@ -182,14 +182,14 @@ Status FileTextScanner::_open_line_reader() {
     }

     // open line reader
-    switch (range.format_type) {
+    switch (_params.format_type) {
     case TFileFormatType::FORMAT_CSV_PLAIN:
         _cur_line_reader = new PlainTextLineReader(_profile, _cur_file_reader.get(), nullptr, size,
                                                    _line_delimiter, _line_delimiter_length);
         break;
     default: {
         std::stringstream ss;
-        ss << "Unknown format type, cannot init line reader, type=" << range.format_type;
+        ss << "Unknown format type, cannot init line reader, type=" << _params.format_type;
         return Status::InternalError(ss.str());
     }
     }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
index 34c224de82..00051837ad 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/external/ExternalFileScanNode.java
@@ -285,6 +285,14 @@ public class ExternalFileScanNode extends ExternalScanNode {
         String fullPath = ((FileSplit) inputSplits[0]).getPath().toUri().toString();
         String filePath = ((FileSplit) inputSplits[0]).getPath().toUri().getPath();
         String fsName = fullPath.replace(filePath, "");
+        context.params.setFileType(scanProvider.getTableFileType());
+        context.params.setFormatType(scanProvider.getTableFormatType());
+        // set hdfs params for hdfs file type.
+        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
+            THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
+            tHdfsParams.setFsName(fsName);
+            context.params.setHdfsParams(tHdfsParams);
+        }

         TScanRangeLocations curLocations = newLocations(context.params);

@@ -298,7 +306,6 @@ public class ExternalFileScanNode extends ExternalScanNode {
                     partitionKeys);

             TFileRangeDesc rangeDesc = createFileRangeDesc(fileSplit, partitionValuesFromPath);
-            rangeDesc.getHdfsParams().setFsName(fsName);

             curLocations.getScanRange().getExtScanRange().getFileScanRange().addToRanges(rangeDesc);

             Log.debug("Assign to backend " + curLocations.getLocations().get(0).getBackendId()
@@ -346,17 +353,10 @@ public class ExternalFileScanNode extends ExternalScanNode {
             FileSplit fileSplit, List<String> columnsFromPath) throws DdlException, MetaNotFoundException {
         TFileRangeDesc rangeDesc = new TFileRangeDesc();
-        rangeDesc.setFileType(scanProvider.getTableFileType());
-        rangeDesc.setFormatType(scanProvider.getTableFormatType());
         rangeDesc.setPath(fileSplit.getPath().toUri().getPath());
         rangeDesc.setStartOffset(fileSplit.getStart());
         rangeDesc.setSize(fileSplit.getLength());
         rangeDesc.setColumnsFromPath(columnsFromPath);
-        // set hdfs params for hdfs file type.
-        if (scanProvider.getTableFileType() == TFileType.FILE_HDFS) {
-            THdfsParams tHdfsParams = BrokerUtil.generateHdfsParam(scanProvider.getTableProperties());
-            rangeDesc.setHdfsParams(tHdfsParams);
-        }

         return rangeDesc;
     }
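The FE change above is one half of the thrift cleanup shown next: fields that are identical for every file in a scan range (file type, format type, HDFS connection info) are now set once on context.params rather than on each TFileRangeDesc. A hedged sketch of the resulting shape on the consumer side, with stand-in structs rather than the generated thrift types:

    #include <cstdio>
    #include <string>
    #include <vector>

    // Stand-ins mirroring the reshaped thrift layout: shared fields live on
    // the per-scan-range params, per-file fields on the range descriptor.
    struct HdfsParams { std::string fs_name; };
    struct ScanRangeParams {   // shared by all files of one scan range
        int format_type = 0;
        HdfsParams hdfs_params;
    };
    struct RangeDesc {         // per file: only path/offset/size remain
        std::string path;
        long start_offset = 0;
        long size = -1;        // -1 means read to the end of the file
    };

    // A scanner reads connection/format info once and reuses it for every
    // file range, instead of re-reading it from each range descriptor.
    void open_scan_range(const ScanRangeParams& params,
                         const std::vector<RangeDesc>& ranges) {
        for (const RangeDesc& r : ranges) {
            std::printf("open %s%s at offset %ld (format %d)\n",
                        params.hdfs_params.fs_name.c_str(), r.path.c_str(),
                        r.start_offset, params.format_type);
        }
    }

This also removes the per-range setFsName() call: the fs name rides along in params.hdfs_params for the whole scan range.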
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2469ebd4e2..b6d0a3b19a 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -226,29 +226,28 @@ struct TFileScanSlotInfo {
 }

 struct TFileScanRangeParams {
+    1: optional Types.TFileType file_type;
+    2: optional TFileFormatType format_type;
     // use src_tuple_id to get all slots from src table include both file slot and partition slot.
-    1: optional Types.TTupleId src_tuple_id;
+    3: optional Types.TTupleId src_tuple_id;
     // num_of_columns_from_file can spilt the all_file_slot and all_partition_slot
-    2: optional i32 num_of_columns_from_file;
+    4: optional i32 num_of_columns_from_file;
     // all selected slots which may compose from file and partiton value.
-    3: optional list<TFileScanSlotInfo> required_slots;
+    5: optional list<TFileScanSlotInfo> required_slots;

-    4: optional TFileTextScanRangeParams text_params;
+    6: optional THdfsParams hdfs_params;
+    7: optional TFileTextScanRangeParams text_params;
 }

 struct TFileRangeDesc {
-    1: optional Types.TFileType file_type;
-    2: optional TFileFormatType format_type;
     // Path of this range
-    3: optional string path;
+    1: optional string path;
     // Offset of this file start
-    4: optional i64 start_offset;
+    2: optional i64 start_offset;
     // Size of this range, if size = -1, this means that will read to the end of file
-    5: optional i64 size;
+    3: optional i64 size;
     // columns parsed from file path should be after the columns read from file
-    6: optional list<string> columns_from_path;
-
-    7: optional THdfsParams hdfs_params;
+    4: optional list<string> columns_from_path;
 }

 // HDFS file scan range

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org