This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
     new 3d6077efe0  [pipeline](profile) Support real-time profile report in pipeline (#16772)
3d6077efe0 is described below

commit 3d6077efe0e02d0e5a1f555cfe51bea9df3e2d6e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Feb 17 10:01:34 2023 +0800

    [pipeline](profile) Support real-time profile report in pipeline (#16772)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 235 +++++++++--------------
 be/src/pipeline/pipeline_fragment_context.h   |  54 +++++-
 be/src/runtime/fragment_mgr.cpp               | 261 ++++++++++++++------------
 be/src/runtime/fragment_mgr.h                 |  24 ++-
 be/src/runtime/plan_fragment_executor.cpp     |   6 +-
 5 files changed, 304 insertions(+), 276 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6e91991061..524e1be67e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -90,7 +90,8 @@ namespace doris::pipeline {
 PipelineFragmentContext::PipelineFragmentContext(
         const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
         int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
-        std::function<void(RuntimeState*, Status*)> call_back)
+        const std::function<void(RuntimeState*, Status*)>& call_back,
+        const report_status_callback& report_status_cb)
         : _query_id(query_id),
           _fragment_instance_id(instance_id),
           _fragment_id(fragment_id),
@@ -98,12 +99,16 @@ PipelineFragmentContext::PipelineFragmentContext(
           _exec_env(exec_env),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
           _query_ctx(std::move(query_ctx)),
-          _call_back(call_back) {
+          _call_back(call_back),
+          _report_thread_active(false),
+          _report_status_cb(report_status_cb),
+          _is_report_on_cancel(true) {
     _fragment_watcher.start();
 }
 
 PipelineFragmentContext::~PipelineFragmentContext() {
     _call_back(_runtime_state.get(), &_exec_status);
+    DCHECK(!_report_thread_active);
 }
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
@@ -115,6 +120,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         }
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
             _exec_status = Status::Cancelled(msg);
+            _set_is_report_on_cancel(false);
         }
         _runtime_state->set_is_cancelled(true);
         if (_pipe != nullptr) {
@@ -269,6 +275,13 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
     _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
     _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);
 
+    if (_is_report_success && config::status_report_interval > 0) {
+        std::unique_lock<std::mutex> l(_report_thread_lock);
+        _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+        // make sure the thread started up, otherwise report_profile() might get into a race
+        // with stop_report_thread()
+        _report_thread_started_cv.wait(l);
+    }
     _prepared = true;
     return Status::OK();
 }
@@ -455,6 +468,68 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
+void PipelineFragmentContext::_stop_report_thread() {
+    if (!_report_thread_active) {
+        return;
+    }
+
+    _report_thread_active = false;
+
+    _stop_report_thread_cv.notify_one();
+    _report_thread.join();
+}
+
+void PipelineFragmentContext::report_profile() {
+    SCOPED_ATTACH_TASK(_runtime_state.get());
+    VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id();
+
_report_thread_active = true; + + std::unique_lock<std::mutex> l(_report_thread_lock); + // tell Open() that we started + _report_thread_started_cv.notify_one(); + + // Jitter the reporting time of remote fragments by a random amount between + // 0 and the report_interval. This way, the coordinator doesn't get all the + // updates at once so its better for contention as well as smoother progress + // reporting. + int report_fragment_offset = rand() % config::status_report_interval; + // We don't want to wait longer than it takes to run the entire fragment. + _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset)); + while (_report_thread_active) { + if (config::status_report_interval > 0) { + // wait_for can return because the timeout occurred or the condition variable + // was signaled. We can't rely on its return value to distinguish between the + // two cases (e.g. there is a race here where the wait timed out but before grabbing + // the lock, the condition variable was signaled). Instead, we will use an external + // flag, _report_thread_active, to coordinate this. + _stop_report_thread_cv.wait_for(l, + std::chrono::seconds(config::status_report_interval)); + } else { + LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting " + "reporting thread."; + break; + } + + if (VLOG_FILE_IS_ON) { + VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ") + << "profile for instance " << _runtime_state->fragment_instance_id(); + std::stringstream ss; + _runtime_state->runtime_profile()->compute_time_in_profile(); + _runtime_state->runtime_profile()->pretty_print(&ss); + VLOG_FILE << ss.str(); + } + + if (!_report_thread_active) { + break; + } + + send_report(false); + } + + VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id(); +} + // TODO: use virtual function to do abstruct Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) { auto node_type = node->type(); @@ -752,6 +827,7 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) { void PipelineFragmentContext::_close_action() { _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); send_report(true); + _stop_report_thread(); // all submitted tasks done _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this()); } @@ -763,24 +839,11 @@ void PipelineFragmentContext::close_a_pipeline() { } } -// TODO pipeline dump copy from FragmentExecState::to_http_path -std::string PipelineFragmentContext::to_http_path(const std::string& file_name) { - std::stringstream url; - url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port - << "/api/_download_load?" 
- << "token=" << _exec_env->token() << "&file=" << file_name; - return url.str(); -} - -// TODO pipeline dump copy from FragmentExecState::coordinator_callback -// TODO pipeline this callback should be placed in a thread pool void PipelineFragmentContext::send_report(bool done) { Status exec_status = Status::OK(); { std::lock_guard<std::mutex> l(_status_lock); - if (!_exec_status.ok()) { - exec_status = _exec_status; - } + exec_status = _exec_status; } // If plan is done successfully, but _is_report_success is false, @@ -789,137 +852,21 @@ void PipelineFragmentContext::send_report(bool done) { return; } - Status coord_status; - auto coord_addr = _query_ctx->coord_addr; - FrontendServiceConnection coord(_exec_env->frontend_client_cache(), coord_addr, &coord_status); - if (!coord_status.ok()) { - std::stringstream ss; - ss << "couldn't get a client for " << coord_addr << ", reason: " << coord_status; - LOG(WARNING) << "query_id: " << print_id(_query_id) << ", " << ss.str(); - { - std::lock_guard<std::mutex> l(_status_lock); - if (_exec_status.ok()) { - _exec_status = Status::InternalError(ss.str()); - } - } + // If both _is_report_success and _is_report_on_cancel are false, + // which means no matter query is success or failed, no report is needed. + // This may happen when the query limit reached and + // a internal cancellation being processed + if (!_is_report_success && !_is_report_on_cancel) { return; } - auto* profile = _is_report_success ? _runtime_state->runtime_profile() : nullptr; - - TReportExecStatusParams params; - params.protocol_version = FrontendServiceVersion::V1; - params.__set_query_id(_query_id); - params.__set_backend_num(_backend_num); - params.__set_fragment_instance_id(_fragment_instance_id); - params.__set_fragment_id(_fragment_id); - exec_status.set_t_status(¶ms); - params.__set_done(true); - - auto* runtime_state = _runtime_state.get(); - DCHECK(runtime_state != nullptr); - if (runtime_state->query_type() == TQueryType::LOAD && !done && exec_status.ok()) { - // this is a load plan, and load is not finished, just make a brief report - params.__set_loaded_rows(runtime_state->num_rows_load_total()); - params.__set_loaded_bytes(runtime_state->num_bytes_load_total()); - } else { - if (runtime_state->query_type() == TQueryType::LOAD) { - params.__set_loaded_rows(runtime_state->num_rows_load_total()); - params.__set_loaded_bytes(runtime_state->num_bytes_load_total()); - } - if (profile == nullptr) { - params.__isset.profile = false; - } else { - profile->to_thrift(¶ms.profile); - params.__isset.profile = true; - } - - if (!runtime_state->output_files().empty()) { - params.__isset.delta_urls = true; - for (auto& it : runtime_state->output_files()) { - params.delta_urls.push_back(to_http_path(it)); - } - } - if (runtime_state->num_rows_load_total() > 0 || - runtime_state->num_rows_load_filtered() > 0) { - params.__isset.load_counters = true; - - static std::string s_dpp_normal_all = "dpp.norm.ALL"; - static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; - static std::string s_unselected_rows = "unselected.rows"; - - params.load_counters.emplace(s_dpp_normal_all, - std::to_string(runtime_state->num_rows_load_success())); - params.load_counters.emplace(s_dpp_abnormal_all, - std::to_string(runtime_state->num_rows_load_filtered())); - params.load_counters.emplace(s_unselected_rows, - std::to_string(runtime_state->num_rows_load_unselected())); - } - if (!runtime_state->get_error_log_file_path().empty()) { - params.__set_tracking_url( - 
to_load_error_http_path(runtime_state->get_error_log_file_path())); - } - if (!runtime_state->export_output_files().empty()) { - params.__isset.export_files = true; - params.export_files = runtime_state->export_output_files(); - } - if (!runtime_state->tablet_commit_infos().empty()) { - params.__isset.commitInfos = true; - params.commitInfos.reserve(runtime_state->tablet_commit_infos().size()); - for (auto& info : runtime_state->tablet_commit_infos()) { - params.commitInfos.push_back(info); - } - } - if (!runtime_state->error_tablet_infos().empty()) { - params.__isset.errorTabletInfos = true; - params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size()); - for (auto& info : runtime_state->error_tablet_infos()) { - params.errorTabletInfos.push_back(info); - } - } - // Send new errors to coordinator - runtime_state->get_unreported_errors(&(params.error_log)); - params.__isset.error_log = (params.error_log.size() > 0); - } - - if (_exec_env->master_info()->__isset.backend_id) { - params.__set_backend_id(_exec_env->master_info()->backend_id); - } - - TReportExecStatusResult res; - Status rpc_status; - - VLOG_DEBUG << "reportExecStatus params is " - << apache::thrift::ThriftDebugString(params).c_str(); - try { - try { - coord->reportExecStatus(res, params); - } catch (TTransportException& e) { - LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id) - << ", instance id: " << print_id(_fragment_instance_id) << " to " - << coord_addr << ", err: " << e.what(); - rpc_status = coord.reopen(); - - if (!rpc_status.ok()) { - // we need to cancel the execution of this fragment - cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail"); - return; - } - coord->reportExecStatus(res, params); - } - - rpc_status = Status(res.status); - } catch (TException& e) { - std::stringstream msg; - msg << "ReportExecStatus() to " << coord_addr << " failed:\n" << e.what(); - LOG(WARNING) << msg.str(); - rpc_status = Status::InternalError(msg.str()); - } - - if (!rpc_status.ok()) { - // we need to cancel the execution of this fragment - cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2"); - } + _report_status_cb( + {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr, + done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id, + _fragment_instance_id, _backend_num, _runtime_state.get(), + std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1), + std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1, + std::placeholders::_2)}); } } // namespace doris::pipeline diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h index 3a4deb5772..38b07d503e 100644 --- a/be/src/pipeline/pipeline_fragment_context.h +++ b/be/src/pipeline/pipeline_fragment_context.h @@ -25,6 +25,7 @@ namespace doris { class ExecNode; class DataSink; +struct ReportStatusRequest; namespace vectorized { template <bool is_intersect> @@ -37,10 +38,18 @@ class PipelineTask; class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFragmentContext> { public: + // Callback to report execution status of plan fragment. + // 'profile' is the cumulative profile, 'done' indicates whether the execution + // is done or still continuing. + // Note: this does not take a const RuntimeProfile&, because it might need to call + // functions like PrettyPrint() or to_thrift(), neither of which is const + // because they take locks. 
+ using report_status_callback = std::function<void(const ReportStatusRequest)>; PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id, int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env, - std::function<void(RuntimeState*, Status*)> call_back); + const std::function<void(RuntimeState*, Status*)>& call_back, + const report_status_callback& report_status_cb); ~PipelineFragmentContext(); @@ -92,7 +101,27 @@ public: void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; } std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; } + void report_profile(); + + Status update_status(Status status) { + std::lock_guard<std::mutex> l(_status_lock); + if (!status.ok() && _exec_status.ok()) { + _exec_status = status; + } + return _exec_status; + } + private: + Status _create_sink(const TDataSink& t_data_sink); + Status _build_pipelines(ExecNode*, PipelinePtr); + Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request); + Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request); + template <bool is_intersect> + Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); + void _close_action(); + void _stop_report_thread(); + void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; } + // Id of this query TUniqueId _query_id; TUniqueId _fragment_instance_id; @@ -142,16 +171,23 @@ private: std::shared_ptr<io::StreamLoadPipe> _pipe; - Status _create_sink(const TDataSink& t_data_sink); - Status _build_pipelines(ExecNode*, PipelinePtr); - Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request); - Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request); - - template <bool is_intersect> - Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr); std::function<void(RuntimeState*, Status*)> _call_back; - void _close_action(); std::once_flag _close_once_flag; + + std::condition_variable _report_thread_started_cv; + // true if we started the thread + bool _report_thread_active; + // profile reporting-related + report_status_callback _report_status_cb; + std::thread _report_thread; + std::mutex _report_thread_lock; + + // Indicates that profile reporting thread should stop. + // Tied to _report_thread_lock. + std::condition_variable _stop_report_thread_cv; + // If this is set to false, and '_is_report_success' is false as well, + // This executor will not report status to FE on being cancelled. 
+ bool _is_report_on_cancel; }; } // namespace pipeline } // namespace doris \ No newline at end of file diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7d0d2e82b6..f2a2c9e798 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -79,14 +79,14 @@ using apache::thrift::transport::TTransportException; class RuntimeProfile; class FragmentExecState { public: + using report_status_callback_impl = std::function<void(const ReportStatusRequest)>; // Constructor by using QueryFragmentsCtx FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num, - ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx); + ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx, + const report_status_callback_impl& report_status_cb_impl); Status prepare(const TExecPlanFragmentParams& params); - std::string to_http_path(const std::string& file_name); - Status execute(); Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = ""); @@ -146,7 +146,6 @@ private: TUniqueId _fragment_instance_id; // Used to report to coordinator which backend is over int _backend_num; - ExecEnv* _exec_env; TNetworkAddress _coord_addr; PlanFragmentExecutor _executor; @@ -171,22 +170,24 @@ private: // If set the true, this plan fragment will be executed only after FE send execution start rpc. bool _need_wait_execution_trigger = false; + report_status_callback_impl _report_status_cb_impl; }; FragmentExecState::FragmentExecState(const TUniqueId& query_id, const TUniqueId& fragment_instance_id, int backend_num, ExecEnv* exec_env, - std::shared_ptr<QueryFragmentsCtx> fragments_ctx) + std::shared_ptr<QueryFragmentsCtx> fragments_ctx, + const report_status_callback_impl& report_status_cb_impl) : _query_id(query_id), _fragment_instance_id(fragment_instance_id), _backend_num(backend_num), - _exec_env(exec_env), _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)), _set_rsc_info(false), _timeout_second(-1), - _fragments_ctx(std::move(fragments_ctx)) { + _fragments_ctx(std::move(fragments_ctx)), + _report_status_cb_impl(report_status_cb_impl) { _start_time = DateTimeValue::local_time(); _coord_addr = _fragments_ctx->coord_addr; } @@ -256,9 +257,67 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const return Status::OK(); } -std::string FragmentExecState::to_http_path(const std::string& file_name) { +// There can only be one of these callbacks in-flight at any moment, because +// it is only invoked from the executor's reporting thread. +// Also, the reported status will always reflect the most recent execution status, +// including the final status when execution finishes. 
+void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, + bool done) { + _report_status_cb_impl( + {status, profile, done, _coord_addr, _query_id, -1, _fragment_instance_id, _backend_num, + _executor.runtime_state(), + std::bind(&FragmentExecState::update_status, this, std::placeholders::_1), + std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1, + std::placeholders::_2)}); + DCHECK(status.ok() || done); // if !status.ok() => done +} + +FragmentMgr::FragmentMgr(ExecEnv* exec_env) + : _exec_env(exec_env), _stop_background_threads_latch(1) { + _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); + INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); + REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); }); + + auto s = Thread::create( + "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); }, + &_cancel_thread); + CHECK(s.ok()) << s.to_string(); + + // TODO(zc): we need a better thread-pool + // now one user can use all the thread pool, others have no resource. + s = ThreadPoolBuilder("FragmentMgrThreadPool") + .set_min_threads(config::fragment_pool_thread_num_min) + .set_max_threads(config::fragment_pool_thread_num_max) + .set_max_queue_size(config::fragment_pool_queue_size) + .build(&_thread_pool); + + REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, + [this]() { return _thread_pool->get_queue_size(); }); + CHECK(s.ok()) << s.to_string(); +} + +FragmentMgr::~FragmentMgr() { + DEREGISTER_HOOK_METRIC(plan_fragment_count); + DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); + _stop_background_threads_latch.count_down(); + if (_cancel_thread) { + _cancel_thread->join(); + } + // Stop all the worker, should wait for a while? + // _thread_pool->wait_for(); + _thread_pool->shutdown(); + + // Only me can delete + { + std::lock_guard<std::mutex> lock(_lock); + _fragment_map.clear(); + _fragments_ctx_map.clear(); + } +} + +std::string FragmentMgr::to_http_path(const std::string& file_name) { std::stringstream url; - url << "http://" << get_host_port(BackendOptions::get_localhost(), config::webserver_port) + url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port << "/api/_download_load?" << "token=" << _exec_env->token() << "&file=" << file_name; return url.str(); @@ -268,94 +327,96 @@ std::string FragmentExecState::to_http_path(const std::string& file_name) { // it is only invoked from the executor's reporting thread. // Also, the reported status will always reflect the most recent execution status, // including the final status when execution finishes. 
-void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile, - bool done) { - DCHECK(status.ok() || done); // if !status.ok() => done - Status exec_status = update_status(status); +void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { + DCHECK(req.status.ok() || req.done); // if !status.ok() => done + Status exec_status = req.update_fn(req.status); Status coord_status; - FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status); + FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr, + &coord_status); if (!coord_status.ok()) { std::stringstream ss; - UniqueId uid(_query_id.hi, _query_id.lo); - ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status; + UniqueId uid(req.query_id.hi, req.query_id.lo); + ss << "couldn't get a client for " << req.coord_addr << ", reason: " << coord_status; LOG(WARNING) << "query_id: " << uid << ", " << ss.str(); - update_status(Status::InternalError(ss.str())); + req.update_fn(Status::InternalError(ss.str())); return; } TReportExecStatusParams params; params.protocol_version = FrontendServiceVersion::V1; - params.__set_query_id(_query_id); - params.__set_backend_num(_backend_num); - params.__set_fragment_instance_id(_fragment_instance_id); + params.__set_query_id(req.query_id); + params.__set_backend_num(req.backend_num); + params.__set_fragment_instance_id(req.fragment_instance_id); + params.__set_fragment_id(req.fragment_id); exec_status.set_t_status(¶ms); - params.__set_done(done); + params.__set_done(req.done); - RuntimeState* runtime_state = _executor.runtime_state(); - DCHECK(runtime_state != nullptr); - if (runtime_state->query_type() == TQueryType::LOAD && !done && status.ok()) { + DCHECK(req.runtime_state != nullptr); + if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) { // this is a load plan, and load is not finished, just make a brief report - params.__set_loaded_rows(runtime_state->num_rows_load_total()); - params.__set_loaded_bytes(runtime_state->num_bytes_load_total()); + params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); + params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); } else { - if (runtime_state->query_type() == TQueryType::LOAD) { - params.__set_loaded_rows(runtime_state->num_rows_load_total()); - params.__set_loaded_bytes(runtime_state->num_bytes_load_total()); + if (req.runtime_state->query_type() == TQueryType::LOAD) { + params.__set_loaded_rows(req.runtime_state->num_rows_load_total()); + params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total()); } - if (profile == nullptr) { + if (req.profile == nullptr) { params.__isset.profile = false; } else { - profile->to_thrift(¶ms.profile); + req.profile->to_thrift(¶ms.profile); params.__isset.profile = true; } - if (!runtime_state->output_files().empty()) { + if (!req.runtime_state->output_files().empty()) { params.__isset.delta_urls = true; - for (auto& it : runtime_state->output_files()) { + for (auto& it : req.runtime_state->output_files()) { params.delta_urls.push_back(to_http_path(it)); } } - if (runtime_state->num_rows_load_total() > 0 || - runtime_state->num_rows_load_filtered() > 0) { + if (req.runtime_state->num_rows_load_total() > 0 || + req.runtime_state->num_rows_load_filtered() > 0) { params.__isset.load_counters = true; static std::string s_dpp_normal_all = "dpp.norm.ALL"; static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL"; static std::string 
s_unselected_rows = "unselected.rows"; - params.load_counters.emplace(s_dpp_normal_all, - std::to_string(runtime_state->num_rows_load_success())); - params.load_counters.emplace(s_dpp_abnormal_all, - std::to_string(runtime_state->num_rows_load_filtered())); - params.load_counters.emplace(s_unselected_rows, - std::to_string(runtime_state->num_rows_load_unselected())); + params.load_counters.emplace( + s_dpp_normal_all, std::to_string(req.runtime_state->num_rows_load_success())); + params.load_counters.emplace( + s_dpp_abnormal_all, + std::to_string(req.runtime_state->num_rows_load_filtered())); + params.load_counters.emplace( + s_unselected_rows, + std::to_string(req.runtime_state->num_rows_load_unselected())); } - if (!runtime_state->get_error_log_file_path().empty()) { + if (!req.runtime_state->get_error_log_file_path().empty()) { params.__set_tracking_url( - to_load_error_http_path(runtime_state->get_error_log_file_path())); + to_load_error_http_path(req.runtime_state->get_error_log_file_path())); } - if (!runtime_state->export_output_files().empty()) { + if (!req.runtime_state->export_output_files().empty()) { params.__isset.export_files = true; - params.export_files = runtime_state->export_output_files(); + params.export_files = req.runtime_state->export_output_files(); } - if (!runtime_state->tablet_commit_infos().empty()) { + if (!req.runtime_state->tablet_commit_infos().empty()) { params.__isset.commitInfos = true; - params.commitInfos.reserve(runtime_state->tablet_commit_infos().size()); - for (auto& info : runtime_state->tablet_commit_infos()) { + params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size()); + for (auto& info : req.runtime_state->tablet_commit_infos()) { params.commitInfos.push_back(info); } } - if (!runtime_state->error_tablet_infos().empty()) { + if (!req.runtime_state->error_tablet_infos().empty()) { params.__isset.errorTabletInfos = true; - params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size()); - for (auto& info : runtime_state->error_tablet_infos()) { + params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size()); + for (auto& info : req.runtime_state->error_tablet_infos()) { params.errorTabletInfos.push_back(info); } } // Send new errors to coordinator - runtime_state->get_unreported_errors(&(params.error_log)); + req.runtime_state->get_unreported_errors(&(params.error_log)); params.__isset.error_log = (params.error_log.size() > 0); } @@ -370,22 +431,23 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil << apache::thrift::ThriftDebugString(params).c_str(); if (!exec_status.ok()) { LOG(WARNING) << "report error status: " << exec_status.to_string() - << " to coordinator: " << _coord_addr << ", query id: " << print_id(_query_id) - << ", instance id: " << print_id(_fragment_instance_id); + << " to coordinator: " << req.coord_addr + << ", query id: " << print_id(req.query_id) + << ", instance id: " << print_id(req.fragment_instance_id); } try { try { coord->reportExecStatus(res, params); } catch (TTransportException& e) { - LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id) - << ", instance id: " << print_id(_fragment_instance_id) << " to " - << _coord_addr << ", err: " << e.what(); + LOG(WARNING) << "Retrying ReportExecStatus. 
query id: " << print_id(req.query_id) + << ", instance id: " << print_id(req.fragment_instance_id) << " to " + << req.coord_addr << ", err: " << e.what(); rpc_status = coord.reopen(); if (!rpc_status.ok()) { // we need to cancel the execution of this fragment - update_status(rpc_status); - _executor.cancel(); + req.update_fn(rpc_status); + req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail"); return; } coord->reportExecStatus(res, params); @@ -394,64 +456,22 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil rpc_status = Status(res.status); } catch (TException& e) { std::stringstream msg; - msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << e.what(); + msg << "ReportExecStatus() to " << req.coord_addr << " failed:\n" << e.what(); LOG(WARNING) << msg.str(); rpc_status = Status::InternalError(msg.str()); } if (!rpc_status.ok()) { // we need to cancel the execution of this fragment - update_status(rpc_status); - _executor.cancel(); - } -} - -FragmentMgr::FragmentMgr(ExecEnv* exec_env) - : _exec_env(exec_env), _stop_background_threads_latch(1) { - _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); - INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); - REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); }); - - auto s = Thread::create( - "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); }, - &_cancel_thread); - CHECK(s.ok()) << s.to_string(); - - // TODO(zc): we need a better thread-pool - // now one user can use all the thread pool, others have no resource. - s = ThreadPoolBuilder("FragmentMgrThreadPool") - .set_min_threads(config::fragment_pool_thread_num_min) - .set_max_threads(config::fragment_pool_thread_num_max) - .set_max_queue_size(config::fragment_pool_queue_size) - .build(&_thread_pool); - - REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, - [this]() { return _thread_pool->get_queue_size(); }); - CHECK(s.ok()) << s.to_string(); -} - -FragmentMgr::~FragmentMgr() { - DEREGISTER_HOOK_METRIC(plan_fragment_count); - DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size); - _stop_background_threads_latch.count_down(); - if (_cancel_thread) { - _cancel_thread->join(); - } - // Stop all the worker, should wait for a while? - // _thread_pool->wait_for(); - _thread_pool->shutdown(); - - // Only me can delete - { - std::lock_guard<std::mutex> lock(_lock); - _fragment_map.clear(); - _fragments_ctx_map.clear(); + req.update_fn(rpc_status); + req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2"); } } static void empty_function(RuntimeState*, Status*) {} -void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) { +void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, + const FinishCallback& cb) { std::string func_name {"PlanFragmentExecutor::_exec_actual"}; #ifndef BE_TEST auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name); @@ -597,7 +617,8 @@ void FragmentMgr::remove_pipeline_context( } } -Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) { +Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, + const FinishCallback& cb) { auto tracer = telemetry::is_current_span_valid() ? 
telemetry::get_tracer("tracer") : telemetry::get_noop_tracer(); VLOG_ROW << "exec_plan_fragment params is " @@ -703,9 +724,11 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi } fragments_ctx->fragment_ids.push_back(fragment_instance_id); - exec_state.reset(new FragmentExecState(fragments_ctx->query_id, - params.params.fragment_instance_id, params.backend_num, - _exec_env, fragments_ctx)); + exec_state.reset( + new FragmentExecState(fragments_ctx->query_id, params.params.fragment_instance_id, + params.backend_num, _exec_env, fragments_ctx, + std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), + this, std::placeholders::_1))); if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) { // set need_wait_execution_trigger means this instance will not actually being executed // until the execPlanFragmentStart RPC trigger to start it. @@ -756,7 +779,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi std::shared_ptr<pipeline::PipelineFragmentContext> context = std::make_shared<pipeline::PipelineFragmentContext>( fragments_ctx->query_id, fragment_instance_id, -1, params.backend_num, - fragments_ctx, _exec_env, cb); + fragments_ctx, _exec_env, cb, + std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this, + std::placeholders::_1)); { SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params); @@ -782,7 +807,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi return Status::OK(); } -Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, FinishCallback cb) { +Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, + const FinishCallback& cb) { auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer") : telemetry::get_noop_tracer(); VLOG_ROW << "exec_plan_fragment params is " @@ -889,8 +915,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Fi fragments_ctx->fragment_ids.push_back(fragment_instance_id); - exec_state.reset(new FragmentExecState(fragments_ctx->query_id, fragment_instance_id, - local_params.backend_num, _exec_env, fragments_ctx)); + exec_state.reset(new FragmentExecState( + fragments_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env, + fragments_ctx, + std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this, + std::placeholders::_1))); if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) { // set need_wait_execution_trigger means this instance will not actually being executed // until the execPlanFragmentStart RPC trigger to start it. 
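The wiring in the two exec_plan_fragment() overloads above and in the hunk that follows is the heart of the change: both FragmentExecState and PipelineFragmentContext are handed a std::bind of FragmentMgr::coordinator_callback and no longer talk to the frontend client cache themselves; they only fill in a ReportStatusRequest and invoke the callback. Below is a minimal, self-contained sketch of that callback-funnelling pattern under simplified assumptions; MiniReport, MiniMgr and MiniExecState are illustrative names only and do not exist in the Doris sources.

    // Hypothetical stand-ins: MiniReport ~ ReportStatusRequest, MiniMgr ~ FragmentMgr,
    // MiniExecState ~ FragmentExecState / PipelineFragmentContext.
    #include <functional>
    #include <iostream>
    #include <string>
    #include <utility>

    struct MiniReport {
        std::string fragment_instance_id;
        bool done;
    };

    class MiniMgr {
    public:
        // The single place that would own the coordinator (FE) RPC path.
        void coordinator_callback(const MiniReport& req) {
            std::cout << "report instance=" << req.fragment_instance_id
                      << " done=" << req.done << std::endl;
        }
    };

    class MiniExecState {
    public:
        using report_cb = std::function<void(const MiniReport&)>;
        MiniExecState(std::string id, report_cb cb)
                : _id(std::move(id)), _report_cb(std::move(cb)) {}

        // The executor only fills in a request and forwards it; it needs no
        // client cache or coordinator address of its own.
        void send_report(bool done) { _report_cb({_id, done}); }

    private:
        std::string _id;
        report_cb _report_cb;
    };

    int main() {
        MiniMgr mgr;
        MiniExecState exec("instance-0",
                           std::bind(&MiniMgr::coordinator_callback, &mgr, std::placeholders::_1));
        exec.send_report(false); // periodic report
        exec.send_report(true);  // final report
        return 0;
    }

One design note: because the ReportStatusRequest also carries update_fn and cancel_fn bound to the executor, a failed report RPC can be turned into a fragment cancellation inside FragmentMgr::coordinator_callback without the executor ever holding an RPC client.
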
@@ -906,7 +935,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Fi std::shared_ptr<pipeline::PipelineFragmentContext> context = std::make_shared<pipeline::PipelineFragmentContext>( fragments_ctx->query_id, fragment_instance_id, params.fragment_id, - local_params.backend_num, fragments_ctx, _exec_env, cb); + local_params.backend_num, fragments_ctx, _exec_env, cb, + std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this, + std::placeholders::_1)); { SCOPED_RAW_TIMER(&duration_ns); auto prepare_st = context->prepare(params, i); diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 98e6c26ddd..6508ea0659 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -61,6 +61,20 @@ class RuntimeFilterMergeController; std::string to_load_error_http_path(const std::string& file_name); +struct ReportStatusRequest { + const Status& status; + RuntimeProfile* profile; + bool done; + TNetworkAddress coord_addr; + TUniqueId query_id; + int fragment_id; + TUniqueId fragment_instance_id; + int backend_num; + RuntimeState* runtime_state; + std::function<Status(Status)> update_fn; + std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn; +}; + // This class used to manage all the fragment execute in this instance class FragmentMgr : public RestMonitorIface { public: @@ -78,9 +92,9 @@ public: std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context); // TODO(zc): report this is over - Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb); + Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb); - Status exec_plan_fragment(const TPipelineFragmentParams& params, FinishCallback cb); + Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb); Status start_query_execution(const PExecPlanFragmentStartRequest* request); @@ -116,8 +130,12 @@ public: std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id); + std::string to_http_path(const std::string& file_name); + + void coordinator_callback(const ReportStatusRequest& req); + private: - void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb); + void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb); void _set_scan_concurrency(const TExecPlanFragmentParams& params, QueryFragmentsCtx* fragments_ctx); diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp index 96908dde30..f62bd4bae1 100644 --- a/be/src/runtime/plan_fragment_executor.cpp +++ b/be/src/runtime/plan_fragment_executor.cpp @@ -445,11 +445,7 @@ void PlanFragmentExecutor::send_report(bool done) { // This will send a report even if we are cancelled. If the query completed correctly // but fragments still need to be cancelled (e.g. limit reached), the coordinator will // be waiting for a final report and profile. - if (_is_report_success) { - _report_status_cb(status, profile(), done || !status.ok()); - } else { - _report_status_cb(status, nullptr, done || !status.ok()); - } + _report_status_cb(status, _is_report_success ? profile() : nullptr, done || !status.ok()); } void PlanFragmentExecutor::stop_report_thread() { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org
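For readers tracing the new PipelineFragmentContext::report_profile() loop in the first file, the pattern is a fairly standard background reporter: start the thread under a lock and wait for a started signal, jitter the first report by a random offset, then loop on condition_variable::wait_for and rely on an explicit active flag (not on the wait_for return value) to decide when to stop. The sketch below is a standalone illustration of that pattern using only the standard library; the 5-second interval and all names are illustrative and are not taken from Doris config or code.

    #include <chrono>
    #include <condition_variable>
    #include <cstdlib>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class PeriodicReporter {
    public:
        void start() {
            std::unique_lock<std::mutex> l(_lock);
            _thread = std::thread(&PeriodicReporter::run, this);
            // Don't return until the reporting thread is really up, mirroring
            // the _report_thread_started_cv handshake in the patch.
            _started_cv.wait(l, [this] { return _active; });
        }

        void stop() {
            {
                std::lock_guard<std::mutex> l(_lock);
                _active = false;
            }
            _stop_cv.notify_one();
            _thread.join();
            send_report(/*done=*/true); // one final report after the loop exits
        }

    private:
        void run() {
            std::unique_lock<std::mutex> l(_lock);
            _active = true;
            _started_cv.notify_one();

            // Jitter the first report so many fragments don't all hit the
            // coordinator at the same time.
            int offset = std::rand() % kIntervalSec;
            _stop_cv.wait_for(l, std::chrono::seconds(offset));

            while (_active) {
                // wait_for can return on timeout or on a (possibly spurious)
                // notification; the _active flag decides whether to continue.
                _stop_cv.wait_for(l, std::chrono::seconds(kIntervalSec));
                if (!_active) {
                    break;
                }
                send_report(/*done=*/false);
            }
        }

        void send_report(bool done) {
            std::cout << (done ? "final" : "periodic") << " report" << std::endl;
        }

        static constexpr int kIntervalSec = 5; // illustrative stand-in for status_report_interval
        std::mutex _lock;
        std::condition_variable _started_cv;
        std::condition_variable _stop_cv;
        std::thread _thread;
        bool _active = false;
    };

    int main() {
        PeriodicReporter r;
        r.start();
        std::this_thread::sleep_for(std::chrono::seconds(12));
        r.stop();
        return 0;
    }

In the patch itself, PipelineFragmentContext::_close_action() sends the final report via send_report(true) and then calls _stop_report_thread(); the sketch folds both steps into stop() for brevity.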