This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
commit b177b26d3988efc33b76efa0d76fadc57aa52960 Author: zclllyybb <[email protected]> AuthorDate: Wed Feb 28 22:35:41 2024 +0800 [branch-2.1](tracing) Pick pipeline tracing and relative bugfix (#31367) * [Feature](pipeline) Trace pipeline scheduling (part I) (#31027) * [fix](compile) Fix performance compile fail #31305 * [fix](compile) Fix macOS compilation issues for PURE macro and CPU core identification (#31357) * [fix](compile) Correct PURE macro definition to fix compilation on macOS * 2 --------- Co-authored-by: zy-kkk <[email protected]> --- be/src/common/compiler_util.h | 2 + be/src/common/config.cpp | 1 + be/src/common/config.h | 1 + be/src/exec/data_sink.cpp | 19 --- be/src/exec/data_sink.h | 3 - .../action/adjust_tracing_dump.cpp} | 47 ++---- .../action/adjust_tracing_dump.h} | 32 +--- be/src/io/fs/file_system.h | 13 +- be/src/io/fs/local_file_system.h | 13 +- be/src/io/fs/local_file_writer.h | 9 +- be/src/pipeline/pipeline.cpp | 13 +- be/src/pipeline/pipeline.h | 18 ++- be/src/pipeline/pipeline_fragment_context.cpp | 32 +--- be/src/pipeline/pipeline_fragment_context.h | 11 +- be/src/pipeline/pipeline_task.h | 11 +- be/src/pipeline/pipeline_tracing.cpp | 162 +++++++++++++++++++++ be/src/pipeline/pipeline_tracing.h | 83 +++++++++++ .../pipeline_x/pipeline_x_fragment_context.cpp | 63 ++++---- .../pipeline_x/pipeline_x_fragment_context.h | 2 - be/src/pipeline/task_scheduler.cpp | 84 +++++++---- be/src/pipeline/task_scheduler.h | 3 - be/src/runtime/exec_env.h | 25 ++-- be/src/runtime/exec_env_init.cpp | 2 + be/src/runtime/fragment_mgr.cpp | 2 +- be/src/runtime/query_context.cpp | 24 ++- be/src/runtime/query_context.h | 7 +- be/src/service/http_service.cpp | 14 +- be/src/util/thrift_util.cpp | 34 ++++- be/src/util/thrift_util.h | 20 +-- build.sh | 1 + 30 files changed, 501 insertions(+), 250 deletions(-) diff --git a/be/src/common/compiler_util.h b/be/src/common/compiler_util.h index 4b684659ed9..79588cade91 100644 --- a/be/src/common/compiler_util.h +++ b/be/src/common/compiler_util.h @@ -49,3 +49,5 @@ #define MAY_ALIAS __attribute__((__may_alias__)) #define ALIGN_CACHE_LINE __attribute__((aligned(CACHE_LINE_SIZE))) + +#define PURE __attribute__((pure)) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 46c98bb763b..b49b95e8e5d 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -199,6 +199,7 @@ DEFINE_mInt32(download_low_speed_time, "300"); // log dir DEFINE_String(sys_log_dir, "${DORIS_HOME}/log"); DEFINE_String(user_function_dir, "${DORIS_HOME}/lib/udf"); +DEFINE_String(pipeline_tracing_log_dir, "${DORIS_HOME}/log/tracing"); // INFO, WARNING, ERROR, FATAL DEFINE_mString(sys_log_level, "INFO"); // TIME-DAY, TIME-HOUR, SIZE-MB-nnn diff --git a/be/src/common/config.h b/be/src/common/config.h index 8878cba0e64..c82d42a9ef4 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -247,6 +247,7 @@ DECLARE_mInt32(download_low_speed_time); // log dir DECLARE_String(sys_log_dir); DECLARE_String(user_function_dir); +DECLARE_String(pipeline_tracing_log_dir); // INFO, WARNING, ERROR, FATAL DECLARE_String(sys_log_level); // TIME-DAY, TIME-HOUR, SIZE-MB-nnn diff --git a/be/src/exec/data_sink.cpp b/be/src/exec/data_sink.cpp index 5809912f4a2..5047ae8fc78 100644 --- a/be/src/exec/data_sink.cpp +++ b/be/src/exec/data_sink.cpp @@ -346,23 +346,4 @@ Status DataSink::init(const TDataSink& thrift_sink) { Status DataSink::prepare(RuntimeState* state) { return Status::OK(); } - -bool DataSink::_has_inverted_index_or_partial_update(TOlapTableSink sink) { - OlapTableSchemaParam schema; - if (!schema.init(sink.schema).ok()) { - return false; - } - if (schema.is_partial_update()) { - return true; - } - for (const auto& index_schema : schema.indexes()) { - for (const auto& index : index_schema->indexes) { - if (index->index_type() == INVERTED) { - return true; - } - } - } - return false; -} - } // namespace doris diff --git a/be/src/exec/data_sink.h b/be/src/exec/data_sink.h index e08d40ab023..5258929ba79 100644 --- a/be/src/exec/data_sink.h +++ b/be/src/exec/data_sink.h @@ -106,9 +106,6 @@ public: std::shared_ptr<QueryStatistics> get_query_statistics_ptr(); -private: - static bool _has_inverted_index_or_partial_update(TOlapTableSink sink); - protected: // Set to true after close() has been called. subclasses should check and set this in // close(). diff --git a/be/src/io/fs/local_file_writer.h b/be/src/http/action/adjust_tracing_dump.cpp similarity index 52% copy from be/src/io/fs/local_file_writer.h copy to be/src/http/action/adjust_tracing_dump.cpp index b138879e358..55d1526d82b 100644 --- a/be/src/io/fs/local_file_writer.h +++ b/be/src/http/action/adjust_tracing_dump.cpp @@ -15,38 +15,23 @@ // specific language governing permissions and limitations // under the License. -#pragma once +#include "adjust_tracing_dump.h" -#include <cstddef> - -#include "common/status.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "io/fs/path.h" -#include "util/slice.h" +#include "common/logging.h" +#include "http/http_channel.h" +#include "http/http_request.h" +#include "http/http_status.h" +#include "runtime/exec_env.h" namespace doris { -namespace io { - -class LocalFileWriter final : public FileWriter { -public: - LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data = true); - LocalFileWriter(Path path, int fd); - ~LocalFileWriter() override; - - Status close() override; - Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; - -private: - void _abort(); - Status _close(bool sync); - -private: - int _fd; // owned - bool _dirty = false; - const bool _sync_data; -}; - -} // namespace io +void AdjustTracingDump::handle(HttpRequest* req) { + auto* ctx = ExecEnv::GetInstance()->pipeline_tracer_context(); + auto* params = req->params(); + if (auto status = ctx->change_record_params(*params); status.ok()) { + HttpChannel::send_reply(req, "change record type succeed!\n"); // ok + } else { // not ok + LOG(WARNING) << "adjust pipeline tracing dump method failed:" << status.msg() << '\n'; + HttpChannel::send_reply(req, HttpStatus::NOT_FOUND, status.msg().data()); + } +} } // namespace doris diff --git a/be/src/io/fs/local_file_writer.h b/be/src/http/action/adjust_tracing_dump.h similarity index 56% copy from be/src/io/fs/local_file_writer.h copy to be/src/http/action/adjust_tracing_dump.h index b138879e358..d82d8b1e55b 100644 --- a/be/src/io/fs/local_file_writer.h +++ b/be/src/http/action/adjust_tracing_dump.h @@ -17,36 +17,18 @@ #pragma once -#include <cstddef> - -#include "common/status.h" -#include "io/fs/file_system.h" -#include "io/fs/file_writer.h" -#include "io/fs/path.h" -#include "util/slice.h" +#include "http/http_handler.h" namespace doris { -namespace io { -class LocalFileWriter final : public FileWriter { -public: - LocalFileWriter(Path path, int fd, FileSystemSPtr fs, bool sync_data = true); - LocalFileWriter(Path path, int fd); - ~LocalFileWriter() override; +class HttpRequest; - Status close() override; - Status appendv(const Slice* data, size_t data_cnt) override; - Status finalize() override; +class AdjustTracingDump : public HttpHandler { +public: + AdjustTracingDump() = default; -private: - void _abort(); - Status _close(bool sync); + ~AdjustTracingDump() override = default; -private: - int _fd; // owned - bool _dirty = false; - const bool _sync_data; + void handle(HttpRequest* req) override; }; - -} // namespace io } // namespace doris diff --git a/be/src/io/fs/file_system.h b/be/src/io/fs/file_system.h index 82d3911ae9d..a8cdf5f4eb6 100644 --- a/be/src/io/fs/file_system.h +++ b/be/src/io/fs/file_system.h @@ -19,8 +19,8 @@ #include <butil/macros.h> #include <glog/logging.h> -#include <stdint.h> +#include <cstdint> #include <memory> #include <string> #include <vector> @@ -29,8 +29,7 @@ #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/path.h" -namespace doris { -namespace io { +namespace doris::io { #ifndef FILESYSTEM_M #define FILESYSTEM_M(stmt) \ @@ -83,7 +82,6 @@ public: std::shared_ptr<FileSystem> getSPtr() { return shared_from_this(); } -public: // the root path of this fs. // if not empty, all given Path will be "_root_path/path" const Path& root_path() const { return _root_path; } @@ -97,7 +95,8 @@ public: virtual ~FileSystem() = default; // Each derived class should implement create method to create fs. - DISALLOW_COPY_AND_ASSIGN(FileSystem); + FileSystem(const FileSystem&) = delete; + const FileSystem& operator=(const FileSystem&) = delete; protected: /// create file and return a FileWriter @@ -152,7 +151,6 @@ protected: return _root_path / path; } -protected: FileSystem(Path&& root_path, std::string&& id, FileSystemType type) : _root_path(std::move(root_path)), _id(std::move(id)), _type(type) {} @@ -163,5 +161,4 @@ protected: using FileSystemSPtr = std::shared_ptr<FileSystem>; -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/io/fs/local_file_system.h b/be/src/io/fs/local_file_system.h index 705f0ed36e9..8578b9f5ac2 100644 --- a/be/src/io/fs/local_file_system.h +++ b/be/src/io/fs/local_file_system.h @@ -17,9 +17,8 @@ #pragma once -#include <stdint.h> -#include <time.h> - +#include <cstdint> +#include <ctime> #include <functional> #include <memory> #include <string> @@ -29,8 +28,7 @@ #include "io/fs/file_system.h" #include "io/fs/path.h" -namespace doris { -namespace io { +namespace doris::io { class LocalFileSystem final : public FileSystem { public: @@ -106,7 +104,6 @@ private: LocalFileSystem(Path&& root_path, std::string&& id = ""); }; -const std::shared_ptr<LocalFileSystem>& global_local_filesystem(); +PURE const std::shared_ptr<LocalFileSystem>& global_local_filesystem(); -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/io/fs/local_file_writer.h b/be/src/io/fs/local_file_writer.h index b138879e358..8c8d436d54e 100644 --- a/be/src/io/fs/local_file_writer.h +++ b/be/src/io/fs/local_file_writer.h @@ -25,8 +25,7 @@ #include "io/fs/path.h" #include "util/slice.h" -namespace doris { -namespace io { +namespace doris::io { class LocalFileWriter final : public FileWriter { public: @@ -42,11 +41,9 @@ private: void _abort(); Status _close(bool sync); -private: int _fd; // owned bool _dirty = false; - const bool _sync_data; + const bool _sync_data = false; }; -} // namespace io -} // namespace doris +} // namespace doris::io diff --git a/be/src/pipeline/pipeline.cpp b/be/src/pipeline/pipeline.cpp index 9a30cee5ab9..9c9d7cd6099 100644 --- a/be/src/pipeline/pipeline.cpp +++ b/be/src/pipeline/pipeline.cpp @@ -17,7 +17,8 @@ #include "pipeline.h" -#include <ostream> +#include <memory> +#include <string> #include <utility> #include "pipeline/exec/operator.h" @@ -26,10 +27,13 @@ namespace doris::pipeline { void Pipeline::_init_profile() { auto s = fmt::format("Pipeline (pipeline id={})", _pipeline_id); - _pipeline_profile.reset(new RuntimeProfile(std::move(s))); + _pipeline_profile = std::make_unique<RuntimeProfile>(std::move(s)); } Status Pipeline::build_operators() { + _name.reserve(_operator_builders.size() * 10); + _name.append(std::to_string(id())); + OperatorPtr pre; for (auto& operator_t : _operator_builders) { auto o = operator_t->build_operator(); @@ -37,6 +41,11 @@ Status Pipeline::build_operators() { static_cast<void>(o->set_child(pre)); } _operators.emplace_back(o); + + _name.push_back('-'); + _name.append(std::to_string(operator_t->id())); + _name.append(o->get_name()); + pre = std::move(o); } return Status::OK(); diff --git a/be/src/pipeline/pipeline.h b/be/src/pipeline/pipeline.h index 148191f4e2d..ab5b7e36bc2 100644 --- a/be/src/pipeline/pipeline.h +++ b/be/src/pipeline/pipeline.h @@ -18,11 +18,11 @@ #pragma once #include <glog/logging.h> -#include <stdint.h> -#include <algorithm> -#include <atomic> +#include <cstdint> #include <memory> +#include <string_view> +#include <utility> #include <vector> #include "common/status.h" @@ -42,18 +42,19 @@ using PipelineId = uint32_t; class Pipeline : public std::enable_shared_from_this<Pipeline> { friend class PipelineTask; friend class PipelineXTask; + friend class PipelineXFragmentContext; public: Pipeline() = delete; explicit Pipeline(PipelineId pipeline_id, int num_tasks, std::weak_ptr<PipelineFragmentContext> context) - : _pipeline_id(pipeline_id), _context(context), _num_tasks(num_tasks) { + : _pipeline_id(pipeline_id), _context(std::move(context)), _num_tasks(num_tasks) { _init_profile(); } void add_dependency(std::shared_ptr<Pipeline>& pipeline) { - pipeline->_parents.push_back({_operator_builders.size(), weak_from_this()}); - _dependencies.push_back({_operator_builders.size(), pipeline}); + pipeline->_parents.emplace_back(_operator_builders.size(), weak_from_this()); + _dependencies.emplace_back(_operator_builders.size(), pipeline); } // If all dependencies are finished, this pipeline task should be scheduled. @@ -192,6 +193,11 @@ private: std::weak_ptr<PipelineFragmentContext> _context; int _previous_schedule_id = -1; + // pipline id + operator names. init when: + // build_operators(), if pipeline; + // _build_pipelines() and _create_tree_helper(), if pipelineX. + std::string _name; + std::unique_ptr<RuntimeProfile> _pipeline_profile; // Operators for pipelineX. All pipeline tasks share operators from this. diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp index 358086e94eb..239173fa781 100644 --- a/be/src/pipeline/pipeline_fragment_context.cpp +++ b/be/src/pipeline/pipeline_fragment_context.cpp @@ -117,10 +117,10 @@ namespace doris::pipeline { bvar::Adder<int64_t> g_pipeline_tasks_count("doris_pipeline_tasks_count"); PipelineFragmentContext::PipelineFragmentContext( - const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id, - int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, + const TUniqueId& query_id, const TUniqueId& instance_id, int fragment_id, int backend_num, + std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back, - const report_status_callback& report_status_cb) + report_status_callback report_status_cb) : _query_id(query_id), _fragment_instance_id(instance_id), _fragment_id(fragment_id), @@ -129,7 +129,7 @@ PipelineFragmentContext::PipelineFragmentContext( _query_ctx(std::move(query_ctx)), _call_back(call_back), _is_report_on_cancel(true), - _report_status_cb(report_status_cb), + _report_status_cb(std::move(report_status_cb)), _create_time(MonotonicNanos()) { _fragment_watcher.start(); } @@ -951,31 +951,13 @@ Status PipelineFragmentContext::send_report(bool done) { _fragment_instance_id, _backend_num, _runtime_state.get(), - [this](auto&& PH1) { return update_status(std::forward<decltype(PH1)>(PH1)); }, - [this](auto&& PH1, auto&& PH2) { - cancel(std::forward<decltype(PH1)>(PH1), std::forward<decltype(PH2)>(PH2)); + [this](Status st) { return update_status(st); }, + [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { + cancel(reason, msg); }}, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this())); } -bool PipelineFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) { - OlapTableSchemaParam schema; - if (!schema.init(sink.schema).ok()) { - return false; - } - if (schema.is_partial_update()) { - return true; - } - for (const auto& index_schema : schema.indexes()) { - for (const auto& index : index_schema->indexes) { - if (index->index_type() == INVERTED) { - return true; - } - } - } - return false; -} - std::string PipelineFragmentContext::debug_string() { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "PipelineFragmentContext Info: QueryId = {}\n", diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 4c805b50582..9925689cb2c 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -19,13 +19,11 @@ #include <gen_cpp/Types_types.h> #include <gen_cpp/types.pb.h> -#include <stddef.h> -#include <stdint.h> #include <atomic> -#include <condition_variable> +#include <cstddef> +#include <cstdint> #include <functional> -#include <future> #include <memory> #include <mutex> #include <string> @@ -62,10 +60,10 @@ public: using report_status_callback = std::function<Status( const ReportStatusRequest, std::shared_ptr<pipeline::PipelineFragmentContext>&&)>; PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id, - const int fragment_id, int backend_num, + int fragment_id, int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, const std::function<void(RuntimeState*, Status*)>& call_back, - const report_status_callback& report_status_cb); + report_status_callback report_status_cb); ~PipelineFragmentContext() override; @@ -210,7 +208,6 @@ protected: int _num_instances = 1; private: - static bool _has_inverted_index_or_partial_update(TOlapTableSink sink); std::vector<std::unique_ptr<PipelineTask>> _tasks; uint64_t _create_time; diff --git a/be/src/pipeline/pipeline_task.h b/be/src/pipeline/pipeline_task.h index b54a6de593d..517d6b8a8de 100644 --- a/be/src/pipeline/pipeline_task.h +++ b/be/src/pipeline/pipeline_task.h @@ -17,11 +17,10 @@ #pragma once -#include <stdint.h> - +#include <cstdint> #include <memory> #include <string> -#include <vector> +#include <string_view> #include "common/config.h" #include "common/status.h" @@ -266,7 +265,7 @@ public: } // If enable_debug_log_timeout_secs <= 0, then disable the log if (_pipeline_task_watcher.elapsed_time() > - config::enable_debug_log_timeout_secs * 1000l * 1000l * 1000l) { + config::enable_debug_log_timeout_secs * 1000L * 1000L * 1000L) { _has_exceed_timeout = true; return true; } @@ -286,7 +285,9 @@ public: } } - RuntimeState* runtime_state() { return _state; } + RuntimeState* runtime_state() const { return _state; } + + std::string task_name() const { return fmt::format("task{}({})", _index, _pipeline->_name); } protected: void _finish_p_dependency() { diff --git a/be/src/pipeline/pipeline_tracing.cpp b/be/src/pipeline/pipeline_tracing.cpp new file mode 100644 index 00000000000..94675f77f63 --- /dev/null +++ b/be/src/pipeline/pipeline_tracing.cpp @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline_tracing.h" + +#include <absl/time/clock.h> +#include <fcntl.h> + +#include <boost/algorithm/string/predicate.hpp> +#include <chrono> +#include <cstdint> +#include <mutex> +#include <string> + +#include "common/config.h" +#include "common/exception.h" +#include "common/status.h" +#include "io/fs/local_file_writer.h" +#include "util/time.h" + +namespace doris::pipeline { + +void PipelineTracerContext::record(ScheduleRecord record) { + if (_dump_type == RecordType::None) [[unlikely]] { + return; + } + if (_datas.contains(record.query_id)) { + _datas[record.query_id].enqueue(record); + } else { + std::unique_lock<std::mutex> l(_data_lock); // add new item, may rehash + _datas[record.query_id].enqueue(record); + } +} + +void PipelineTracerContext::end_query(TUniqueId query_id, uint64_t task_group) { + { + std::unique_lock<std::mutex> l(_tg_lock); + _id_to_taskgroup[query_id] = task_group; + } + if (_dump_type == RecordType::PerQuery) { + _dump(query_id); + } else if (_dump_type == RecordType::Periodic) { + auto now = MonotonicSeconds(); + auto interval = now - _last_dump_time; + if (interval > _dump_interval_s) { + _dump(query_id); + } + } +} + +Status PipelineTracerContext::change_record_params( + const std::map<std::string, std::string>& params) { + bool effective = false; + if (auto it = params.find("type"); it != params.end()) { + if (boost::iequals(it->second, "disable") || boost::iequals(it->second, "none")) { + _dump_type = RecordType::None; + effective = true; + } else if (boost::iequals(it->second, "per_query") || + boost::iequals(it->second, "perquery")) { + _dump_type = RecordType::PerQuery; + effective = true; + } else if (boost::iequals(it->second, "periodic")) { + _dump_type = RecordType::Periodic; + _last_dump_time = MonotonicSeconds(); + effective = true; + } + } + + if (auto it = params.find("dump_interval"); it != params.end()) { + _dump_interval_s = std::stoll(it->second); // s as unit + effective = true; + } + + return effective ? Status::OK() + : Status::InvalidArgument( + "No qualified param in changing tracing record method"); +} + +void PipelineTracerContext::_dump(TUniqueId query_id) { + if (_dump_type == RecordType::None) { + return; + } + + //TODO: when dump, now could append records but can't add new query. try use better grained locks. + std::unique_lock<std::mutex> l(_data_lock); // can't rehash + if (_dump_type == RecordType::PerQuery) { + auto path = _dir / fmt::format("query{}", to_string(query_id)); + int fd = ::open( + path.c_str(), O_CREAT | O_WRONLY | O_TRUNC, + S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); + if (fd < 0) [[unlikely]] { + throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>( + "create tracing log file {} failed", path.c_str())); + } + auto writer = io::LocalFileWriter {path, fd}; + + ScheduleRecord record; + while (_datas[query_id].try_dequeue(record)) { + uint64_t v = 0; + { + std::unique_lock<std::mutex> l(_tg_lock); + v = _id_to_taskgroup[query_id]; + } + auto tmp_str = record.to_string(v); + auto text = Slice {tmp_str}; + THROW_IF_ERROR(writer.appendv(&text, 1)); + } + + THROW_IF_ERROR(writer.finalize()); + THROW_IF_ERROR(writer.close()); + } else if (_dump_type == RecordType::Periodic) { + auto path = _dir / fmt::format("until{}", + std::chrono::steady_clock::now().time_since_epoch().count()); + int fd = ::open( + path.c_str(), O_CREAT | O_WRONLY | O_TRUNC, + S_ISGID | S_ISUID | S_IWUSR | S_IRUSR | S_IWGRP | S_IRGRP | S_IWOTH | S_IROTH); + if (fd < 0) [[unlikely]] { + throw Exception(Status::Error<ErrorCode::CREATE_FILE_ERROR>( + "create tracing log file {} failed", path.c_str())); + } + auto writer = io::LocalFileWriter {path, fd}; + + for (auto& [id, trace] : _datas) { + ScheduleRecord record; + while (trace.try_dequeue(record)) { + uint64_t v = 0; + { + std::unique_lock<std::mutex> l(_tg_lock); + v = _id_to_taskgroup[query_id]; + } + auto tmp_str = record.to_string(v); + auto text = Slice {tmp_str}; + THROW_IF_ERROR(writer.appendv(&text, 1)); + } + } + THROW_IF_ERROR(writer.finalize()); + THROW_IF_ERROR(writer.close()); + + _last_dump_time = MonotonicSeconds(); + } + + _datas.erase(query_id); + { + std::unique_lock<std::mutex> l(_tg_lock); + _id_to_taskgroup.erase(query_id); + } +} +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_tracing.h b/be/src/pipeline/pipeline_tracing.h new file mode 100644 index 00000000000..3160148c570 --- /dev/null +++ b/be/src/pipeline/pipeline_tracing.h @@ -0,0 +1,83 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <concurrentqueue.h> +#include <fmt/core.h> +#include <gen_cpp/Types_types.h> +#include <parallel_hashmap/phmap.h> + +#include <cstdint> +#include <filesystem> + +#include "common/config.h" +#include "util/hash_util.hpp" // IWYU pragma: keep +#include "util/thrift_util.h" +#include "util/time.h" + +namespace doris::pipeline { + +struct ScheduleRecord { + TUniqueId query_id; + std::string task_id; + uint32_t core_id; + uint64_t thread_id; + uint64_t start_time; + uint64_t end_time; + std::string state_name; + + bool operator<(const ScheduleRecord& rhs) const { return start_time < rhs.start_time; } + std::string to_string(uint64_t append_value) const { + return fmt::format("{}|{}|{}|{}|{}|{}|{}|{}\n", doris::to_string(query_id), task_id, + core_id, thread_id, start_time, end_time, state_name, append_value); + } +}; + +// all tracing datas of ONE specific query +using OneQueryTraces = moodycamel::ConcurrentQueue<ScheduleRecord>; + +// belongs to exec_env, for all query, if enable +class PipelineTracerContext { +public: + enum class RecordType { + None, // disable + PerQuery, // record per query. one query one file. + Periodic // record per times. one timeslice one file. + }; + void record(ScheduleRecord record); // record one schedule record + void end_query(TUniqueId query_id, + uint64_t task_group); // tell context this query is end. may leads to dump. + Status change_record_params(const std::map<std::string, std::string>& params); + + bool enabled() const { return !(_dump_type == RecordType::None); } + +private: + void _dump(TUniqueId query_id); // dump data to disk. one query or all. + + std::mutex _data_lock; // lock for map, not map items. + phmap::flat_hash_map<TUniqueId, OneQueryTraces> _datas; + std::mutex _tg_lock; //TODO: use an lockfree DS + phmap::flat_hash_map<TUniqueId, uint64_t> _id_to_taskgroup; + + RecordType _dump_type = RecordType::None; + std::filesystem::path _dir = config::pipeline_tracing_log_dir; + decltype(MonotonicSeconds()) _last_dump_time; + decltype(MonotonicSeconds()) _dump_interval_s = + 60; // effective iff Periodic mode. 1 minute default. +}; +} // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp index 696fcfefba5..f13cf37b1fb 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.cpp @@ -23,12 +23,13 @@ #include <gen_cpp/Planner_types.h> #include <pthread.h> #include <runtime/result_buffer_mgr.h> -#include <stdlib.h> + +#include <cstdlib> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <map> +#include <memory> #include <ostream> -#include <typeinfo> #include <utility> #include "common/config.h" @@ -42,7 +43,6 @@ #include "pipeline/exec/analytic_sink_operator.h" #include "pipeline/exec/analytic_source_operator.h" #include "pipeline/exec/assert_num_rows_operator.h" -#include "pipeline/exec/data_queue.h" #include "pipeline/exec/datagen_operator.h" #include "pipeline/exec/distinct_streaming_aggregation_operator.h" #include "pipeline/exec/empty_set_operator.h" @@ -67,7 +67,6 @@ #include "pipeline/exec/repeat_operator.h" #include "pipeline/exec/result_file_sink_operator.h" #include "pipeline/exec/result_sink_operator.h" -#include "pipeline/exec/scan_operator.h" #include "pipeline/exec/schema_scan_operator.h" #include "pipeline/exec/select_operator.h" #include "pipeline/exec/set_probe_sink_operator.h" @@ -93,7 +92,6 @@ #include "util/container_util.hpp" #include "util/debug_util.h" #include "util/uid_util.h" -#include "vec/common/assert_cast.h" #include "vec/runtime/vdata_stream_mgr.h" namespace doris::pipeline { @@ -162,7 +160,7 @@ Status PipelineXFragmentContext::prepare(const doris::TPipelineFragmentParams& r } _num_instances = request.local_params.size(); _total_instances = request.__isset.total_instances ? request.total_instances : _num_instances; - _runtime_profile.reset(new RuntimeProfile("PipelineContext")); + _runtime_profile = std::make_unique<RuntimeProfile>("PipelineContext"); _prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime"); SCOPED_TIMER(_prepare_timer); @@ -378,7 +376,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData // TODO: figure out good buffer size based on size of output row // Result file sink is not the top sink - if (params.__isset.destinations && params.destinations.size() > 0) { + if (params.__isset.destinations && !params.destinations.empty()) { _sink.reset(new ResultFileSinkOperatorX(next_sink_operator_id(), row_desc, thrift_sink.result_file_sink, params.destinations, output_exprs, desc_tbl)); @@ -408,7 +406,7 @@ Status PipelineXFragmentContext::_create_data_sink(ObjectPool* pool, const TData auto new_pipeline = add_pipeline(); RowDescriptor* _row_desc = nullptr; { - auto& tmp_row_desc = + const auto& tmp_row_desc = !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty() ? RowDescriptor(state->desc_tbl(), {thrift_sink.multi_cast_stream_sink.sinks[i] @@ -602,14 +600,14 @@ Status PipelineXFragmentContext::_build_pipeline_tasks( return Status::OK(); }; - for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { - if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) { - auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()]; + for (auto& _pipeline : _pipelines) { + if (pipeline_id_to_task.contains(_pipeline->id())) { + auto* task = pipeline_id_to_task[_pipeline->id()]; DCHECK(task != nullptr); // if this task has upstream dependency, then record them. - if (_dag.find(_pipelines[pip_idx]->id()) != _dag.end()) { - auto& deps = _dag[_pipelines[pip_idx]->id()]; + if (_dag.find(_pipeline->id()) != _dag.end()) { + auto& deps = _dag[_pipeline->id()]; for (auto& dep : deps) { if (pipeline_id_to_task.contains(dep)) { task->add_upstream_dependency( @@ -642,11 +640,14 @@ Status PipelineXFragmentContext::_build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr* root, PipelinePtr cur_pipe) { - if (request.fragment.plan.nodes.size() == 0) { + if (request.fragment.plan.nodes.empty()) { throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!"); } int node_idx = 0; + + cur_pipe->_name.append(std::to_string(cur_pipe->id())); + RETURN_IF_ERROR(_create_tree_helper(pool, request.fragment.plan.nodes, request, descs, nullptr, &node_idx, root, cur_pipe, 0)); @@ -688,6 +689,10 @@ Status PipelineXFragmentContext::_create_tree_helper(ObjectPool* pool, *root = op; } + cur_pipe->_name.push_back('-'); + cur_pipe->_name.append(std::to_string(op->id())); + cur_pipe->_name.append(op->get_name()); + // rely on that tnodes is preorder of the plan for (int i = 0; i < num_children; i++) { ++*node_idx; @@ -875,6 +880,8 @@ Status PipelineXFragmentContext::_add_local_exchange( return Status::OK(); } +// NOLINTBEGIN(readability-function-size) +// NOLINTBEGIN(readability-function-cognitive-complexity) Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode, const doris::TPipelineFragmentParams& request, const DescriptorTbl& descs, OperatorXPtr& op, @@ -1150,6 +1157,9 @@ Status PipelineXFragmentContext::_create_operator(ObjectPool* pool, const TPlanN return Status::OK(); } +// NOLINTEND(readability-function-cognitive-complexity) +// NOLINTEND(readability-function-size) + template <bool is_intersect> Status PipelineXFragmentContext::_build_operators_for_set_operation_node( ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorXPtr& op, @@ -1295,30 +1305,13 @@ Status PipelineXFragmentContext::send_report(bool done) { {true, exec_status, runtime_states, nullptr, _runtime_state->load_channel_profile(), done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, TUniqueId(), _backend_num, _runtime_state.get(), - std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), - std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, - std::placeholders::_2)}, + [this](Status st) { return update_status(st); }, + [this](const PPlanFragmentCancelReason& reason, const std::string& msg) { + cancel(reason, msg); + }}, std::dynamic_pointer_cast<PipelineXFragmentContext>(shared_from_this())); } -bool PipelineXFragmentContext::_has_inverted_index_or_partial_update(TOlapTableSink sink) { - OlapTableSchemaParam schema; - if (!schema.init(sink.schema).ok()) { - return false; - } - if (schema.is_partial_update()) { - return true; - } - for (const auto& index_schema : schema.indexes()) { - for (const auto& index : index_schema->indexes) { - if (index->index_type() == INVERTED) { - return true; - } - } - } - return false; -} - std::string PipelineXFragmentContext::debug_string() { fmt::memory_buffer debug_string_buffer; fmt::format_to(debug_string_buffer, "PipelineXFragmentContext Info:\n"); diff --git a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h index 102dd854998..9630484f443 100644 --- a/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h +++ b/be/src/pipeline/pipeline_x/pipeline_x_fragment_context.h @@ -169,8 +169,6 @@ private: const std::map<int, int>& shuffle_idx_to_instance_idx, const bool ignore_data_distribution); - bool _has_inverted_index_or_partial_update(TOlapTableSink sink); - bool _enable_local_shuffle() const { return _runtime_state->enable_local_shuffle(); } OperatorXPtr _root_op = nullptr; diff --git a/be/src/pipeline/task_scheduler.cpp b/be/src/pipeline/task_scheduler.cpp index 98678685d3f..5032bdef6b7 100644 --- a/be/src/pipeline/task_scheduler.cpp +++ b/be/src/pipeline/task_scheduler.cpp @@ -23,33 +23,33 @@ #include <glog/logging.h> #include <sched.h> -#include <algorithm> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <functional> #include <ostream> #include <string> #include <thread> +#include <utility> #include "common/logging.h" -#include "common/signal_handler.h" #include "pipeline/pipeline_task.h" -#include "pipeline/pipeline_x/dependency.h" #include "pipeline/pipeline_x/pipeline_x_task.h" #include "pipeline/task_queue.h" #include "pipeline_fragment_context.h" +#include "runtime/exec_env.h" #include "runtime/query_context.h" #include "util/debug_util.h" #include "util/sse_util.hpp" #include "util/thread.h" #include "util/threadpool.h" +#include "util/time.h" #include "util/uid_util.h" #include "vec/runtime/vdatetime_value.h" namespace doris::pipeline { BlockedTaskScheduler::BlockedTaskScheduler(std::string name) - : _name(name), _started(false), _shutdown(false) {} + : _name(std::move(name)), _started(false), _shutdown(false) {} Status BlockedTaskScheduler::start() { LOG(INFO) << "BlockedTaskScheduler start"; @@ -192,7 +192,7 @@ void BlockedTaskScheduler::_schedule() { void BlockedTaskScheduler::_make_task_run(std::list<PipelineTask*>& local_tasks, std::list<PipelineTask*>::iterator& task_itr, PipelineTaskState t_state) { - auto task = *task_itr; + auto* task = *task_itr; task->set_state(t_state); local_tasks.erase(task_itr++); static_cast<void>(task->get_task_queue()->push_back(task)); @@ -215,8 +215,7 @@ Status TaskScheduler::start() { _markers.reserve(cores); for (size_t i = 0; i < cores; ++i) { _markers.push_back(std::make_unique<std::atomic<bool>>(true)); - RETURN_IF_ERROR( - _fix_thread_pool->submit_func(std::bind(&TaskScheduler::_do_work, this, i))); + RETURN_IF_ERROR(_fix_thread_pool->submit_func([this, i] { _do_work(i); })); } return Status::OK(); } @@ -226,6 +225,29 @@ Status TaskScheduler::schedule_task(PipelineTask* task) { // TODO control num of task } +// after _close_task, task maybe destructed. +void _close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { + // close_a_pipeline may delete fragment context and will core in some defer + // code, because the defer code will access fragment context it self. + auto lock_for_context = task->fragment_context()->shared_from_this(); + // is_pending_finish does not check status, so has to check status in close API. + // For example, in async writer, the writer may failed during dealing with eos_block + // but it does not return error status. Has to check the error status in close API. + // We have already refactor all source and sink api, the close API does not need waiting + // for pending finish now. So that could call close directly. + Status status = task->close(exec_status); + if (!status.ok() && state != PipelineTaskState::CANCELED) { + task->query_context()->cancel(true, status.to_string(), + Status::Cancelled(status.to_string())); + state = PipelineTaskState::CANCELED; + } + task->set_state(state); + task->set_close_pipeline_time(); + task->finalize(); + task->set_running(false); + task->fragment_context()->close_a_pipeline(); +} + void TaskScheduler::_do_work(size_t index) { const auto& marker = _markers[index]; while (*marker) { @@ -286,7 +308,31 @@ void TaskScheduler::_do_work(size_t index) { auto status = Status::OK(); try { - status = task->execute(&eos); + //TODO: use a better enclose to abstracting these + if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) { + TUniqueId query_id = task->query_context()->query_id(); + std::string task_name = task->task_name(); +#ifdef __APPLE__ + uint32_t core_id = 0; +#else + uint32_t core_id = sched_getcpu(); +#endif + std::thread::id tid = std::this_thread::get_id(); + uint64_t thread_id = *reinterpret_cast<uint64_t*>(&tid); + uint64_t start_time = MonotonicMicros(); + + status = task->execute(&eos); + + uint64_t end_time = MonotonicMicros(); + auto state = task->get_state(); + std::string state_name = + state == PipelineTaskState::RUNNABLE ? get_state_name(state) : ""; + ExecEnv::GetInstance()->pipeline_tracer_context()->record( + {query_id, task_name, core_id, thread_id, start_time, end_time, + state_name}); + } else { + status = task->execute(&eos); + } } catch (const Exception& e) { status = e.to_status(); } @@ -372,28 +418,6 @@ void TaskScheduler::_do_work(size_t index) { } } -void TaskScheduler::_close_task(PipelineTask* task, PipelineTaskState state, Status exec_status) { - // close_a_pipeline may delete fragment context and will core in some defer - // code, because the defer code will access fragment context it self. - auto lock_for_context = task->fragment_context()->shared_from_this(); - // is_pending_finish does not check status, so has to check status in close API. - // For example, in async writer, the writer may failed during dealing with eos_block - // but it does not return error status. Has to check the error status in close API. - // We have already refactor all source and sink api, the close API does not need waiting - // for pending finish now. So that could call close directly. - Status status = task->close(exec_status); - if (!status.ok() && state != PipelineTaskState::CANCELED) { - task->query_context()->cancel(true, status.to_string(), - Status::Cancelled(status.to_string())); - state = PipelineTaskState::CANCELED; - } - task->set_state(state); - task->set_close_pipeline_time(); - task->finalize(); - task->set_running(false); - task->fragment_context()->close_a_pipeline(); -} - void TaskScheduler::stop() { if (!this->_shutdown.load()) { if (_task_queue) { diff --git a/be/src/pipeline/task_scheduler.h b/be/src/pipeline/task_scheduler.h index 41ac8c0c098..3074cf02afc 100644 --- a/be/src/pipeline/task_scheduler.h +++ b/be/src/pipeline/task_scheduler.h @@ -103,8 +103,5 @@ private: CgroupCpuCtl* _cgroup_cpu_ctl = nullptr; void _do_work(size_t index); - // after _close_task, task maybe destructed. - void _close_task(PipelineTask* task, PipelineTaskState state, - Status exec_status = Status::OK()); }; } // namespace doris::pipeline \ No newline at end of file diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h index 1f6b1b7e8b9..89d871be293 100644 --- a/be/src/runtime/exec_env.h +++ b/be/src/runtime/exec_env.h @@ -34,6 +34,7 @@ #include "olap/olap_define.h" #include "olap/options.h" #include "olap/tablet_fwd.h" +#include "pipeline/pipeline_tracing.h" #include "runtime/frontend_info.h" // TODO(zhiqiang): find a way to remove this include header #include "util/threadpool.h" @@ -143,7 +144,7 @@ public: static bool ready() { return _s_ready.load(std::memory_order_acquire); } const std::string& token() const; ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; } - doris::vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; } + vectorized::VDataStreamMgr* vstream_mgr() { return _vstream_mgr; } ResultBufferMgr* result_mgr() { return _result_mgr; } ResultQueueMgr* result_queue_mgr() { return _result_queue_mgr; } ClientCache<BackendServiceClient>* client_cache() { return _backend_client_cache; } @@ -206,7 +207,7 @@ public: std::shared_ptr<StreamLoadExecutor> stream_load_executor() { return _stream_load_executor; } RoutineLoadTaskExecutor* routine_load_task_executor() { return _routine_load_task_executor; } HeartbeatFlags* heartbeat_flags() { return _heartbeat_flags; } - doris::vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } + vectorized::ScannerScheduler* scanner_scheduler() { return _scanner_scheduler; } FileMetaCache* file_meta_cache() { return _file_meta_cache; } MemTableMemoryLimiter* memtable_memory_limiter() { return _memtable_memory_limiter.get(); } WalManager* wal_mgr() { return _wal_manager.get(); } @@ -264,14 +265,18 @@ public: } std::shared_ptr<DummyLRUCache> get_dummy_lru_cache() { return _dummy_lru_cache; } - std::shared_ptr<doris::pipeline::BlockedTaskScheduler> get_global_block_scheduler() { + std::shared_ptr<pipeline::BlockedTaskScheduler> get_global_block_scheduler() { return _global_block_scheduler; } - doris::pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { + pipeline::RuntimeFilterTimerQueue* runtime_filter_timer_queue() { return _runtime_filter_timer_queue; } + pipeline::PipelineTracerContext* pipeline_tracer_context() { + return _pipeline_tracer_ctx.get(); + } + private: ExecEnv(); @@ -291,7 +296,7 @@ private: UserFunctionCache* _user_function_cache = nullptr; // Leave protected so that subclasses can override ExternalScanContextMgr* _external_scan_context_mgr = nullptr; - doris::vectorized::VDataStreamMgr* _vstream_mgr = nullptr; + vectorized::VDataStreamMgr* _vstream_mgr = nullptr; ResultBufferMgr* _result_mgr = nullptr; ResultQueueMgr* _result_queue_mgr = nullptr; ClientCache<BackendServiceClient>* _backend_client_cache = nullptr; @@ -342,7 +347,7 @@ private: RoutineLoadTaskExecutor* _routine_load_task_executor = nullptr; SmallFileMgr* _small_file_mgr = nullptr; HeartbeatFlags* _heartbeat_flags = nullptr; - doris::vectorized::ScannerScheduler* _scanner_scheduler = nullptr; + vectorized::ScannerScheduler* _scanner_scheduler = nullptr; BlockSpillManager* _block_spill_mgr = nullptr; // To save meta info of external file, such as parquet footer. @@ -374,15 +379,17 @@ private: std::shared_ptr<DummyLRUCache> _dummy_lru_cache = nullptr; // used for query with group cpu hard limit - std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _global_block_scheduler; + std::shared_ptr<pipeline::BlockedTaskScheduler> _global_block_scheduler; // used for query without workload group - std::shared_ptr<doris::pipeline::BlockedTaskScheduler> _without_group_block_scheduler; + std::shared_ptr<pipeline::BlockedTaskScheduler> _without_group_block_scheduler; - doris::pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; + pipeline::RuntimeFilterTimerQueue* _runtime_filter_timer_queue = nullptr; WorkloadSchedPolicyMgr* _workload_sched_mgr = nullptr; RuntimeQueryStatiticsMgr* _runtime_query_statistics_mgr = nullptr; + + std::unique_ptr<pipeline::PipelineTracerContext> _pipeline_tracer_ctx; }; template <> diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index f5f08a71064..2f6ac61d646 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -50,6 +50,7 @@ #include "olap/storage_engine.h" #include "olap/tablet_schema_cache.h" #include "olap/wal/wal_manager.h" +#include "pipeline/pipeline_tracing.h" #include "pipeline/task_queue.h" #include "pipeline/task_scheduler.h" #include "runtime/block_spill_manager.h" @@ -201,6 +202,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths, // so it should be created before all query begin and deleted after all query and daemon thread stoppped _runtime_query_statistics_mgr = new RuntimeQueryStatiticsMgr(); init_file_cache_factory(); + _pipeline_tracer_ctx = std::make_unique<pipeline::PipelineTracerContext>(); // before query RETURN_IF_ERROR(init_pipeline_task_scheduler()); _task_group_manager = new taskgroup::TaskGroupManager(); _scanner_scheduler = new doris::vectorized::ScannerScheduler(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index e52b71d277b..c722fb24bb7 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -612,7 +612,7 @@ Status FragmentMgr::_get_query_ctx(const Params& params, TUniqueId query_id, boo // This may be a first fragment request of the query. // Create the query fragments context. query_ctx = QueryContext::create_shared(query_id, params.fragment_num_on_host, _exec_env, - params.query_options, params.coord); + params.query_options, params.coord, pipeline); RETURN_IF_ERROR(DescriptorTbl::create(&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl))); // set file scan range params diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp index 2c25d37d14b..bca8b409c02 100644 --- a/be/src/runtime/query_context.cpp +++ b/be/src/runtime/query_context.cpp @@ -17,6 +17,9 @@ #include "runtime/query_context.h" +#include <exception> +#include <memory> + #include "pipeline/pipeline_fragment_context.h" #include "pipeline/pipeline_x/dependency.h" #include "runtime/runtime_query_statistics_mgr.h" @@ -34,11 +37,13 @@ public: }; QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env, - const TQueryOptions& query_options, TNetworkAddress coord_addr) + const TQueryOptions& query_options, TNetworkAddress coord_addr, + bool is_pipeline) : fragment_num(total_fragment_num), timeout_second(-1), _query_id(query_id), _exec_env(exec_env), + _is_pipeline(is_pipeline), _query_options(query_options) { this->coord_addr = coord_addr; _start_time = VecDateTimeValue::local_time(); @@ -46,8 +51,8 @@ QueryContext::QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* _shared_scanner_controller.reset(new vectorized::SharedScannerController()); _execution_dependency = pipeline::Dependency::create_unique(-1, -1, "ExecutionDependency", this); - _runtime_filter_mgr.reset( - new RuntimeFilterMgr(TUniqueId(), RuntimeFilterParamsContext::create(this))); + _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>( + TUniqueId(), RuntimeFilterParamsContext::create(this)); timeout_second = query_options.execution_timeout; @@ -86,7 +91,7 @@ QueryContext::~QueryContext() { // it is found that query already exists in _query_ctx_map, and query mem tracker is not used. // query mem tracker consumption is not equal to 0 after use, because there is memory consumed // on query mem tracker, released on other trackers. - std::string mem_tracker_msg {""}; + std::string mem_tracker_msg; if (query_mem_tracker->peak_consumption() != 0) { mem_tracker_msg = fmt::format( ", deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " @@ -95,7 +100,9 @@ QueryContext::~QueryContext() { MemTracker::print_bytes(query_mem_tracker->consumption()), MemTracker::print_bytes(query_mem_tracker->peak_consumption())); } + uint64_t group_id = 0; if (_task_group) { + group_id = _task_group->id(); // before remove _task_group->remove_mem_tracker_limiter(query_mem_tracker); _task_group->remove_query(_query_id); } @@ -110,6 +117,15 @@ QueryContext::~QueryContext() { static_cast<void>(ExecEnv::GetInstance()->lazy_release_obj_pool()->submit( std::make_shared<DelayReleaseToken>(std::move(_thread_token)))); } + + //TODO: check if pipeline and tracing both enabled + if (_is_pipeline && ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] { + try { + ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id); + } catch (std::exception& e) { + LOG(WARNING) << "Dump trace log failed bacause " << e.what(); + } + } } void QueryContext::set_ready_to_execute(bool is_cancelled) { diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index a639268c552..a7632a443bf 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -33,7 +33,6 @@ #include "runtime/runtime_filter_mgr.h" #include "runtime/runtime_predicate.h" #include "task_group/task_group.h" -#include "util/pretty_printer.h" #include "util/threadpool.h" #include "vec/exec/scan/scanner_scheduler.h" #include "vec/runtime/shared_hash_table_controller.h" @@ -61,6 +60,7 @@ struct ReportStatusRequest { std::function<Status(Status)> update_fn; std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn; }; + // Save the common components of fragments in a query. // Some components like DescriptorTbl may be very large // that will slow down each execution of fragments when DeSer them every time. @@ -70,7 +70,7 @@ class QueryContext { public: QueryContext(TUniqueId query_id, int total_fragment_num, ExecEnv* exec_env, - const TQueryOptions& query_options, TNetworkAddress coord_addr); + const TQueryOptions& query_options, TNetworkAddress coord_addr, bool is_pipeline); ~QueryContext(); @@ -216,7 +216,7 @@ public: ThreadPool* get_non_pipe_exec_thread_pool(); - int64_t mem_limit() { return _bytes_limit; } + int64_t mem_limit() const { return _bytes_limit; } void set_merge_controller_handler( std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { @@ -256,6 +256,7 @@ private: ExecEnv* _exec_env = nullptr; VecDateTimeValue _start_time; int64_t _bytes_limit = 0; + bool _is_pipeline = false; // A token used to submit olap scanner to the "_limited_scan_thread_pool", // This thread pool token is created from "_limited_scan_thread_pool" from exec env. diff --git a/be/src/service/http_service.cpp b/be/src/service/http_service.cpp index 6c961959aec..219a9534831 100644 --- a/be/src/service/http_service.cpp +++ b/be/src/service/http_service.cpp @@ -20,13 +20,13 @@ #include <event2/bufferevent.h> #include <event2/http.h> -#include <algorithm> #include <string> #include <vector> #include "common/config.h" #include "common/status.h" #include "http/action/adjust_log_level.h" +#include "http/action/adjust_tracing_dump.h" #include "http/action/check_rpc_channel_action.h" #include "http/action/check_tablet_segment_action.h" #include "http/action/checksum_action.h" @@ -99,6 +99,7 @@ HttpService::~HttpService() { stop(); } +// NOLINTBEGIN(readability-function-size) Status HttpService::start() { add_default_path_handlers(_web_page_handler.get()); @@ -162,6 +163,11 @@ Status HttpService::start() { AdjustLogLevelAction* adjust_log_level_action = _pool.add(new AdjustLogLevelAction()); _ev_http_server->register_handler(HttpMethod::POST, "api/glog/adjust", adjust_log_level_action); + //TODO: add query GET interface + auto* adjust_tracing_dump = _pool.add(new AdjustTracingDump()); + _ev_http_server->register_handler(HttpMethod::POST, "api/pipeline/tracing", + adjust_tracing_dump); + // Register BE version action VersionAction* version_action = _pool.add(new VersionAction(_env, TPrivilegeHier::GLOBAL, TPrivilegeType::NONE)); @@ -201,8 +207,9 @@ Status HttpService::start() { // register metrics { - auto action = _pool.add(new MetricsAction(DorisMetrics::instance()->metric_registry(), _env, - TPrivilegeHier::GLOBAL, TPrivilegeType::NONE)); + auto* action = + _pool.add(new MetricsAction(DorisMetrics::instance()->metric_registry(), _env, + TPrivilegeHier::GLOBAL, TPrivilegeType::NONE)); _ev_http_server->register_handler(HttpMethod::GET, "/metrics", action); } @@ -310,6 +317,7 @@ Status HttpService::start() { _ev_http_server->start(); return Status::OK(); } +// NOLINTEND(readability-function-size) void HttpService::stop() { if (stopped) { diff --git a/be/src/util/thrift_util.cpp b/be/src/util/thrift_util.cpp index fd141f3c74b..395c01ec390 100644 --- a/be/src/util/thrift_util.cpp +++ b/be/src/util/thrift_util.cpp @@ -24,18 +24,17 @@ #include <thrift/transport/TTransportException.h> // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep +#include <string> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" +#include "exec/tablet_info.h" +#include "olap/tablet_schema.h" #include "util/thrift_server.h" -namespace apache { -namespace thrift { -namespace protocol { +namespace apache::thrift::protocol { class TProtocol; -} // namespace protocol -} // namespace thrift -} // namespace apache +} // namespace apache::thrift::protocol // TCompactProtocol requires some #defines to work right. They also define UNLIKELY // so we need to undef this. @@ -152,4 +151,27 @@ bool t_network_address_comparator(const TNetworkAddress& a, const TNetworkAddres return false; } + +std::string to_string(const TUniqueId& id) { + return std::to_string(id.hi).append(std::to_string(id.lo)); +} + +bool _has_inverted_index_or_partial_update(TOlapTableSink sink) { + OlapTableSchemaParam schema; + if (!schema.init(sink.schema).ok()) { + return false; + } + if (schema.is_partial_update()) { + return true; + } + for (const auto& index_schema : schema.indexes()) { + for (const auto& index : index_schema->indexes) { + if (index->index_type() == INVERTED) { + return true; + } + } + } + return false; +} + } // namespace doris diff --git a/be/src/util/thrift_util.h b/be/src/util/thrift_util.h index b16482df610..aff3a9ab101 100644 --- a/be/src/util/thrift_util.h +++ b/be/src/util/thrift_util.h @@ -17,11 +17,13 @@ #pragma once -#include <stdint.h> -#include <string.h> +#include <gen_cpp/DataSinks_types.h> +#include <gen_cpp/Types_types.h> #include <thrift/TApplicationException.h> #include <thrift/transport/TBufferTransports.h> +#include <cstdint> +#include <cstring> #include <exception> #include <memory> #include <string> @@ -29,14 +31,10 @@ #include "common/status.h" -namespace apache { -namespace thrift { -namespace protocol { +namespace apache::thrift::protocol { class TProtocol; class TProtocolFactory; -} // namespace protocol -} // namespace thrift -} // namespace apache +} // namespace apache::thrift::protocol namespace doris { @@ -61,7 +59,7 @@ public: uint8_t* buffer = nullptr; RETURN_IF_ERROR(serialize<T>(obj, &len, &buffer)); result->resize(len); - memcpy(&((*result)[0]), buffer, len); + memcpy(result->data(), buffer, len); return Status::OK(); } @@ -177,4 +175,8 @@ void t_network_address_to_string(const TNetworkAddress& address, std::string* ou // string representation bool t_network_address_comparator(const TNetworkAddress& a, const TNetworkAddress& b); +PURE std::string to_string(const TUniqueId& id); + +PURE bool _has_inverted_index_or_partial_update(TOlapTableSink sink); + } // namespace doris diff --git a/build.sh b/build.sh index 7b67a303db1..ffc73dcab23 100755 --- a/build.sh +++ b/build.sh @@ -727,6 +727,7 @@ EOF cp -r -p "${DORIS_THIRDPARTY}/installed/webroot"/* "${DORIS_OUTPUT}/be/www"/ copy_common_files "${DORIS_OUTPUT}/be/" mkdir -p "${DORIS_OUTPUT}/be/log" + mkdir -p "${DORIS_OUTPUT}/be/log/tracing" mkdir -p "${DORIS_OUTPUT}/be/storage" fi --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
