This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 3d6077efe0 [pipeline](profile) Support real-time profile report in pipeline (#16772)
3d6077efe0 is described below

commit 3d6077efe0e02d0e5a1f555cfe51bea9df3e2d6e
Author: Gabriel <gabrielleeb...@gmail.com>
AuthorDate: Fri Feb 17 10:01:34 2023 +0800

    [pipeline](profile) Support real-time profile report in pipeline (#16772)
---
 be/src/pipeline/pipeline_fragment_context.cpp | 235 +++++++++--------------
 be/src/pipeline/pipeline_fragment_context.h   |  54 +++++-
 be/src/runtime/fragment_mgr.cpp               | 261 ++++++++++++++------------
 be/src/runtime/fragment_mgr.h                 |  24 ++-
 be/src/runtime/plan_fragment_executor.cpp     |   6 +-
 5 files changed, 304 insertions(+), 276 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 6e91991061..524e1be67e 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -90,7 +90,8 @@ namespace doris::pipeline {
 PipelineFragmentContext::PipelineFragmentContext(
         const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
         int backend_num, std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
-        std::function<void(RuntimeState*, Status*)> call_back)
+        const std::function<void(RuntimeState*, Status*)>& call_back,
+        const report_status_callback& report_status_cb)
         : _query_id(query_id),
           _fragment_instance_id(instance_id),
           _fragment_id(fragment_id),
@@ -98,12 +99,16 @@ PipelineFragmentContext::PipelineFragmentContext(
           _exec_env(exec_env),
           _cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
           _query_ctx(std::move(query_ctx)),
-          _call_back(call_back) {
+          _call_back(call_back),
+          _report_thread_active(false),
+          _report_status_cb(report_status_cb),
+          _is_report_on_cancel(true) {
     _fragment_watcher.start();
 }
 
 PipelineFragmentContext::~PipelineFragmentContext() {
     _call_back(_runtime_state.get(), &_exec_status);
+    DCHECK(!_report_thread_active);
 }
 
 void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
@@ -115,6 +120,7 @@ void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
         }
         if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
             _exec_status = Status::Cancelled(msg);
+            _set_is_report_on_cancel(false);
         }
         _runtime_state->set_is_cancelled(true);
         if (_pipe != nullptr) {
@@ -269,6 +275,13 @@ Status PipelineFragmentContext::prepare(const doris::TExecPlanFragmentParams& re
     _runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
     _runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);
 
+    if (_is_report_success && config::status_report_interval > 0) {
+        std::unique_lock<std::mutex> l(_report_thread_lock);
+        _report_thread = std::thread(&PipelineFragmentContext::report_profile, this);
+        // make sure the thread started up, otherwise report_profile() might get into a race
+        // with stop_report_thread()
+        _report_thread_started_cv.wait(l);
+    }
     _prepared = true;
     return Status::OK();
 }
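
The handshake in prepare() above, where the thread is created while _report_thread_lock is held and the caller then blocks on _report_thread_started_cv, is the standard way to guarantee a worker thread is observably running before anyone may stop it. A minimal, self-contained sketch of the same pattern (the Reporter class and all member names below are illustrative stand-ins, not Doris types):

    #include <chrono>
    #include <condition_variable>
    #include <mutex>
    #include <thread>

    // Sketch: start() blocks until the worker has set its "active" flag, so a
    // later stop() can never race with a worker that has not started yet.
    class Reporter {
    public:
        void start() {
            std::unique_lock<std::mutex> l(_lock);
            _thread = std::thread(&Reporter::run, this);
            _started_cv.wait(l, [this] { return _active; }); // startup handshake
        }

        void stop() { // assumes start() has completed
            {
                std::lock_guard<std::mutex> l(_lock);
                _active = false;
            }
            _stop_cv.notify_one();
            _thread.join();
        }

    private:
        void run() {
            std::unique_lock<std::mutex> l(_lock);
            _active = true;
            _started_cv.notify_one(); // tell start() we are up
            while (_active) {
                // periodic work goes here; wake early when stop() signals
                _stop_cv.wait_for(l, std::chrono::seconds(1));
            }
        }

        std::mutex _lock;
        std::condition_variable _started_cv;
        std::condition_variable _stop_cv;
        std::thread _thread;
        bool _active = false;
    };

Unlike the raw wait(l) in the hunk above, the sketch passes a predicate to wait(), which also guards against spurious wakeups.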
@@ -455,6 +468,68 @@ Status PipelineFragmentContext::_build_pipeline_tasks(
     return Status::OK();
 }
 
+void PipelineFragmentContext::_stop_report_thread() {
+    if (!_report_thread_active) {
+        return;
+    }
+
+    _report_thread_active = false;
+
+    _stop_report_thread_cv.notify_one();
+    _report_thread.join();
+}
+
+void PipelineFragmentContext::report_profile() {
+    SCOPED_ATTACH_TASK(_runtime_state.get());
+    VLOG_FILE << "report_profile(): instance_id=" << 
_runtime_state->fragment_instance_id();
+
+    _report_thread_active = true;
+
+    std::unique_lock<std::mutex> l(_report_thread_lock);
+    // tell Open() that we started
+    _report_thread_started_cv.notify_one();
+
+    // Jitter the reporting time of remote fragments by a random amount between
+    // 0 and the report_interval.  This way, the coordinator doesn't get all the
+    // updates at once, so it's better for contention as well as smoother progress
+    // reporting.
+    int report_fragment_offset = rand() % config::status_report_interval;
+    // We don't want to wait longer than it takes to run the entire fragment.
+    _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
+    while (_report_thread_active) {
+        if (config::status_report_interval > 0) {
+            // wait_for can return because the timeout occurred or the condition variable
+            // was signaled.  We can't rely on its return value to distinguish between the
+            // two cases (e.g. there is a race here where the wait timed out but before grabbing
+            // the lock, the condition variable was signaled).  Instead, we will use an external
+            // flag, _report_thread_active, to coordinate this.
+            _stop_report_thread_cv.wait_for(l,
+                                            std::chrono::seconds(config::status_report_interval));
+        } else {
+            LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting "
+                            "reporting thread.";
+            break;
+        }
+
+        if (VLOG_FILE_IS_ON) {
+            VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : 
" ")
+                      << "profile for instance " << 
_runtime_state->fragment_instance_id();
+            std::stringstream ss;
+            _runtime_state->runtime_profile()->compute_time_in_profile();
+            _runtime_state->runtime_profile()->pretty_print(&ss);
+            VLOG_FILE << ss.str();
+        }
+
+        if (!_report_thread_active) {
+            break;
+        }
+
+        send_report(false);
+    }
+
+    VLOG_FILE << "exiting reporting thread: instance_id=" << 
_runtime_state->fragment_instance_id();
+}
+
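
Two details in report_profile() above are worth spelling out: the random initial offset spreads the first reports so the coordinator is not hit by every fragment at once, and the loop trusts only the _report_thread_active flag, never the wait_for() return value, to detect shutdown. A compressed sketch of that loop shape (report_loop, interval_s, and do_report are placeholder names, not Doris APIs):

    #include <chrono>
    #include <condition_variable>
    #include <cstdlib>
    #include <functional>
    #include <mutex>

    // Sketch: stagger the first report by a random offset in [0, interval_s),
    // then report every interval_s seconds until 'active' is cleared.
    void report_loop(std::mutex& lock, std::condition_variable& stop_cv, bool& active,
                     int interval_s, const std::function<void()>& do_report) {
        std::unique_lock<std::mutex> l(lock);
        int offset_s = std::rand() % interval_s; // jitter; assumes interval_s > 0
        stop_cv.wait_for(l, std::chrono::seconds(offset_s));
        while (active) {
            stop_cv.wait_for(l, std::chrono::seconds(interval_s));
            if (!active) break; // the final report is sent by the closer, not here
            do_report();
        }
    }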
 // TODO: use virtual function to do abstruct
 Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) {
     auto node_type = node->type();
@@ -752,6 +827,7 @@ Status PipelineFragmentContext::_create_sink(const TDataSink& thrift_sink) {
 void PipelineFragmentContext::_close_action() {
     _runtime_profile->total_time_counter()->update(_fragment_watcher.elapsed_time());
     send_report(true);
+    _stop_report_thread();
     // all submitted tasks done
     _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
 }
@@ -763,24 +839,11 @@ void PipelineFragmentContext::close_a_pipeline() {
     }
 }
 
-// TODO pipeline dump copy from FragmentExecState::to_http_path
-std::string PipelineFragmentContext::to_http_path(const std::string& file_name) {
-    std::stringstream url;
-    url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
-        << "/api/_download_load?"
-        << "token=" << _exec_env->token() << "&file=" << file_name;
-    return url.str();
-}
-
-// TODO pipeline dump copy from FragmentExecState::coordinator_callback
-// TODO pipeline this callback should be placed in a thread pool
 void PipelineFragmentContext::send_report(bool done) {
     Status exec_status = Status::OK();
     {
         std::lock_guard<std::mutex> l(_status_lock);
-        if (!_exec_status.ok()) {
-            exec_status = _exec_status;
-        }
+        exec_status = _exec_status;
     }
 
     // If plan is done successfully, but _is_report_success is false,
@@ -789,137 +852,21 @@ void PipelineFragmentContext::send_report(bool done) {
         return;
     }
 
-    Status coord_status;
-    auto coord_addr = _query_ctx->coord_addr;
-    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), coord_addr, &coord_status);
-    if (!coord_status.ok()) {
-        std::stringstream ss;
-        ss << "couldn't get a client for " << coord_addr << ", reason: " << 
coord_status;
-        LOG(WARNING) << "query_id: " << print_id(_query_id) << ", " << 
ss.str();
-        {
-            std::lock_guard<std::mutex> l(_status_lock);
-            if (_exec_status.ok()) {
-                _exec_status = Status::InternalError(ss.str());
-            }
-        }
+    // If both _is_report_success and _is_report_on_cancel are false,
+    // no report is needed regardless of whether the query succeeded or failed.
+    // This may happen when the query limit is reached and
+    // an internal cancellation is being processed.
+    if (!_is_report_success && !_is_report_on_cancel) {
         return;
     }
-    auto* profile = _is_report_success ? _runtime_state->runtime_profile() : nullptr;
-
-    TReportExecStatusParams params;
-    params.protocol_version = FrontendServiceVersion::V1;
-    params.__set_query_id(_query_id);
-    params.__set_backend_num(_backend_num);
-    params.__set_fragment_instance_id(_fragment_instance_id);
-    params.__set_fragment_id(_fragment_id);
-    exec_status.set_t_status(&params);
-    params.__set_done(true);
-
-    auto* runtime_state = _runtime_state.get();
-    DCHECK(runtime_state != nullptr);
-    if (runtime_state->query_type() == TQueryType::LOAD && !done && exec_status.ok()) {
-        // this is a load plan, and load is not finished, just make a brief report
-        params.__set_loaded_rows(runtime_state->num_rows_load_total());
-        params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
-    } else {
-        if (runtime_state->query_type() == TQueryType::LOAD) {
-            params.__set_loaded_rows(runtime_state->num_rows_load_total());
-            params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
-        }
-        if (profile == nullptr) {
-            params.__isset.profile = false;
-        } else {
-            profile->to_thrift(&params.profile);
-            params.__isset.profile = true;
-        }
-
-        if (!runtime_state->output_files().empty()) {
-            params.__isset.delta_urls = true;
-            for (auto& it : runtime_state->output_files()) {
-                params.delta_urls.push_back(to_http_path(it));
-            }
-        }
-        if (runtime_state->num_rows_load_total() > 0 ||
-            runtime_state->num_rows_load_filtered() > 0) {
-            params.__isset.load_counters = true;
-
-            static std::string s_dpp_normal_all = "dpp.norm.ALL";
-            static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
-            static std::string s_unselected_rows = "unselected.rows";
-
-            params.load_counters.emplace(s_dpp_normal_all,
-                                         std::to_string(runtime_state->num_rows_load_success()));
-            params.load_counters.emplace(s_dpp_abnormal_all,
-                                         std::to_string(runtime_state->num_rows_load_filtered()));
-            params.load_counters.emplace(s_unselected_rows,
-                                         std::to_string(runtime_state->num_rows_load_unselected()));
-        }
-        if (!runtime_state->get_error_log_file_path().empty()) {
-            params.__set_tracking_url(
-                    to_load_error_http_path(runtime_state->get_error_log_file_path()));
-        }
-        if (!runtime_state->export_output_files().empty()) {
-            params.__isset.export_files = true;
-            params.export_files = runtime_state->export_output_files();
-        }
-        if (!runtime_state->tablet_commit_infos().empty()) {
-            params.__isset.commitInfos = true;
-            params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
-            for (auto& info : runtime_state->tablet_commit_infos()) {
-                params.commitInfos.push_back(info);
-            }
-        }
-        if (!runtime_state->error_tablet_infos().empty()) {
-            params.__isset.errorTabletInfos = true;
-            params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
-            for (auto& info : runtime_state->error_tablet_infos()) {
-                params.errorTabletInfos.push_back(info);
-            }
-        }
 
-        // Send new errors to coordinator
-        runtime_state->get_unreported_errors(&(params.error_log));
-        params.__isset.error_log = (params.error_log.size() > 0);
-    }
-
-    if (_exec_env->master_info()->__isset.backend_id) {
-        params.__set_backend_id(_exec_env->master_info()->backend_id);
-    }
-
-    TReportExecStatusResult res;
-    Status rpc_status;
-
-    VLOG_DEBUG << "reportExecStatus params is "
-               << apache::thrift::ThriftDebugString(params).c_str();
-    try {
-        try {
-            coord->reportExecStatus(res, params);
-        } catch (TTransportException& e) {
-            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << 
print_id(_query_id)
-                         << ", instance id: " << 
print_id(_fragment_instance_id) << " to "
-                         << coord_addr << ", err: " << e.what();
-            rpc_status = coord.reopen();
-
-            if (!rpc_status.ok()) {
-                // we need to cancel the execution of this fragment
-                cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail");
-                return;
-            }
-            coord->reportExecStatus(res, params);
-        }
-
-        rpc_status = Status(res.status);
-    } catch (TException& e) {
-        std::stringstream msg;
-        msg << "ReportExecStatus() to " << coord_addr << " failed:\n" << 
e.what();
-        LOG(WARNING) << msg.str();
-        rpc_status = Status::InternalError(msg.str());
-    }
-
-    if (!rpc_status.ok()) {
-        // we need to cancel the execution of this fragment
-        cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2");
-    }
+    _report_status_cb(
+            {exec_status, _is_report_success ? _runtime_state->runtime_profile() : nullptr,
+             done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id,
+             _fragment_instance_id, _backend_num, _runtime_state.get(),
+             std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
+             std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
+                       std::placeholders::_2)});
 }
 
 } // namespace doris::pipeline
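
Taken together, this file's changes invert the reporting dependency: PipelineFragmentContext no longer builds Thrift params or talks to the FE itself; it only packages its state into a ReportStatusRequest and invokes the injected _report_status_cb. A toy sketch of that decoupling, with stand-in types (Request, Context, and Mgr below are illustrative, not the Doris classes):

    #include <functional>
    #include <iostream>
    #include <string>

    // The fragment context only packages state into a request and hands it to
    // an injected callback; the owner (FragmentMgr in the real code) owns the
    // RPC mechanics.
    struct Request {
        std::string query_id;
        bool done;
    };

    class Context {
    public:
        using Callback = std::function<void(const Request&)>;
        explicit Context(Callback cb) : _cb(std::move(cb)) {}
        void send_report(bool done) { _cb({_query_id, done}); }

    private:
        std::string _query_id = "q1";
        Callback _cb;
    };

    class Mgr {
    public:
        void coordinator_callback(const Request& req) {
            // The real code builds TReportExecStatusParams and calls the FE here.
            std::cout << "report " << req.query_id << " done=" << req.done << "\n";
        }
    };

    int main() {
        Mgr mgr;
        Context ctx([&mgr](const Request& r) { mgr.coordinator_callback(r); });
        ctx.send_report(true);
    }

The std::bind expressions in fragment_mgr.cpp below construct exactly this kind of callback.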
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 3a4deb5772..38b07d503e 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -25,6 +25,7 @@
 namespace doris {
 class ExecNode;
 class DataSink;
+struct ReportStatusRequest;
 
 namespace vectorized {
 template <bool is_intersect>
@@ -37,10 +38,18 @@ class PipelineTask;
 
 class PipelineFragmentContext : public std::enable_shared_from_this<PipelineFragmentContext> {
 public:
+    // Callback to report execution status of plan fragment.
+    // 'profile' is the cumulative profile, 'done' indicates whether the execution
+    // is done or still continuing.
+    // Note: this does not take a const RuntimeProfile&, because it might need to call
+    // functions like PrettyPrint() or to_thrift(), neither of which is const
+    // because they take locks.
+    using report_status_callback = std::function<void(const ReportStatusRequest)>;
     PipelineFragmentContext(const TUniqueId& query_id, const TUniqueId& instance_id,
                             const int fragment_id, int backend_num,
                             std::shared_ptr<QueryFragmentsCtx> query_ctx, ExecEnv* exec_env,
-                            std::function<void(RuntimeState*, Status*)> call_back);
+                            const std::function<void(RuntimeState*, Status*)>& call_back,
+                            const report_status_callback& report_status_cb);
 
     ~PipelineFragmentContext();
 
@@ -92,7 +101,27 @@ public:
     void set_pipe(std::shared_ptr<io::StreamLoadPipe> pipe) { _pipe = pipe; }
     std::shared_ptr<io::StreamLoadPipe> get_pipe() const { return _pipe; }
 
+    void report_profile();
+
+    Status update_status(Status status) {
+        std::lock_guard<std::mutex> l(_status_lock);
+        if (!status.ok() && _exec_status.ok()) {
+            _exec_status = status;
+        }
+        return _exec_status;
+    }
+
 private:
+    Status _create_sink(const TDataSink& t_data_sink);
+    Status _build_pipelines(ExecNode*, PipelinePtr);
+    Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request);
+    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
+    template <bool is_intersect>
+    Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
+    void _close_action();
+    void _stop_report_thread();
+    void _set_is_report_on_cancel(bool val) { _is_report_on_cancel = val; }
+
     // Id of this query
     TUniqueId _query_id;
     TUniqueId _fragment_instance_id;
@@ -142,16 +171,23 @@ private:
 
     std::shared_ptr<io::StreamLoadPipe> _pipe;
 
-    Status _create_sink(const TDataSink& t_data_sink);
-    Status _build_pipelines(ExecNode*, PipelinePtr);
-    Status _build_pipeline_tasks(const doris::TExecPlanFragmentParams& request);
-    Status _build_pipeline_tasks(const doris::TPipelineFragmentParams& request);
-
-    template <bool is_intersect>
-    Status _build_operators_for_set_operation_node(ExecNode*, PipelinePtr);
     std::function<void(RuntimeState*, Status*)> _call_back;
-    void _close_action();
     std::once_flag _close_once_flag;
+
+    std::condition_variable _report_thread_started_cv;
+    // true if we started the thread
+    bool _report_thread_active;
+    // profile reporting-related
+    report_status_callback _report_status_cb;
+    std::thread _report_thread;
+    std::mutex _report_thread_lock;
+
+    // Indicates that profile reporting thread should stop.
+    // Tied to _report_thread_lock.
+    std::condition_variable _stop_report_thread_cv;
+    // If this is set to false, and '_is_report_success' is false as well,
+    // This executor will not report status to FE on being cancelled.
+    bool _is_report_on_cancel;
 };
 } // namespace pipeline
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 7d0d2e82b6..f2a2c9e798 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -79,14 +79,14 @@ using apache::thrift::transport::TTransportException;
 class RuntimeProfile;
 class FragmentExecState {
 public:
+    using report_status_callback_impl = std::function<void(const ReportStatusRequest)>;
     // Constructor by using QueryFragmentsCtx
     FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
-                      ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx);
+                      ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+                      const report_status_callback_impl& report_status_cb_impl);
 
     Status prepare(const TExecPlanFragmentParams& params);
 
-    std::string to_http_path(const std::string& file_name);
-
     Status execute();
 
     Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
@@ -146,7 +146,6 @@ private:
     TUniqueId _fragment_instance_id;
     // Used to report to coordinator which backend is over
     int _backend_num;
-    ExecEnv* _exec_env;
     TNetworkAddress _coord_addr;
 
     PlanFragmentExecutor _executor;
@@ -171,22 +170,24 @@ private:
 
     // If set the true, this plan fragment will be executed only after FE send execution start rpc.
     bool _need_wait_execution_trigger = false;
+    report_status_callback_impl _report_status_cb_impl;
 };
 
 FragmentExecState::FragmentExecState(const TUniqueId& query_id,
                                      const TUniqueId& fragment_instance_id, int backend_num,
                                      ExecEnv* exec_env,
-                                     std::shared_ptr<QueryFragmentsCtx> fragments_ctx)
+                                     std::shared_ptr<QueryFragmentsCtx> fragments_ctx,
+                                     const report_status_callback_impl& report_status_cb_impl)
         : _query_id(query_id),
           _fragment_instance_id(fragment_instance_id),
           _backend_num(backend_num),
-          _exec_env(exec_env),
           _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback),
                                               this, std::placeholders::_1, std::placeholders::_2,
                                               std::placeholders::_3)),
           _set_rsc_info(false),
           _timeout_second(-1),
-          _fragments_ctx(std::move(fragments_ctx)) {
+          _fragments_ctx(std::move(fragments_ctx)),
+          _report_status_cb_impl(report_status_cb_impl) {
     _start_time = DateTimeValue::local_time();
     _coord_addr = _fragments_ctx->coord_addr;
 }
@@ -256,9 +257,67 @@ Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const
     return Status::OK();
 }
 
-std::string FragmentExecState::to_http_path(const std::string& file_name) {
+// There can only be one of these callbacks in-flight at any moment, because
+// it is only invoked from the executor's reporting thread.
+// Also, the reported status will always reflect the most recent execution status,
+// including the final status when execution finishes.
+void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
+                                             bool done) {
+    _report_status_cb_impl(
+            {status, profile, done, _coord_addr, _query_id, -1, _fragment_instance_id, _backend_num,
+             _executor.runtime_state(),
+             std::bind(&FragmentExecState::update_status, this, std::placeholders::_1),
+             std::bind(&PlanFragmentExecutor::cancel, &_executor, std::placeholders::_1,
+                       std::placeholders::_2)});
+    DCHECK(status.ok() || done); // if !status.ok() => done
+}
+
+FragmentMgr::FragmentMgr(ExecEnv* exec_env)
+        : _exec_env(exec_env), _stop_background_threads_latch(1) {
+    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
+    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
+    REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
+
+    auto s = Thread::create(
+            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
+            &_cancel_thread);
+    CHECK(s.ok()) << s.to_string();
+
+    // TODO(zc): we need a better thread-pool
+    // now one user can use all the thread pool, others have no resource.
+    s = ThreadPoolBuilder("FragmentMgrThreadPool")
+                .set_min_threads(config::fragment_pool_thread_num_min)
+                .set_max_threads(config::fragment_pool_thread_num_max)
+                .set_max_queue_size(config::fragment_pool_queue_size)
+                .build(&_thread_pool);
+
+    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
+                         [this]() { return _thread_pool->get_queue_size(); });
+    CHECK(s.ok()) << s.to_string();
+}
+
+FragmentMgr::~FragmentMgr() {
+    DEREGISTER_HOOK_METRIC(plan_fragment_count);
+    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
+    _stop_background_threads_latch.count_down();
+    if (_cancel_thread) {
+        _cancel_thread->join();
+    }
+    // Stop all the worker, should wait for a while?
+    // _thread_pool->wait_for();
+    _thread_pool->shutdown();
+
+    // Only me can delete
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        _fragment_map.clear();
+        _fragments_ctx_map.clear();
+    }
+}
+
+std::string FragmentMgr::to_http_path(const std::string& file_name) {
     std::stringstream url;
-    url << "http://"; << get_host_port(BackendOptions::get_localhost(), 
config::webserver_port)
+    url << "http://"; << BackendOptions::get_localhost() << ":" << 
config::webserver_port
         << "/api/_download_load?"
         << "token=" << _exec_env->token() << "&file=" << file_name;
     return url.str();
@@ -268,94 +327,96 @@ std::string FragmentExecState::to_http_path(const std::string& file_name) {
 // it is only invoked from the executor's reporting thread.
 // Also, the reported status will always reflect the most recent execution status,
 // including the final status when execution finishes.
-void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
-                                             bool done) {
-    DCHECK(status.ok() || done); // if !status.ok() => done
-    Status exec_status = update_status(status);
+void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
+    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
+    Status exec_status = req.update_fn(req.status);
 
     Status coord_status;
-    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
+    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), req.coord_addr,
+                                    &coord_status);
     if (!coord_status.ok()) {
         std::stringstream ss;
-        UniqueId uid(_query_id.hi, _query_id.lo);
-        ss << "couldn't get a client for " << _coord_addr << ", reason: " << 
coord_status;
+        UniqueId uid(req.query_id.hi, req.query_id.lo);
+        ss << "couldn't get a client for " << req.coord_addr << ", reason: " 
<< coord_status;
         LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
-        update_status(Status::InternalError(ss.str()));
+        req.update_fn(Status::InternalError(ss.str()));
         return;
     }
 
     TReportExecStatusParams params;
     params.protocol_version = FrontendServiceVersion::V1;
-    params.__set_query_id(_query_id);
-    params.__set_backend_num(_backend_num);
-    params.__set_fragment_instance_id(_fragment_instance_id);
+    params.__set_query_id(req.query_id);
+    params.__set_backend_num(req.backend_num);
+    params.__set_fragment_instance_id(req.fragment_instance_id);
+    params.__set_fragment_id(req.fragment_id);
     exec_status.set_t_status(&params);
-    params.__set_done(done);
+    params.__set_done(req.done);
 
-    RuntimeState* runtime_state = _executor.runtime_state();
-    DCHECK(runtime_state != nullptr);
-    if (runtime_state->query_type() == TQueryType::LOAD && !done && status.ok()) {
+    DCHECK(req.runtime_state != nullptr);
+    if (req.runtime_state->query_type() == TQueryType::LOAD && !req.done && req.status.ok()) {
         // this is a load plan, and load is not finished, just make a brief report
-        params.__set_loaded_rows(runtime_state->num_rows_load_total());
-        params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
+        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
+        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
     } else {
-        if (runtime_state->query_type() == TQueryType::LOAD) {
-            params.__set_loaded_rows(runtime_state->num_rows_load_total());
-            params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
+        if (req.runtime_state->query_type() == TQueryType::LOAD) {
+            params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
+            params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
         }
-        if (profile == nullptr) {
+        if (req.profile == nullptr) {
             params.__isset.profile = false;
         } else {
-            profile->to_thrift(&params.profile);
+            req.profile->to_thrift(&params.profile);
             params.__isset.profile = true;
         }
 
-        if (!runtime_state->output_files().empty()) {
+        if (!req.runtime_state->output_files().empty()) {
             params.__isset.delta_urls = true;
-            for (auto& it : runtime_state->output_files()) {
+            for (auto& it : req.runtime_state->output_files()) {
                 params.delta_urls.push_back(to_http_path(it));
             }
         }
-        if (runtime_state->num_rows_load_total() > 0 ||
-            runtime_state->num_rows_load_filtered() > 0) {
+        if (req.runtime_state->num_rows_load_total() > 0 ||
+            req.runtime_state->num_rows_load_filtered() > 0) {
             params.__isset.load_counters = true;
 
             static std::string s_dpp_normal_all = "dpp.norm.ALL";
             static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
             static std::string s_unselected_rows = "unselected.rows";
 
-            params.load_counters.emplace(s_dpp_normal_all,
-                                         std::to_string(runtime_state->num_rows_load_success()));
-            params.load_counters.emplace(s_dpp_abnormal_all,
-                                         std::to_string(runtime_state->num_rows_load_filtered()));
-            params.load_counters.emplace(s_unselected_rows,
-                                         std::to_string(runtime_state->num_rows_load_unselected()));
+            params.load_counters.emplace(
+                    s_dpp_normal_all, std::to_string(req.runtime_state->num_rows_load_success()));
+            params.load_counters.emplace(
+                    s_dpp_abnormal_all,
+                    std::to_string(req.runtime_state->num_rows_load_filtered()));
+            params.load_counters.emplace(
+                    s_unselected_rows,
+                    std::to_string(req.runtime_state->num_rows_load_unselected()));
         }
-        if (!runtime_state->get_error_log_file_path().empty()) {
+        if (!req.runtime_state->get_error_log_file_path().empty()) {
             params.__set_tracking_url(
-                    to_load_error_http_path(runtime_state->get_error_log_file_path()));
+                    to_load_error_http_path(req.runtime_state->get_error_log_file_path()));
         }
-        if (!runtime_state->export_output_files().empty()) {
+        if (!req.runtime_state->export_output_files().empty()) {
             params.__isset.export_files = true;
-            params.export_files = runtime_state->export_output_files();
+            params.export_files = req.runtime_state->export_output_files();
         }
-        if (!runtime_state->tablet_commit_infos().empty()) {
+        if (!req.runtime_state->tablet_commit_infos().empty()) {
             params.__isset.commitInfos = true;
-            params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
-            for (auto& info : runtime_state->tablet_commit_infos()) {
+            params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
+            for (auto& info : req.runtime_state->tablet_commit_infos()) {
                 params.commitInfos.push_back(info);
             }
         }
-        if (!runtime_state->error_tablet_infos().empty()) {
+        if (!req.runtime_state->error_tablet_infos().empty()) {
             params.__isset.errorTabletInfos = true;
-            params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
-            for (auto& info : runtime_state->error_tablet_infos()) {
+            params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
+            for (auto& info : req.runtime_state->error_tablet_infos()) {
                 params.errorTabletInfos.push_back(info);
             }
         }
 
         // Send new errors to coordinator
-        runtime_state->get_unreported_errors(&(params.error_log));
+        req.runtime_state->get_unreported_errors(&(params.error_log));
         params.__isset.error_log = (params.error_log.size() > 0);
     }
 
@@ -370,22 +431,23 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
                << apache::thrift::ThriftDebugString(params).c_str();
     if (!exec_status.ok()) {
         LOG(WARNING) << "report error status: " << exec_status.to_string()
-                     << " to coordinator: " << _coord_addr << ", query id: " 
<< print_id(_query_id)
-                     << ", instance id: " << print_id(_fragment_instance_id);
+                     << " to coordinator: " << req.coord_addr
+                     << ", query id: " << print_id(req.query_id)
+                     << ", instance id: " << 
print_id(req.fragment_instance_id);
     }
     try {
         try {
             coord->reportExecStatus(res, params);
         } catch (TTransportException& e) {
-            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << 
print_id(_query_id)
-                         << ", instance id: " << 
print_id(_fragment_instance_id) << " to "
-                         << _coord_addr << ", err: " << e.what();
+            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << 
print_id(req.query_id)
+                         << ", instance id: " << 
print_id(req.fragment_instance_id) << " to "
+                         << req.coord_addr << ", err: " << e.what();
             rpc_status = coord.reopen();
 
             if (!rpc_status.ok()) {
                 // we need to cancel the execution of this fragment
-                update_status(rpc_status);
-                _executor.cancel();
+                req.update_fn(rpc_status);
+                req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "report rpc fail");
                 return;
             }
             coord->reportExecStatus(res, params);
@@ -394,64 +456,22 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil
         rpc_status = Status(res.status);
     } catch (TException& e) {
         std::stringstream msg;
-        msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << 
e.what();
+        msg << "ReportExecStatus() to " << req.coord_addr << " failed:\n" << 
e.what();
         LOG(WARNING) << msg.str();
         rpc_status = Status::InternalError(msg.str());
     }
 
     if (!rpc_status.ok()) {
         // we need to cancel the execution of this fragment
-        update_status(rpc_status);
-        _executor.cancel();
-    }
-}
-
-FragmentMgr::FragmentMgr(ExecEnv* exec_env)
-        : _exec_env(exec_env), _stop_background_threads_latch(1) {
-    _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
-    INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
-    REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); });
-
-    auto s = Thread::create(
-            "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
-            &_cancel_thread);
-    CHECK(s.ok()) << s.to_string();
-
-    // TODO(zc): we need a better thread-pool
-    // now one user can use all the thread pool, others have no resource.
-    s = ThreadPoolBuilder("FragmentMgrThreadPool")
-                .set_min_threads(config::fragment_pool_thread_num_min)
-                .set_max_threads(config::fragment_pool_thread_num_max)
-                .set_max_queue_size(config::fragment_pool_queue_size)
-                .build(&_thread_pool);
-
-    REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
-                         [this]() { return _thread_pool->get_queue_size(); });
-    CHECK(s.ok()) << s.to_string();
-}
-
-FragmentMgr::~FragmentMgr() {
-    DEREGISTER_HOOK_METRIC(plan_fragment_count);
-    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
-    _stop_background_threads_latch.count_down();
-    if (_cancel_thread) {
-        _cancel_thread->join();
-    }
-    // Stop all the worker, should wait for a while?
-    // _thread_pool->wait_for();
-    _thread_pool->shutdown();
-
-    // Only me can delete
-    {
-        std::lock_guard<std::mutex> lock(_lock);
-        _fragment_map.clear();
-        _fragments_ctx_map.clear();
+        req.update_fn(rpc_status);
+        req.cancel_fn(PPlanFragmentCancelReason::INTERNAL_ERROR, "rpc fail 2");
     }
 }
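
The nested try/catch in coordinator_callback() above encodes a one-shot retry policy: a TTransportException is treated as a stale connection, so the client is reopened and the call retried exactly once, while any other TException fails the report outright and the fragment is cancelled. A schematic of that control flow with placeholder callables (rpc and reopen stand in for coord->reportExecStatus() and coord.reopen(); this is not the real Thrift client API):

    #include <functional>
    #include <stdexcept>
    #include <string>

    // One reopen-and-retry on a transport error; hard failure on anything else.
    struct TransportError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    bool report_with_retry(const std::function<void()>& rpc,
                           const std::function<bool()>& reopen, std::string* error) {
        try {
            try {
                rpc();
            } catch (const TransportError&) {
                // Stale connection: reopen and retry exactly once.
                if (!reopen()) {
                    *error = "reopen failed";
                    return false; // caller cancels the fragment
                }
                rpc();
            }
            return true;
        } catch (const std::exception& e) {
            *error = std::string("rpc failed: ") + e.what();
            return false; // caller cancels the fragment
        }
    }

    int main() {
        std::string err;
        int attempts = 0;
        bool ok = report_with_retry(
                [&] { if (++attempts == 1) throw TransportError("stale"); },
                [] { return true; }, &err);
        return ok ? 0 : 1;
    }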
 
 static void empty_function(RuntimeState*, Status*) {}
 
-void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
+void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state,
+                               const FinishCallback& cb) {
     std::string func_name {"PlanFragmentExecutor::_exec_actual"};
 #ifndef BE_TEST
     auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name);
@@ -597,7 +617,8 @@ void FragmentMgr::remove_pipeline_context(
     }
 }
 
-Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params,
+                                       const FinishCallback& cb) {
     auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
                                                      : telemetry::get_noop_tracer();
     VLOG_ROW << "exec_plan_fragment params is "
@@ -703,9 +724,11 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     }
     fragments_ctx->fragment_ids.push_back(fragment_instance_id);
 
-    exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
-                                           params.params.fragment_instance_id, params.backend_num,
-                                           _exec_env, fragments_ctx));
+    exec_state.reset(
+            new FragmentExecState(fragments_ctx->query_id, params.params.fragment_instance_id,
+                                  params.backend_num, _exec_env, fragments_ctx,
+                                  std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback),
+                                                  this, std::placeholders::_1)));
     if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
         // set need_wait_execution_trigger means this instance will not actually being executed
         // until the execPlanFragmentStart RPC trigger to start it.
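
A side note on the std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this, std::placeholders::_1) expressions introduced in this and the following hunks: a lambda capturing this produces an equivalent std::function and is usually easier to read. A self-contained toy comparison (Req and Mgr below are stand-ins, not Doris types):

    #include <functional>
    #include <iostream>

    struct Req {
        int id;
    };

    struct Mgr {
        void coordinator_callback(const Req& r) { std::cout << r.id << "\n"; }

        // Both helpers yield the same std::function; the lambda spelling is clearer.
        std::function<void(const Req&)> bind_style() {
            return std::bind<void>(std::mem_fn(&Mgr::coordinator_callback), this,
                                   std::placeholders::_1);
        }
        std::function<void(const Req&)> lambda_style() {
            return [this](const Req& r) { coordinator_callback(r); };
        }
    };

    int main() {
        Mgr m;
        m.bind_style()(Req{1});
        m.lambda_style()(Req{2});
    }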
@@ -756,7 +779,9 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineFragmentContext>(
-                        fragments_ctx->query_id, fragment_instance_id, -1, params.backend_num,
-                        fragments_ctx, _exec_env, cb);
+                        fragments_ctx->query_id, fragment_instance_id, -1, params.backend_num,
+                        fragments_ctx, _exec_env, cb,
+                        std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+                                        std::placeholders::_1));
         {
             SCOPED_RAW_TIMER(&duration_ns);
             auto prepare_st = context->prepare(params);
@@ -782,7 +807,8 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
     return Status::OK();
 }
 
-Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, FinishCallback cb) {
+Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
+                                       const FinishCallback& cb) {
     auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
                                                      : telemetry::get_noop_tracer();
     VLOG_ROW << "exec_plan_fragment params is "
@@ -889,8 +915,11 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Fi
 
         fragments_ctx->fragment_ids.push_back(fragment_instance_id);
 
-        exec_state.reset(new FragmentExecState(fragments_ctx->query_id, fragment_instance_id,
-                                               local_params.backend_num, _exec_env, fragments_ctx));
+        exec_state.reset(new FragmentExecState(
+                fragments_ctx->query_id, fragment_instance_id, local_params.backend_num, _exec_env,
+                fragments_ctx,
+                std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+                                std::placeholders::_1)));
         if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
             // set need_wait_execution_trigger means this instance will not actually being executed
             // until the execPlanFragmentStart RPC trigger to start it.
@@ -906,7 +935,9 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params, Fi
         std::shared_ptr<pipeline::PipelineFragmentContext> context =
                 std::make_shared<pipeline::PipelineFragmentContext>(
                         fragments_ctx->query_id, fragment_instance_id, params.fragment_id,
-                        local_params.backend_num, fragments_ctx, _exec_env, cb);
+                        local_params.backend_num, fragments_ctx, _exec_env, cb,
+                        std::bind<void>(std::mem_fn(&FragmentMgr::coordinator_callback), this,
+                                        std::placeholders::_1));
         {
             SCOPED_RAW_TIMER(&duration_ns);
             auto prepare_st = context->prepare(params, i);
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 98e6c26ddd..6508ea0659 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -61,6 +61,20 @@ class RuntimeFilterMergeController;
 
 std::string to_load_error_http_path(const std::string& file_name);
 
+struct ReportStatusRequest {
+    const Status& status;
+    RuntimeProfile* profile;
+    bool done;
+    TNetworkAddress coord_addr;
+    TUniqueId query_id;
+    int fragment_id;
+    TUniqueId fragment_instance_id;
+    int backend_num;
+    RuntimeState* runtime_state;
+    std::function<Status(Status)> update_fn;
+    std::function<void(const PPlanFragmentCancelReason&, const std::string&)> cancel_fn;
+};
+
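
ReportStatusRequest bundles not just the data to report but also update_fn and cancel_fn hooks, which is what lets FragmentMgr::coordinator_callback serve both the pipeline and the non-pipeline executor without holding a back-pointer to either: on an RPC failure it records the error and cancels purely through the hooks. A toy illustration of that shape (the names below are illustrative only):

    #include <functional>
    #include <iostream>
    #include <string>

    // Request-with-hooks: the receiver can report an error back (update) or
    // abort the sender (cancel) without knowing the sender's concrete type.
    struct Report {
        std::string status;
        std::function<void(const std::string&)> update_fn;
        std::function<void(const std::string&)> cancel_fn;
    };

    void handle(const Report& r) {
        if (r.status != "ok") {
            r.update_fn(r.status);   // record the error with the sender
            r.cancel_fn("rpc fail"); // and stop its execution
        }
    }

    int main() {
        Report r{"boom",
                 [](const std::string& s) { std::cout << "update: " << s << "\n"; },
                 [](const std::string& s) { std::cout << "cancel: " << s << "\n"; }};
        handle(r);
    }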
 // This class used to manage all the fragment execute in this instance
 class FragmentMgr : public RestMonitorIface {
 public:
@@ -78,9 +92,9 @@ public:
             std::shared_ptr<pipeline::PipelineFragmentContext> pipeline_context);
 
     // TODO(zc): report this is over
-    Status exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb);
+    Status exec_plan_fragment(const TExecPlanFragmentParams& params, const FinishCallback& cb);
 
-    Status exec_plan_fragment(const TPipelineFragmentParams& params, FinishCallback cb);
+    Status exec_plan_fragment(const TPipelineFragmentParams& params, const FinishCallback& cb);
 
     Status start_query_execution(const PExecPlanFragmentStartRequest* request);
 
@@ -116,8 +130,12 @@ public:
 
     std::shared_ptr<io::StreamLoadPipe> get_pipe(const TUniqueId& fragment_instance_id);
 
+    std::string to_http_path(const std::string& file_name);
+
+    void coordinator_callback(const ReportStatusRequest& req);
+
 private:
-    void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb);
+    void _exec_actual(std::shared_ptr<FragmentExecState> exec_state, const FinishCallback& cb);
 
     void _set_scan_concurrency(const TExecPlanFragmentParams& params,
                                QueryFragmentsCtx* fragments_ctx);
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 96908dde30..f62bd4bae1 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -445,11 +445,7 @@ void PlanFragmentExecutor::send_report(bool done) {
     // This will send a report even if we are cancelled.  If the query completed correctly
     // but fragments still need to be cancelled (e.g. limit reached), the coordinator will
     // be waiting for a final report and profile.
-    if (_is_report_success) {
-        _report_status_cb(status, profile(), done || !status.ok());
-    } else {
-        _report_status_cb(status, nullptr, done || !status.ok());
-    }
+    _report_status_cb(status, _is_report_success ? profile() : nullptr, done || !status.ok());
 }
 
 void PlanFragmentExecutor::stop_report_thread() {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

