github-actions[bot] commented on code in PR #33015:
URL: https://github.com/apache/doris/pull/33015#discussion_r1547513280


##########
be/src/runtime/plan_fragment_executor.h:
##########
@@ -21,6 +21,7 @@
 #pragma once
 
 #include <gen_cpp/PaloInternalService_types.h>

Review Comment:
   warning: 'gen_cpp/PaloInternalService_types.h' file not found [clang-diagnostic-error]
   ```cpp
   #include <gen_cpp/PaloInternalService_types.h>
            ^
   ```
   



##########
be/src/runtime/fragment_mgr.cpp:
##########
@@ -1609,4 +1616,27 @@ void 
FragmentMgr::get_runtime_query_info(std::vector<WorkloadQueryInfo>* query_i
     }
 }
 
+Status FragmentMgr::collect_realtime_profile_x(

Review Comment:
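   warning: method 'collect_realtime_profile_x' can be made static [readability-convert-member-functions-to-static]
   
   Note: `static` is only valid on the in-class declaration, not on this out-of-class definition. Add it to the declaration of `collect_realtime_profile_x` inside the class body and leave this line unchanged:
   ```cpp
   Status FragmentMgr::collect_realtime_profile_x(
   ```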
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -293,4 +310,81 @@
     return Status::OK();
 }
 
+void QueryContext::add_fragment_profile_x(
+        int fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
pipeline_profiles) {
+#ifndef NDEBUG
+    for (const auto& p : pipeline_profiles) {
+        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, 
query {}, fragment {}",
+                                            print_id(this->_query_id), 
fragment_id);
+    }
+#endif
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+    _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
+                                        std::shared_ptr<TRuntimeProfileTree> 
profile) {
+    DCHECK(profile != nullptr) << print_id(instance_id);
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    _profile_map.insert(std::make_pair(instance_id, profile));
+}
+
+void QueryContext::_async_report_profile() {
+    if (!enable_profile()) {
+        return;
+    }
+
+    _async_report_profile_x();
+    _async_report_profile_non_pipeline();
+}
+
+void QueryContext::_async_report_profile_non_pipeline() {
+    if (enable_pipeline_exec()) {
+        return;
+    }
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    LOG_INFO("Query {}, register query profile, instance profile count {}", 
print_id(_query_id),
+             _profile_map.size());
+
+    for (auto& [instance_id, instance_profile] : _profile_map) {
+        
ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_instance_profile(
+                _query_id, this->coord_addr, instance_id, instance_profile);
+    }
+}
+
+void QueryContext::_async_report_profile_x() {

Review Comment:
   warning: method '_async_report_profile_x' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::_async_report_profile_x() const {
   ```
   
   be/src/runtime/query_context.h:383:
   ```diff
   -     void _async_report_profile_x();
   +     void _async_report_profile_x() const;
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,300 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+                                    const TReportExecStatusParams& req,
+                                    TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "could not get client rpc client of {} when reporting 
profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), 
client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to send statistics to {} due to {}, usually 
because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to send statistics to {}, reason: {}, you can see 
fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see 
fe log for details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send stats failed");
+    }
+
+    return Status::OK();
+}
+
+TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+        const TUniqueId& query_id, int32 fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+    TReportExecStatusParams req;
+    req.__set_query_id(query_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_fragment_id(fragment_id);
+    std::vector<TDetailedReportParams> detailed_params;
+    for (const auto& p_profile : f_profile) {
+        TDetailedReportParams tmp;
+        tmp.__set_profile(*p_profile);
+        // tmp.fragment_instance_id is not needed for pipeline x
+        detailed_params.push_back(tmp);
+    }
+
+    req.__set_detailed_report(detailed_params);
+    return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        _report_profile_thread = std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this);
+    }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            while (!_force_report_profile) {
+                _report_profile_cv.wait_for(lock, 
std::chrono::milliseconds(5000));
+                break;
+            }
+        }
+
+        _report_query_profiles_function();
+        _force_report_profile = false;
+
+        if (_report_profile_thread_stop) {
+            return;
+        }
+    }
+}
+
+void RuntimeQueryStatiticsMgr::force_report_profile() {
+    std::unique_lock<std::mutex> lock(_report_profile_mutex);
+    _force_report_profile = true;
+    _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {

Review Comment:
   warning: method 'stop_report_thread' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/runtime_query_statistics_mgr.h:87:
   ```diff
   -     void stop_report_thread();
   +     static void stop_report_thread();
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,300 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+                                    const TReportExecStatusParams& req,
+                                    TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "could not get client rpc client of {} when reporting 
profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), 
client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to send statistics to {} due to {}, usually 
because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to send statistics to {}, reason: {}, you can see 
fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see 
fe log for details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send stats failed");
+    }
+
+    return Status::OK();
+}
+
+TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+        const TUniqueId& query_id, int32 fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+    TReportExecStatusParams req;
+    req.__set_query_id(query_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_fragment_id(fragment_id);
+    std::vector<TDetailedReportParams> detailed_params;
+    for (const auto& p_profile : f_profile) {
+        TDetailedReportParams tmp;
+        tmp.__set_profile(*p_profile);
+        // tmp.fragment_instance_id is not needed for pipeline x
+        detailed_params.push_back(tmp);
+    }
+
+    req.__set_detailed_report(detailed_params);
+    return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {

Review Comment:
   warning: method 'start_report_thread' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/runtime_query_statistics_mgr.h:84:
   ```diff
   -     void start_report_thread();
   +     static void start_report_thread();
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,300 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+                                    const TReportExecStatusParams& req,
+                                    TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "could not get client rpc client of {} when reporting 
profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), 
client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to send statistics to {} due to {}, usually 
because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to send statistics to {}, reason: {}, you can see 
fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see 
fe log for details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send stats failed");
+    }
+
+    return Status::OK();
+}
+
+TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+        const TUniqueId& query_id, int32 fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+    TReportExecStatusParams req;
+    req.__set_query_id(query_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_fragment_id(fragment_id);
+    std::vector<TDetailedReportParams> detailed_params;
+    for (const auto& p_profile : f_profile) {
+        TDetailedReportParams tmp;
+        tmp.__set_profile(*p_profile);
+        // tmp.fragment_instance_id is not needed for pipeline x
+        detailed_params.push_back(tmp);
+    }
+
+    req.__set_detailed_report(detailed_params);
+    return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        _report_profile_thread = std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this);
+    }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            while (!_force_report_profile) {
+                _report_profile_cv.wait_for(lock, 
std::chrono::milliseconds(5000));
+                break;
+            }
+        }
+
+        _report_query_profiles_function();
+        _force_report_profile = false;
+
+        if (_report_profile_thread_stop) {
+            return;
+        }
+    }
+}
+
+void RuntimeQueryStatiticsMgr::force_report_profile() {

Review Comment:
   warning: method 'force_report_profile' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/runtime_query_statistics_mgr.h:86:
   ```diff
   -     void force_report_profile();
   +     static void force_report_profile();
   ```
   



##########
be/src/runtime/plan_fragment_executor.cpp:
##########
@@ -644,4 +646,11 @@ void PlanFragmentExecutor::close() {
     _closed = true;
 }
 
+std::shared_ptr<TRuntimeProfileTree> 
PlanFragmentExecutor::collect_realtime_profile() const {

Review Comment:
   warning: method 'collect_realtime_profile' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/plan_fragment_executor.h:149:
   ```diff
   -     std::shared_ptr<TRuntimeProfileTree> collect_realtime_profile() const;
   +     static std::shared_ptr<TRuntimeProfileTree> collect_realtime_profile();
   ```
   
   ```suggestion
   std::shared_ptr<TRuntimeProfileTree> 
PlanFragmentExecutor::collect_realtime_profile() {
   ```
   



##########
be/src/runtime/query_context.cpp:
##########
@@ -293,4 +310,81 @@ Status QueryContext::set_workload_group(WorkloadGroupPtr& 
tg) {
     return Status::OK();
 }
 
+void QueryContext::add_fragment_profile_x(
+        int fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& 
pipeline_profiles) {
+#ifndef NDEBUG
+    for (const auto& p : pipeline_profiles) {
+        DCHECK(p != nullptr) << fmt::format("Add pipeline profile failed, 
query {}, fragment {}",
+                                            print_id(this->_query_id), 
fragment_id);
+    }
+#endif
+    std::lock_guard<std::mutex> l(_profile_mutex);
+    LOG_INFO("Query X add fragment profile, query {}, fragment {}, pipeline 
profile count {} ",
+             print_id(this->_query_id), fragment_id, pipeline_profiles.size());
+    _profile_map_x.insert(std::make_pair(fragment_id, pipeline_profiles));
+}
+
+void QueryContext::add_instance_profile(const TUniqueId& instance_id,
+                                        std::shared_ptr<TRuntimeProfileTree> 
profile) {
+    DCHECK(profile != nullptr) << print_id(instance_id);
+
+    std::lock_guard<std::mutex> lg(_profile_mutex);
+    _profile_map.insert(std::make_pair(instance_id, profile));
+}
+
+void QueryContext::_async_report_profile() {
+    if (!enable_profile()) {
+        return;
+    }
+
+    _async_report_profile_x();
+    _async_report_profile_non_pipeline();
+}
+
+void QueryContext::_async_report_profile_non_pipeline() {

Review Comment:
   warning: method '_async_report_profile_non_pipeline' can be made const [readability-make-member-function-const]
   
   ```suggestion
   void QueryContext::_async_report_profile_non_pipeline() const {
   ```
   
   be/src/runtime/query_context.h:382:
   ```diff
   -     void _async_report_profile_non_pipeline();
   +     void _async_report_profile_non_pipeline() const;
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,300 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+                                    const TReportExecStatusParams& req,
+                                    TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "could not get client rpc client of {} when reporting 
profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), 
client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to send statistics to {} due to {}, usually 
because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to send statistics to {}, reason: {}, you can see 
fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see 
fe log for details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send stats failed");
+    }
+
+    return Status::OK();
+}
+
+TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+        const TUniqueId& query_id, int32 fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+    TReportExecStatusParams req;
+    req.__set_query_id(query_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_fragment_id(fragment_id);
+    std::vector<TDetailedReportParams> detailed_params;
+    for (const auto& p_profile : f_profile) {
+        TDetailedReportParams tmp;
+        tmp.__set_profile(*p_profile);
+        // tmp.fragment_instance_id is not needed for pipeline x
+        detailed_params.push_back(tmp);
+    }
+
+    req.__set_detailed_report(detailed_params);
+    return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        _report_profile_thread = std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this);
+    }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            while (!_force_report_profile) {
+                _report_profile_cv.wait_for(lock, 
std::chrono::milliseconds(5000));
+                break;
+            }
+        }
+
+        _report_query_profiles_function();
+        _force_report_profile = false;
+
+        if (_report_profile_thread_stop) {
+            return;
+        }
+    }
+}
+
+void RuntimeQueryStatiticsMgr::force_report_profile() {
+    std::unique_lock<std::mutex> lock(_report_profile_mutex);
+    _force_report_profile = true;
+    _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        return;
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(_report_profile_mutex);
+        _force_report_profile = true;
+        _report_profile_thread_stop = true;
+        _report_profile_cv.notify_one();
+    }
+
+    _report_profile_thread->join();
+    _report_profile_thread.reset();
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(

Review Comment:
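   warning: method 'register_instance_profile' can be made static [readability-convert-member-functions-to-static]
   
   Note: `static` is only valid on the in-class declaration, not on this out-of-class definition. Add it to the declaration of `register_instance_profile` inside the class body and leave this line unchanged:
   ```cpp
   void RuntimeQueryStatiticsMgr::register_instance_profile(
   ```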
   



##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,300 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+                                    const TReportExecStatusParams& req,
+                                    TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection 
rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "could not get client rpc client of {} when reporting 
profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), 
client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", 
client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to send statistics to {} due to {}, usually 
because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to send statistics to {}, reason: {}, you can see 
fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see 
fe log for details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send stats failed");
+    }
+
+    return Status::OK();
+}
+
+TReportExecStatusParams 
RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+        const TUniqueId& query_id, int32 fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+    TReportExecStatusParams req;
+    req.__set_query_id(query_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_fragment_id(fragment_id);
+    std::vector<TDetailedReportParams> detailed_params;
+    for (const auto& p_profile : f_profile) {
+        TDetailedReportParams tmp;
+        tmp.__set_profile(*p_profile);
+        // tmp.fragment_instance_id is not needed for pipeline x
+        detailed_params.push_back(tmp);
+    }
+
+    req.__set_detailed_report(detailed_params);
+    return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        _report_profile_thread = std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this);
+    }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            while (!_force_report_profile) {
+                _report_profile_cv.wait_for(lock, 
std::chrono::milliseconds(5000));
+                break;
+            }
+        }
+
+        _report_query_profiles_function();
+        _force_report_profile = false;
+
+        if (_report_profile_thread_stop) {
+            return;
+        }
+    }
+}
+
+void RuntimeQueryStatiticsMgr::force_report_profile() {
+    std::unique_lock<std::mutex> lock(_report_profile_mutex);
+    _force_report_profile = true;
+    _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        return;
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(_report_profile_mutex);
+        _force_report_profile = true;
+        _report_profile_thread_stop = true;
+        _report_profile_cv.notify_one();
+    }
+
+    _report_profile_thread->join();
+    _report_profile_thread.reset();
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, const 
TUniqueId& instance_id,
+        std::shared_ptr<TRuntimeProfileTree> instance_profile) {
+    if (instance_profile == nullptr) {
+        auto msg = fmt::format("Register instance profile {} {} failed, 
profile is null",
+                               print_id(query_id), print_id(instance_id));
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return;
+    }
+
+    std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+    if (!_profile_map.contains(query_id)) {
+        _profile_map[query_id] = std::make_tuple(
+                coor_addr, std::unordered_map<TUniqueId, 
std::shared_ptr<TRuntimeProfileTree>>());
+    }
+
+    std::get<1>(_profile_map[query_id]).insert(std::make_pair(instance_id, 
instance_profile));
+
+    LOG_INFO("Register instance profile {} {}", print_id(query_id), 
print_id(instance_id));
+}
+
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t 
fragment_id,
+        std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles) {
+    for (const auto& p : p_profiles) {
+        if (p == nullptr) {
+            auto msg = fmt::format("Register fragment profile {} {} failed, 
profile is null",
+                                   print_id(query_id), fragment_id);
+            DCHECK(false) << msg;
+            LOG_ERROR(msg);
+            return;
+        }
+    }
+
+    std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+    if (!_profile_map_x.contains(query_id)) {
+        _profile_map_x[query_id] = std::make_tuple(
+                coor_addr,
+                std::unordered_map<int, 
std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
+    }
+
+    std::get<1>(_profile_map_x[query_id]).insert(std::make_pair(fragment_id, 
p_profiles));
+
+    LOG_INFO("register x profile done {}, fragment {}, profiles {}", 
print_id(query_id),
+             fragment_id, p_profiles.size());
+}
+
+void RuntimeQueryStatiticsMgr::_report_query_profiles_non_pipeline() {

Review Comment:
   warning: method '_report_query_profiles_non_pipeline' can be made static [readability-convert-member-functions-to-static]
   
   be/src/runtime/runtime_query_statistics_mgr.h:128:
   ```diff
   -     void _report_query_profiles_non_pipeline();
   +     static void _report_query_profiles_non_pipeline();
   ```
   



##########
be/src/runtime/runtime_query_statistics_mgr.cpp:
##########
@@ -17,14 +17,300 @@
 
 #include "runtime/runtime_query_statistics_mgr.h"
 
+#include <gen_cpp/FrontendService_types.h>
+#include <gen_cpp/RuntimeProfile_types.h>
+#include <gen_cpp/Status_types.h>
+#include <gen_cpp/Types_types.h>
+#include <thrift/TApplicationException.h>
+
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <random>
+#include <shared_mutex>
+#include <string>
+#include <tuple>
+#include <unordered_map>
+#include <vector>
+
+#include "common/logging.h"
 #include "runtime/client_cache.h"
 #include "runtime/exec_env.h"
+#include "runtime/query_context.h"
+#include "service/backend_options.h"
 #include "util/debug_util.h"
 #include "util/time.h"
+#include "util/uid_util.h"
 #include "vec/core/block.h"
 
 namespace doris {
 
+static Status _doReportExecStatsRpc(const TNetworkAddress& coor_addr,
+                                    const TReportExecStatusParams& req,
+                                    TReportExecStatusResult& res) {
+    Status client_status;
+    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(), coor_addr,
+                                         &client_status);
+    if (!client_status.ok()) {
+        LOG_WARNING(
+                "could not get client rpc client of {} when reporting profiles, reason is {}, "
+                "not reporting, profile will be lost",
+                PrintThriftNetworkAddress(coor_addr), client_status.to_string());
+        return Status::RpcError("Client rpc client failed");
+    }
+
+    try {
+        try {
+            rpc_client->reportExecStatus(res, req);
+        } catch (const apache::thrift::transport::TTransportException& e) {
+            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
+                        PrintThriftNetworkAddress(coor_addr), e.what());
+            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
+            if (!client_status.ok()) {
+                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string());
+                return Status::RpcError("Open rpc client failed");
+            }
+
+            rpc_client->reportExecStatus(res, req);
+        }
+    } catch (apache::thrift::TApplicationException& e) {
+        if (e.getType() == e.UNKNOWN_METHOD) {
+            LOG_WARNING(
+                    "Failed to send statistics to {} due to {}, usually because the frontend "
+                    "is not upgraded, check the version",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        } else {
+            LOG_WARNING(
+                    "Failed to send statistics to {}, reason: {}, you can see fe log for "
+                    "details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        }
+        return Status::RpcError("Send stats failed");
+    } catch (std::exception& e) {
+        LOG_WARNING("Failed to send statistics to {}, reason: {}, you can see fe log for details.",
+                    PrintThriftNetworkAddress(coor_addr), e.what());
+        return Status::RpcError("Send stats failed");
+    }
+
+    return Status::OK();
+}
+
+TReportExecStatusParams RuntimeQueryStatiticsMgr::create_report_exec_status_params(
+        const TUniqueId& query_id, int32 fragment_id,
+        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& f_profile) {
+    TReportExecStatusParams req;
+    req.__set_query_id(query_id);
+    req.__set_backend_id(ExecEnv::GetInstance()->master_info()->backend_id);
+    req.__set_fragment_id(fragment_id);
+    std::vector<TDetailedReportParams> detailed_params;
+    for (const auto& p_profile : f_profile) {
+        TDetailedReportParams tmp;
+        tmp.__set_profile(*p_profile);
+        // tmp.fragment_instance_id is not needed for pipeline x
+        detailed_params.push_back(tmp);
+    }
+
+    req.__set_detailed_report(detailed_params);
+    return req;
+}
+
+void RuntimeQueryStatiticsMgr::start_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        _report_profile_thread = std::make_unique<std::thread>(
+                &RuntimeQueryStatiticsMgr::report_query_profiles_thread, this);
+    }
+}
+
+void RuntimeQueryStatiticsMgr::report_query_profiles_thread() {
+    while (true) {
+        {
+            std::unique_lock<std::mutex> lock(_report_profile_mutex);
+
+            while (!_force_report_profile) {
+                _report_profile_cv.wait_for(lock, std::chrono::milliseconds(5000));
+                break;
+            }
+        }
+
+        _report_query_profiles_function();
+        _force_report_profile = false;
+
+        if (_report_profile_thread_stop) {
+            return;
+        }
+    }
+}
+
+void RuntimeQueryStatiticsMgr::force_report_profile() {
+    std::unique_lock<std::mutex> lock(_report_profile_mutex);
+    _force_report_profile = true;
+    _report_profile_cv.notify_one();
+}
+
+void RuntimeQueryStatiticsMgr::stop_report_thread() {
+    if (_report_profile_thread == nullptr) {
+        return;
+    }
+
+    {
+        std::unique_lock<std::mutex> lock(_report_profile_mutex);
+        _force_report_profile = true;
+        _report_profile_thread_stop = true;
+        _report_profile_cv.notify_one();
+    }
+
+    _report_profile_thread->join();
+    _report_profile_thread.reset();
+}
+
+void RuntimeQueryStatiticsMgr::register_instance_profile(
+        const TUniqueId& query_id, const TNetworkAddress& coor_addr, const TUniqueId& instance_id,
+        std::shared_ptr<TRuntimeProfileTree> instance_profile) {
+    if (instance_profile == nullptr) {
+        auto msg = fmt::format("Register instance profile {} {} failed, profile is null",
+                               print_id(query_id), print_id(instance_id));
+        DCHECK(false) << msg;
+        LOG_ERROR(msg);
+        return;
+    }
+
+    std::lock_guard<std::shared_mutex> lg(_query_profile_map_lock);
+
+    if (!_profile_map.contains(query_id)) {
+        _profile_map[query_id] = std::make_tuple(
+                coor_addr, std::unordered_map<TUniqueId, std::shared_ptr<TRuntimeProfileTree>>());
+    }
+
+    std::get<1>(_profile_map[query_id]).insert(std::make_pair(instance_id, instance_profile));
+
+    LOG_INFO("Register instance profile {} {}", print_id(query_id), print_id(instance_id));
+}
+
+void RuntimeQueryStatiticsMgr::register_fragment_profile_x(

Review Comment:
   warning: method 'register_fragment_profile_x' can be made static 
[readability-convert-member-functions-to-static]
   
   ```suggestion
   static void RuntimeQueryStatiticsMgr::register_fragment_profile_x(
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to