This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.2-lts by this push:
     new df860e1db6 Revert "[Improvement](brpc) Using a thread pool for RPC service avoiding std::mutex block brpc::bthread (#16639)" (#17198)
df860e1db6 is described below

commit df860e1db64d83ffae630decbc08fd2f151cf387
Author: Mingyu Chen <[email protected]>
AuthorDate: Mon Feb 27 20:50:39 2023 +0800

    Revert "[Improvement](brpc) Using a thread pool for RPC service avoiding 
std::mutex block brpc::bthread (#16639)" (#17198)
    
    This reverts commit 8534abc47976969600b09e19d74bee5393f61d8d.
    This PR has a potential deadlock issue, so revert it from the 1.2 branch.
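    
    For context: the reverted change offloaded RPC handler bodies from brpc's
    worker pthreads to std::thread pools, because a std::mutex taken inside a
    handler blocks the worker pthread and, with it, every bthread scheduled on
    it. The self-contained C++ sketch below (simplified stand-ins, not the
    actual Doris or brpc code) shows that offload pattern and one classic way
    such a pool can deadlock; whether this matches the exact deadlock in the
    reverted PR is not established here.
    
        #include <condition_variable>
        #include <functional>
        #include <future>
        #include <iostream>
        #include <memory>
        #include <mutex>
        #include <queue>
        #include <thread>
        #include <vector>
        
        // Minimal fixed-size pool with the same offer() shape used in the
        // diff below: it never blocks the caller, it only queues the task.
        class WorkPool {
        public:
            explicit WorkPool(int n) {
                for (int i = 0; i < n; ++i) {
                    _workers.emplace_back([this] { _run(); });
                }
            }
            ~WorkPool() {
                {
                    std::lock_guard<std::mutex> lock(_mu);
                    _stop = true;
                }
                _cv.notify_all();
                for (auto& t : _workers) {
                    t.join();
                }
            }
            bool offer(std::function<void()> task) {
                {
                    std::lock_guard<std::mutex> lock(_mu);
                    if (_stop) {
                        return false;
                    }
                    _queue.push(std::move(task));
                }
                _cv.notify_one();
                return true;
            }
        private:
            void _run() {
                for (;;) {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(_mu);
                        _cv.wait(lock, [this] { return _stop || !_queue.empty(); });
                        if (_stop && _queue.empty()) {
                            return;
                        }
                        task = std::move(_queue.front());
                        _queue.pop();
                    }
                    task();
                }
            }
            std::mutex _mu;
            std::condition_variable _cv;
            std::queue<std::function<void()>> _queue;
            std::vector<std::thread> _workers;
            bool _stop = false;
        };
        
        int main() {
            WorkPool pool(1); // one thread makes the hazard deterministic
        
            std::promise<void> handler_done;
            // The offloaded "handler", as in the reverted code path: any
            // std::mutex it takes blocks a pool thread, not a brpc worker.
            pool.offer([&] {
                auto p = std::make_shared<std::promise<void>>();
                auto f = p->get_future();
                // Hazard: queue more work to the same pool from inside a
                // pooled task. With every pool thread busy (here: just us),
                // that task cannot start until we return.
                pool.offer([p] { p->set_value(); });
                // f.wait(); // waiting here would deadlock the pool
                handler_done.set_value();
            });
            handler_done.get_future().wait();
            std::cout << "handler finished without waiting on the pool" << std::endl;
            return 0;
        }
    
    Executing most handlers inline again, as the diff below does (while
    keeping a dedicated _tablet_worker_pool for the heavy add_batch/add_block
    paths), avoids that class of failure at the cost of running handler
    bodies on brpc worker threads again.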
---
 be/src/common/config.h                             |  14 +-
 be/src/service/internal_service.cpp                | 922 ++++++++-------------
 be/src/service/internal_service.h                  |   9 +-
 be/src/util/doris_metrics.h                        |  10 -
 be/src/util/priority_thread_pool.hpp               |   6 +-
 .../maint-monitor/monitor-metrics/metrics.md       |   6 -
 gensrc/proto/internal_service.proto                |   1 -
 7 files changed, 347 insertions(+), 621 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index b278a9c55c..4a97529abc 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -35,8 +35,7 @@ CONF_Int32(be_port, "9060");
 // port for brpc
 CONF_Int32(brpc_port, "8060");
 
-// the number of bthreads for brpc, the default value is set to -1,
-// which means the number of bthreads is #cpu-cores
+// the number of bthreads for brpc, the default value is set to -1, which means the number of bthreads is #cpu-cores
 CONF_Int32(brpc_num_threads, "-1");
 
 // port to brpc server for single replica load
@@ -400,15 +399,8 @@ CONF_Int32(single_replica_load_download_num_workers, "64");
 CONF_Int64(load_data_reserve_hours, "4");
 // log error log will be removed after this time
 CONF_mInt64(load_error_log_reserve_hours, "48");
-
-// be brpc interface is classified into two categories: light and heavy
-// each category has diffrent thread number
-// threads to handle heavy api interface, such as transmit_data/transmit_block etc
-CONF_Int32(brpc_heavy_work_pool_threads, "192");
-// threads to handle light api interface, such as exec_plan_fragment_prepare/exec_plan_fragment_start
-CONF_Int32(brpc_light_work_pool_threads, "32");
-CONF_Int32(brpc_heavy_work_pool_max_queue_size, "10240");
-CONF_Int32(brpc_light_work_pool_max_queue_size, "10240");
+CONF_Int32(number_tablet_writer_threads, "16");
+CONF_Int32(number_slave_replica_download_threads, "64");
 
 // The maximum amount of data that can be processed by a stream load
 CONF_mInt64(streaming_load_max_mb, "10240");
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index aa3510452d..381782af85 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -60,15 +60,7 @@ namespace doris {
 
 const uint32_t DOWNLOAD_FILE_MAX_RETRY = 3;
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_queue_size, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_queue_size, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_active_threads, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_active_threads, MetricUnit::NOUNIT);
-
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_pool_max_queue_size, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_pool_max_queue_size, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(heavy_work_max_threads, MetricUnit::NOUNIT);
-DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(light_work_max_threads, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
 
 bthread_key_t btls_key;
 
@@ -102,42 +94,16 @@ private:
 
 PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env),
-          _heavy_work_pool(config::brpc_heavy_work_pool_threads,
-                           config::brpc_heavy_work_pool_max_queue_size, "brpc_heavy"),
-          _light_work_pool(config::brpc_light_work_pool_threads,
-                           config::brpc_light_work_pool_max_queue_size, "brpc_light") {
-    REGISTER_HOOK_METRIC(heavy_work_pool_queue_size,
-                         [this]() { return _heavy_work_pool.get_queue_size(); });
-    REGISTER_HOOK_METRIC(light_work_pool_queue_size,
-                         [this]() { return _light_work_pool.get_queue_size(); });
-    REGISTER_HOOK_METRIC(heavy_work_active_threads,
-                         [this]() { return _heavy_work_pool.get_active_threads(); });
-    REGISTER_HOOK_METRIC(light_work_active_threads,
-                         [this]() { return _light_work_pool.get_active_threads(); });
-
-    REGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size,
-                         []() { return config::brpc_heavy_work_pool_max_queue_size; });
-    REGISTER_HOOK_METRIC(light_work_pool_max_queue_size,
-                         []() { return config::brpc_light_work_pool_max_queue_size; });
-    REGISTER_HOOK_METRIC(heavy_work_max_threads,
-                         []() { return config::brpc_heavy_work_pool_threads; });
-    REGISTER_HOOK_METRIC(light_work_max_threads,
-                         []() { return config::brpc_light_work_pool_threads; });
-
+          _tablet_worker_pool(config::number_tablet_writer_threads, 10240, "tablet_writer"),
+          _slave_replica_worker_pool(config::number_slave_replica_download_threads, 10240,
+                                     "replica_download") {
+    REGISTER_HOOK_METRIC(add_batch_task_queue_size,
+                         [this]() { return _tablet_worker_pool.get_queue_size(); });
     CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
 }
 
 PInternalServiceImpl::~PInternalServiceImpl() {
-    DEREGISTER_HOOK_METRIC(heavy_work_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(light_work_pool_queue_size);
-    DEREGISTER_HOOK_METRIC(heavy_work_active_threads);
-    DEREGISTER_HOOK_METRIC(light_work_active_threads);
-
-    DEREGISTER_HOOK_METRIC(heavy_work_pool_max_queue_size);
-    DEREGISTER_HOOK_METRIC(light_work_pool_max_queue_size);
-    DEREGISTER_HOOK_METRIC(heavy_work_max_threads);
-    DEREGISTER_HOOK_METRIC(light_work_max_threads);
-
+    DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
     CHECK_EQ(0, bthread_key_delete(btls_key));
 }
 
@@ -165,7 +131,7 @@ void PInternalServiceImpl::transmit_data_by_http(google::protobuf::RpcController
     _transmit_data(cntl_base, new_request, response, new_done, st);
 }
 
-void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::_transmit_data(google::protobuf::RpcController* cntl_base,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done,
@@ -203,31 +169,23 @@ void PInternalServiceImpl::tablet_writer_open(google::protobuf::RpcController* c
                                               const PTabletWriterOpenRequest* request,
                                               PTabletWriterOpenResult* response,
                                               google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, response, done]() {
-        VLOG_RPC << "tablet writer open, id=" << request->id()
-                 << ", index_id=" << request->index_id() << ", txn_id=" << request->txn_id();
-        brpc::ClosureGuard closure_guard(done);
-        auto st = _exec_env->load_channel_mgr()->open(*request);
-        if (!st.ok()) {
-            LOG(WARNING) << "load channel open failed, message=" << st.get_error_msg()
-                         << ", id=" << request->id() << ", index_id=" << request->index_id()
-                         << ", txn_id=" << request->txn_id();
-        }
-        st.to_protobuf(response->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
+             << ", txn_id=" << request->txn_id();
+    brpc::ClosureGuard closure_guard(done);
+    auto st = _exec_env->load_channel_mgr()->open(*request);
+    if (!st.ok()) {
+        LOG(WARNING) << "load channel open failed, message=" << st.get_error_msg()
+                     << ", id=" << request->id() << ", index_id=" << request->index_id()
+                     << ", txn_id=" << request->txn_id();
     }
+    st.to_protobuf(response->mutable_status());
 }
 
-void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* cntl_base,
                                               const PExecPlanFragmentRequest* request,
                                               PExecPlanFragmentResult* response,
                                               google::protobuf::Closure* done) {
-    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", controller);
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment", cntl_base);
     auto scope = OpentelemetryScope {span};
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
@@ -241,95 +199,67 @@ void PInternalServiceImpl::exec_plan_fragment(google::protobuf::RpcController* c
     st.to_protobuf(response->mutable_status());
 }
 
-void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::exec_plan_fragment_prepare(google::protobuf::RpcController* cntl_base,
                                                       const PExecPlanFragmentRequest* request,
                                                       PExecPlanFragmentResult* response,
                                                       google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, controller, request, response, done]() {
-        exec_plan_fragment(controller, request, response, done);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    exec_plan_fragment(cntl_base, request, response, done);
 }
 
 void PInternalServiceImpl::exec_plan_fragment_start(google::protobuf::RpcController* controller,
                                                     const PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* result,
                                                     google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, controller, request, result, done]() {
-        auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
-        auto scope = OpentelemetryScope {span};
-        brpc::ClosureGuard closure_guard(done);
-        auto st = _exec_env->fragment_mgr()->start_query_execution(request);
-        st.to_protobuf(result->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
+    auto scope = OpentelemetryScope {span};
+    brpc::ClosureGuard closure_guard(done);
+    auto st = _exec_env->fragment_mgr()->start_query_execution(request);
+    st.to_protobuf(result->mutable_status());
 }
 
-void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
                                                    const PTabletWriterAddBlockRequest* request,
                                                    PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() {
-        // TODO(zxy) delete in 1.2 version
-        google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
+    // TODO(zxy) delete in 1.2 version
+    google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
 
-        _tablet_writer_add_block(controller, request, response, new_done);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    _tablet_writer_add_block(cntl_base, request, response, new_done);
 }
 
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
-        google::protobuf::RpcController* controller, const ::doris::PEmptyRequest* request,
+        google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
-        PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
-        google::protobuf::Closure* new_done =
-                new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(
-                new_request, cntl);
-        if (st.ok()) {
-            _tablet_writer_add_block(controller, new_request, response, new_done);
-        } else {
-            st.to_protobuf(response->mutable_status());
-        }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    PTabletWriterAddBlockRequest* new_request = new PTabletWriterAddBlockRequest();
+    google::protobuf::Closure* new_done =
+            new NewHttpClosure<PTabletWriterAddBlockRequest>(new_request, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_block<PTabletWriterAddBlockRequest>(new_request,
+                                                                                       cntl);
+    if (st.ok()) {
+        _tablet_writer_add_block(cntl_base, new_request, response, new_done);
+    } else {
+        st.to_protobuf(response->mutable_status());
     }
 }
 
-void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcController* cntl_base,
                                                     const PTabletWriterAddBlockRequest* request,
                                                     PTabletWriterAddBlockResult* response,
                                                     google::protobuf::Closure* done) {
+    VLOG_RPC << "tablet writer add block, id=" << request->id()
+             << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
+             << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
     int64_t submit_task_time_ns = MonotonicNanos();
-    bool ret = _heavy_work_pool.offer([request, response, done, submit_task_time_ns, this]() {
+    _tablet_worker_pool.offer([request, response, done, submit_task_time_ns, this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
+
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
                 LOG(WARNING) << "tablet writer add block failed, message=" << st.get_error_msg()
@@ -342,12 +272,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
         response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
         response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
     });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
 }
 
 void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcController* cntl_base,
@@ -379,13 +303,13 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
                                                     PTabletWriterAddBatchResult* response,
                                                     google::protobuf::Closure* done) {
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
-             << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id();
+             << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
+             << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
     // add batch maybe cost a lot of time, and this callback thread will be held.
     // this will influence query execution, because the pthreads under bthread may be
     // exhausted, so we put this to a local thread pool to process
     int64_t submit_task_time_ns = MonotonicNanos();
-    bool ret = _heavy_work_pool.offer([cntl_base, request, response, done, submit_task_time_ns,
-                                       this]() {
+    _tablet_worker_pool.offer([cntl_base, request, response, done, submit_task_time_ns, this]() {
         int64_t wait_execution_time_ns = MonotonicNanos() - submit_task_time_ns;
         brpc::ClosureGuard closure_guard(done);
         int64_t execution_time_ns = 0;
@@ -408,32 +332,20 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
         response->set_execution_time_us(execution_time_ns / NANOS_PER_MICRO);
         response->set_wait_execution_time_us(wait_execution_time_ns / NANOS_PER_MICRO);
     });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
 }
 
 void PInternalServiceImpl::tablet_writer_cancel(google::protobuf::RpcController* controller,
                                                 const PTabletWriterCancelRequest* request,
                                                 PTabletWriterCancelResult* response,
                                                 google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, done]() {
-        VLOG_RPC << "tablet writer cancel, id=" << request->id()
-                 << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id();
-        brpc::ClosureGuard closure_guard(done);
-        auto st = _exec_env->load_channel_mgr()->cancel(*request);
-        if (!st.ok()) {
-            LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
-                         << ", index_id=" << request->index_id()
-                         << ", sender_id=" << request->sender_id();
-        }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
+    VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id()
+             << ", sender_id=" << request->sender_id();
+    brpc::ClosureGuard closure_guard(done);
+    auto st = _exec_env->load_channel_mgr()->cancel(*request);
+    if (!st.ok()) {
+        LOG(WARNING) << "tablet writer cancel failed, id=" << request->id()
+                     << ", index_id=" << request->index_id()
+                     << ", sender_id=" << request->sender_id();
     }
 }
 
@@ -465,404 +377,309 @@ Status PInternalServiceImpl::_exec_plan_fragment(const std::string& ser_request,
     }
 }
 
-void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::cancel_plan_fragment(google::protobuf::RpcController* cntl_base,
                                                 const PCancelPlanFragmentRequest* request,
                                                 PCancelPlanFragmentResult* result,
                                                 google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, controller, request, result, done]() {
-        auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", controller);
-        auto scope = OpentelemetryScope {span};
-        brpc::ClosureGuard closure_guard(done);
-        TUniqueId tid;
-        tid.__set_hi(request->finst_id().hi());
-        tid.__set_lo(request->finst_id().lo());
-
-        Status st = Status::OK();
-        if (request->has_cancel_reason()) {
-            LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
-                      << ", reason: " << request->cancel_reason();
-            _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
-        } else {
-            LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
-            _exec_env->fragment_mgr()->cancel(tid);
-        }
-        // TODO: the logic seems useless, cancel only return Status::OK. remove it
-        st.to_protobuf(result->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    auto span = telemetry::start_rpc_server_span("exec_plan_fragment_start", cntl_base);
+    auto scope = OpentelemetryScope {span};
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId tid;
+    tid.__set_hi(request->finst_id().hi());
+    tid.__set_lo(request->finst_id().lo());
+
+    Status st;
+    if (request->has_cancel_reason()) {
+        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid)
+                  << ", reason: " << request->cancel_reason();
+        st = _exec_env->fragment_mgr()->cancel(tid, request->cancel_reason());
+    } else {
+        LOG(INFO) << "cancel fragment, fragment_instance_id=" << print_id(tid);
+        st = _exec_env->fragment_mgr()->cancel(tid);
     }
+    if (!st.ok()) {
+        LOG(WARNING) << "cancel plan fragment failed, errmsg=" << 
st.get_error_msg();
+    }
+    st.to_protobuf(result->mutable_status());
 }
 
-void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::fetch_data(google::protobuf::RpcController* cntl_base,
                                       const PFetchDataRequest* request, PFetchDataResult* result,
                                       google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, controller, request, result, done]() {
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
-        _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        result->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
+    _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
 }
 
 void PInternalServiceImpl::fetch_table_schema(google::protobuf::RpcController* controller,
                                               const PFetchTableSchemaRequest* request,
                                               PFetchTableSchemaResult* result,
                                               google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([request, result, done]() {
-        VLOG_RPC << "fetch table schema";
-        brpc::ClosureGuard closure_guard(done);
-        TFileScanRange file_scan_range;
-        Status st = Status::OK();
-        {
-            const uint8_t* buf = (const uint8_t*)(request->file_scan_range().data());
-            uint32_t len = request->file_scan_range().size();
-            st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
-            if (!st.ok()) {
-                LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg();
-                st.to_protobuf(result->mutable_status());
-                return;
-            }
-        }
-        if (file_scan_range.__isset.ranges == false) {
-            st = Status::InternalError("can not get TFileRangeDesc.");
-            st.to_protobuf(result->mutable_status());
-            return;
-        }
-        if (file_scan_range.__isset.params == false) {
-            st = Status::InternalError("can not get TFileScanRangeParams.");
-            st.to_protobuf(result->mutable_status());
-            return;
-        }
-        const TFileRangeDesc& range = file_scan_range.ranges.at(0);
-        const TFileScanRangeParams& params = file_scan_range.params;
-
-        std::unique_ptr<vectorized::GenericReader> reader(nullptr);
-        std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
-        switch (params.format_type) {
-        case TFileFormatType::FORMAT_CSV_PLAIN:
-        case TFileFormatType::FORMAT_CSV_GZ:
-        case TFileFormatType::FORMAT_CSV_BZ2:
-        case TFileFormatType::FORMAT_CSV_LZ4FRAME:
-        case TFileFormatType::FORMAT_CSV_LZOP:
-        case TFileFormatType::FORMAT_CSV_DEFLATE: {
-            // file_slots is no use
-            std::vector<SlotDescriptor*> file_slots;
-            reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots));
-            break;
-        }
-        case TFileFormatType::FORMAT_PARQUET: {
-            reader.reset(new vectorized::ParquetReader(params, range));
-            break;
-        }
-        case TFileFormatType::FORMAT_ORC: {
-            std::vector<std::string> column_names;
-            reader.reset(new vectorized::OrcReader(params, range, column_names, ""));
-            break;
-        }
-        case TFileFormatType::FORMAT_JSON: {
-            std::vector<SlotDescriptor*> file_slots;
-            reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots));
-            break;
-        }
-        default:
-            st = Status::InternalError("Not supported file format in fetch table schema: {}",
-                                       params.format_type);
-            st.to_protobuf(result->mutable_status());
-            return;
-        }
-        std::vector<std::string> col_names;
-        std::vector<TypeDescriptor> col_types;
-        st = reader->get_parsed_schema(&col_names, &col_types);
+    VLOG_RPC << "fetch table schema";
+    brpc::ClosureGuard closure_guard(done);
+    TFileScanRange file_scan_range;
+    Status st = Status::OK();
+    {
+        const uint8_t* buf = (const 
uint8_t*)(request->file_scan_range().data());
+        uint32_t len = request->file_scan_range().size();
+        st = deserialize_thrift_msg(buf, &len, false, &file_scan_range);
         if (!st.ok()) {
             LOG(WARNING) << "fetch table schema failed, errmsg=" << 
st.get_error_msg();
             st.to_protobuf(result->mutable_status());
             return;
         }
-        result->set_column_nums(col_names.size());
-        for (size_t idx = 0; idx < col_names.size(); ++idx) {
-            result->add_column_names(col_names[idx]);
-        }
-        for (size_t idx = 0; idx < col_types.size(); ++idx) {
-            PTypeDesc* type_desc = result->add_column_types();
-            col_types[idx].to_protobuf(type_desc);
-        }
+    }
+    if (file_scan_range.__isset.ranges == false) {
+        st = Status::InternalError("can not get TFileRangeDesc.");
         st.to_protobuf(result->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        result->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        result->mutable_status()->add_error_msgs("fail to offer request to the 
work pool");
+        return;
     }
+    if (file_scan_range.__isset.params == false) {
+        st = Status::InternalError("can not get TFileScanRangeParams.");
+        st.to_protobuf(result->mutable_status());
+        return;
+    }
+    const TFileRangeDesc& range = file_scan_range.ranges.at(0);
+    const TFileScanRangeParams& params = file_scan_range.params;
+
+    std::unique_ptr<vectorized::GenericReader> reader(nullptr);
+    std::unique_ptr<RuntimeProfile> profile(new RuntimeProfile("FetchTableSchema"));
+    switch (params.format_type) {
+    case TFileFormatType::FORMAT_CSV_PLAIN:
+    case TFileFormatType::FORMAT_CSV_GZ:
+    case TFileFormatType::FORMAT_CSV_BZ2:
+    case TFileFormatType::FORMAT_CSV_LZ4FRAME:
+    case TFileFormatType::FORMAT_CSV_LZOP:
+    case TFileFormatType::FORMAT_CSV_DEFLATE: {
+        // file_slots is no use
+        std::vector<SlotDescriptor*> file_slots;
+        reader.reset(new vectorized::CsvReader(profile.get(), params, range, file_slots));
+        break;
+    }
+    case TFileFormatType::FORMAT_PARQUET: {
+        reader.reset(new vectorized::ParquetReader(params, range));
+        break;
+    }
+    case TFileFormatType::FORMAT_ORC: {
+        std::vector<std::string> column_names;
+        reader.reset(new vectorized::OrcReader(params, range, column_names, ""));
+        break;
+    }
+    case TFileFormatType::FORMAT_JSON: {
+        std::vector<SlotDescriptor*> file_slots;
+        reader.reset(new vectorized::NewJsonReader(profile.get(), params, range, file_slots));
+        break;
+    }
+    default:
+        st = Status::InternalError("Not supported file format in fetch table schema: {}",
+                                   params.format_type);
+        st.to_protobuf(result->mutable_status());
+        return;
+    }
+    std::vector<std::string> col_names;
+    std::vector<TypeDescriptor> col_types;
+    st = reader->get_parsed_schema(&col_names, &col_types);
+    if (!st.ok()) {
+        LOG(WARNING) << "fetch table schema failed, errmsg=" << st.get_error_msg();
+        st.to_protobuf(result->mutable_status());
+        return;
+    }
+    result->set_column_nums(col_names.size());
+    for (size_t idx = 0; idx < col_names.size(); ++idx) {
+        result->add_column_names(col_names[idx]);
+    }
+    for (size_t idx = 0; idx < col_types.size(); ++idx) {
+        PTypeDesc* type_desc = result->add_column_types();
+        col_types[idx].to_protobuf(type_desc);
+    }
+    st.to_protobuf(result->mutable_status());
 }
 
 void PInternalServiceImpl::get_info(google::protobuf::RpcController* controller,
                                     const PProxyRequest* request, PProxyResult* response,
                                     google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        // PProxyRequest is defined in gensrc/proto/internal_service.proto
-        // Currently it supports 2 kinds of requests:
-        // 1. get all kafka partition ids for given topic
-        // 2. get all kafka partition offsets for given topic and timestamp.
-        if (request->has_kafka_meta_request()) {
-            const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
-            if (!kafka_request.partition_id_for_latest_offsets().empty()) {
-                // get latest offsets for specified partition ids
-                std::vector<PIntegerPair> partition_offsets;
-                Status st = _exec_env->routine_load_task_executor()
-                                    ->get_kafka_latest_offsets_for_partitions(
-                                            request->kafka_meta_request(), &partition_offsets);
-                if (st.ok()) {
-                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
-                    for (const auto& entry : partition_offsets) {
-                        PIntegerPair* res = part_offsets->add_offset_times();
-                        res->set_key(entry.key());
-                        res->set_val(entry.val());
-                    }
+    brpc::ClosureGuard closure_guard(done);
+    // PProxyRequest is defined in gensrc/proto/internal_service.proto
+    // Currently it supports 2 kinds of requests:
+    // 1. get all kafka partition ids for given topic
+    // 2. get all kafka partition offsets for given topic and timestamp.
+    if (request->has_kafka_meta_request()) {
+        const PKafkaMetaProxyRequest& kafka_request = request->kafka_meta_request();
+        if (!kafka_request.partition_id_for_latest_offsets().empty()) {
+            // get latest offsets for specified partition ids
+            std::vector<PIntegerPair> partition_offsets;
+            Status st = _exec_env->routine_load_task_executor()
+                                ->get_kafka_latest_offsets_for_partitions(
+                                        request->kafka_meta_request(), &partition_offsets);
+            if (st.ok()) {
+                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+                for (const auto& entry : partition_offsets) {
+                    PIntegerPair* res = part_offsets->add_offset_times();
+                    res->set_key(entry.key());
+                    res->set_val(entry.val());
                 }
-                st.to_protobuf(response->mutable_status());
-                return;
-            } else if (!kafka_request.offset_times().empty()) {
-                // if offset_times() has elements, which means this request is to get offset by timestamp.
-                std::vector<PIntegerPair> partition_offsets;
-                Status st = _exec_env->routine_load_task_executor()
-                                    ->get_kafka_partition_offsets_for_times(
-                                            request->kafka_meta_request(), &partition_offsets);
-                if (st.ok()) {
-                    PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
-                    for (const auto& entry : partition_offsets) {
-                        PIntegerPair* res = part_offsets->add_offset_times();
-                        res->set_key(entry.key());
-                        res->set_val(entry.val());
-                    }
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        } else if (!kafka_request.offset_times().empty()) {
+            // if offset_times() has elements, which means this request is to get offset by timestamp.
+            std::vector<PIntegerPair> partition_offsets;
+            Status st =
+                    _exec_env->routine_load_task_executor()->get_kafka_partition_offsets_for_times(
+                            request->kafka_meta_request(), &partition_offsets);
+            if (st.ok()) {
+                PKafkaPartitionOffsets* part_offsets = response->mutable_partition_offsets();
+                for (const auto& entry : partition_offsets) {
+                    PIntegerPair* res = part_offsets->add_offset_times();
+                    res->set_key(entry.key());
+                    res->set_val(entry.val());
                 }
-                st.to_protobuf(response->mutable_status());
-                return;
-            } else {
-                // get partition ids of topic
-                std::vector<int32_t> partition_ids;
-                Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
-                        request->kafka_meta_request(), &partition_ids);
-                if (st.ok()) {
-                    PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
-                    for (int32_t id : partition_ids) {
-                        kafka_result->add_partition_ids(id);
-                    }
+            }
+            st.to_protobuf(response->mutable_status());
+            return;
+        } else {
+            // get partition ids of topic
+            std::vector<int32_t> partition_ids;
+            Status st = _exec_env->routine_load_task_executor()->get_kafka_partition_meta(
+                    request->kafka_meta_request(), &partition_ids);
+            if (st.ok()) {
+                PKafkaMetaProxyResult* kafka_result = response->mutable_kafka_meta_result();
+                for (int32_t id : partition_ids) {
+                    kafka_result->add_partition_ids(id);
                 }
-                st.to_protobuf(response->mutable_status());
-                return;
             }
+            st.to_protobuf(response->mutable_status());
+            return;
         }
-        Status::OK().to_protobuf(response->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
     }
+    Status::OK().to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::update_cache(google::protobuf::RpcController* controller,
                                         const PUpdateCacheRequest* request,
                                         PCacheResponse* response, google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        _exec_env->result_cache()->update(request, response);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->set_status(PCacheStatus::CANCELED);
-    }
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->update(request, response);
 }
 
 void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
                                        const PFetchCacheRequest* request, PFetchCacheResult* result,
                                        google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, request, result, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        _exec_env->result_cache()->fetch(request, result);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        result->set_status(PCacheStatus::CANCELED);
-    }
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->fetch(request, result);
 }
 
 void PInternalServiceImpl::clear_cache(google::protobuf::RpcController* controller,
                                        const PClearCacheRequest* request, PCacheResponse* response,
                                        google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        _exec_env->result_cache()->clear(request, response);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->set_status(PCacheStatus::CANCELED);
-    }
+    brpc::ClosureGuard closure_guard(done);
+    _exec_env->result_cache()->clear(request, response);
 }
 
 void PInternalServiceImpl::merge_filter(::google::protobuf::RpcController* controller,
                                         const ::doris::PMergeFilterRequest* request,
                                         ::doris::PMergeFilterResponse* response,
                                         ::google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, controller, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
-        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-        Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "merge meet error" << st.to_string();
-        }
-        st.to_protobuf(response->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    brpc::ClosureGuard closure_guard(done);
+    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+    Status st = _exec_env->fragment_mgr()->merge_filter(request, &zero_copy_input_stream);
+    if (!st.ok()) {
+        LOG(WARNING) << "merge meet error" << st.to_string();
     }
+    st.to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::apply_filter(::google::protobuf::RpcController* controller,
                                         const ::doris::PPublishFilterRequest* request,
                                         ::doris::PPublishFilterResponse* response,
                                         ::google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, controller, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
-        butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
-        UniqueId unique_id(request->query_id());
-        VLOG_NOTICE << "rpc apply_filter recv";
-        Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
-        if (!st.ok()) {
-            LOG(WARNING) << "apply filter meet error: " << st.to_string();
-        }
-        st.to_protobuf(response->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    brpc::ClosureGuard closure_guard(done);
+    auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
+    butil::IOBufAsZeroCopyInputStream zero_copy_input_stream(attachment);
+    UniqueId unique_id(request->query_id());
+    VLOG_NOTICE << "rpc apply_filter recv";
+    Status st = _exec_env->fragment_mgr()->apply_filter(request, &zero_copy_input_stream);
+    if (!st.ok()) {
+        LOG(WARNING) << "apply filter meet error: " << st.to_string();
     }
+    st.to_protobuf(response->mutable_status());
 }
 
 void PInternalServiceImpl::send_data(google::protobuf::RpcController* controller,
                                      const PSendDataRequest* request, PSendDataResult* response,
                                      google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        TUniqueId fragment_instance_id;
-        fragment_instance_id.hi = request->fragment_instance_id().hi();
-        fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-        if (pipe == nullptr) {
-            response->mutable_status()->set_status_code(1);
-            response->mutable_status()->add_error_msgs("pipe is null");
-        } else {
-            for (int i = 0; i < request->data_size(); ++i) {
-                PDataRow* row = new PDataRow();
-                row->CopyFrom(request->data(i));
-            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
-                                       sizeof(row) + row->ByteSizeLong());
-            }
-            response->mutable_status()->set_status_code(0);
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId fragment_instance_id;
+    fragment_instance_id.hi = request->fragment_instance_id().hi();
+    fragment_instance_id.lo = request->fragment_instance_id().lo();
+    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+    if (pipe == nullptr) {
+        response->mutable_status()->set_status_code(1);
+        response->mutable_status()->add_error_msgs("pipe is null");
+    } else {
+        for (int i = 0; i < request->data_size(); ++i) {
+            PDataRow* row = new PDataRow();
+            row->CopyFrom(request->data(i));
+            pipe->append_and_flush(reinterpret_cast<char*>(&row), sizeof(row),
+                                   sizeof(row) + row->ByteSizeLong());
         }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+        response->mutable_status()->set_status_code(0);
     }
 }
 
 void PInternalServiceImpl::commit(google::protobuf::RpcController* controller,
                                   const PCommitRequest* request, PCommitResult* response,
                                   google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        TUniqueId fragment_instance_id;
-        fragment_instance_id.hi = request->fragment_instance_id().hi();
-        fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-        if (pipe == nullptr) {
-            response->mutable_status()->set_status_code(1);
-            response->mutable_status()->add_error_msgs("pipe is null");
-        } else {
-            pipe->finish();
-            response->mutable_status()->set_status_code(0);
-        }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId fragment_instance_id;
+    fragment_instance_id.hi = request->fragment_instance_id().hi();
+    fragment_instance_id.lo = request->fragment_instance_id().lo();
+    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+    if (pipe == nullptr) {
+        response->mutable_status()->set_status_code(1);
+        response->mutable_status()->add_error_msgs("pipe is null");
+    } else {
+        pipe->finish();
+        response->mutable_status()->set_status_code(0);
     }
 }
 
 void PInternalServiceImpl::rollback(google::protobuf::RpcController* controller,
                                     const PRollbackRequest* request, PRollbackResult* response,
                                     google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        TUniqueId fragment_instance_id;
-        fragment_instance_id.hi = request->fragment_instance_id().hi();
-        fragment_instance_id.lo = request->fragment_instance_id().lo();
-
-        auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
-        if (pipe == nullptr) {
-            response->mutable_status()->set_status_code(1);
-            response->mutable_status()->add_error_msgs("pipe is null");
-        } else {
-            pipe->cancel("rollback");
-            response->mutable_status()->set_status_code(0);
-        }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    brpc::ClosureGuard closure_guard(done);
+    TUniqueId fragment_instance_id;
+    fragment_instance_id.hi = request->fragment_instance_id().hi();
+    fragment_instance_id.lo = request->fragment_instance_id().lo();
+    auto pipe = _exec_env->fragment_mgr()->get_pipe(fragment_instance_id);
+    if (pipe == nullptr) {
+        response->mutable_status()->set_status_code(1);
+        response->mutable_status()->add_error_msgs("pipe is null");
+    } else {
+        pipe->cancel("rollback");
+        response->mutable_status()->set_status_code(0);
     }
 }
 
-void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::fold_constant_expr(google::protobuf::RpcController* cntl_base,
                                               const PConstantExprRequest* request,
                                               PConstantExprResult* response,
                                               google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([this, request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        Status st = Status::OK();
+    brpc::ClosureGuard closure_guard(done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+    Status st = Status::OK();
+    if (request->has_request()) {
         st = _fold_constant_expr(request->request(), response);
-        if (!st.ok()) {
-            LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st.get_error_msg();
-        }
-        st.to_protobuf(response->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
+    } else {
+        // TODO(yangzhengguo) this is just for compatible with old version, this should be removed in the release 0.15
+        st = _fold_constant_expr(cntl->request_attachment().to_string(), response);
+    }
+    if (!st.ok()) {
+        LOG(WARNING) << "exec fold constant expr failed, errmsg=" << st.get_error_msg();
     }
+    st.to_protobuf(response->mutable_status());
 }
 
 Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
@@ -879,48 +696,31 @@ Status PInternalServiceImpl::_fold_constant_expr(const std::string& ser_request,
     return FoldConstantExecutor().fold_constant_vexpr(t_request, response);
 }
 
-void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::transmit_block(google::protobuf::RpcController* cntl_base,
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, controller, request, response, done]() {
-        // TODO(zxy) delete in 1.2 version
-        google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
+    // TODO(zxy) delete in 1.2 version
+    google::protobuf::Closure* new_done = new NewHttpClosure<PTransmitDataParams>(done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
 
-        _transmit_block(controller, request, response, new_done, Status::OK());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    _transmit_block(cntl_base, request, response, new_done, Status::OK());
 }
 
-void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::transmit_block_by_http(google::protobuf::RpcController* cntl_base,
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* response,
                                                   google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([this, controller, response, done]() {
-        PTransmitDataParams* new_request = new PTransmitDataParams();
-        google::protobuf::Closure* new_done =
-                new NewHttpClosure<PTransmitDataParams>(new_request, done);
-        brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
-        Status st =
-                attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
-        _transmit_block(controller, new_request, response, new_done, st);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    PTransmitDataParams* new_request = new PTransmitDataParams();
+    google::protobuf::Closure* new_done =
+            new NewHttpClosure<PTransmitDataParams>(new_request, done);
+    brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+    Status st = attachment_extract_request_contain_block<PTransmitDataParams>(new_request, cntl);
+    _transmit_block(cntl_base, new_request, response, new_done, st);
 }
 
-void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::_transmit_block(google::protobuf::RpcController* cntl_base,
                                            const PTransmitDataParams* request,
                                            PTransmitDataResult* response,
                                            google::protobuf::Closure* done,
@@ -958,34 +758,25 @@ void PInternalServiceImpl::check_rpc_channel(google::protobuf::RpcController* co
                                              const PCheckRPCChannelRequest* request,
                                              PCheckRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(0);
-        if (request->data().size() != request->size()) {
+    brpc::ClosureGuard closure_guard(done);
+    response->mutable_status()->set_status_code(0);
+    if (request->data().size() != request->size()) {
+        std::stringstream ss;
+        ss << "data size not same, expected: " << request->size()
+           << ", actual: " << request->data().size();
+        response->mutable_status()->add_error_msgs(ss.str());
+        response->mutable_status()->set_status_code(1);
+
+    } else {
+        Md5Digest digest;
+        digest.update(static_cast<const void*>(request->data().c_str()), request->data().size());
+        digest.digest();
+        if (!iequal(digest.hex(), request->md5())) {
             std::stringstream ss;
-            ss << "data size not same, expected: " << request->size()
-               << ", actual: " << request->data().size();
+            ss << "md5 not same, expected: " << request->md5() << ", actual: " 
<< digest.hex();
             response->mutable_status()->add_error_msgs(ss.str());
             response->mutable_status()->set_status_code(1);
-
-        } else {
-            Md5Digest digest;
-            digest.update(static_cast<const void*>(request->data().c_str()),
-                          request->data().size());
-            digest.digest();
-            if (!iequal(digest.hex(), request->md5())) {
-                std::stringstream ss;
-                ss << "md5 not same, expected: " << request->md5() << ", 
actual: " << digest.hex();
-                response->mutable_status()->add_error_msgs(ss.str());
-                response->mutable_status()->set_status_code(1);
-            }
         }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
     }
 }
 
@@ -993,60 +784,44 @@ void PInternalServiceImpl::reset_rpc_channel(google::protobuf::RpcController* co
                                              const PResetRPCChannelRequest* request,
                                              PResetRPCChannelResponse* response,
                                              google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(0);
-        if (request->all()) {
-            int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
-            if (size > 0) {
-                std::vector<std::string> endpoints;
-                ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
-                ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
-                *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
+    brpc::ClosureGuard closure_guard(done);
+    response->mutable_status()->set_status_code(0);
+    if (request->all()) {
+        int size = ExecEnv::GetInstance()->brpc_internal_client_cache()->size();
+        if (size > 0) {
+            std::vector<std::string> endpoints;
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->get_all(&endpoints);
+            ExecEnv::GetInstance()->brpc_internal_client_cache()->clear();
+            *response->mutable_channels() = {endpoints.begin(), endpoints.end()};
+        }
+    } else {
+        for (const std::string& endpoint : request->endpoints()) {
+            if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
+                response->mutable_status()->add_error_msgs(endpoint + ": not found.");
+                continue;
             }
-        } else {
-            for (const std::string& endpoint : request->endpoints()) {
-                if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->exist(endpoint)) {
-                    response->mutable_status()->add_error_msgs(endpoint + ": not found.");
-                    continue;
-                }
 
-                if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
-                    response->add_channels(endpoint);
-                } else {
-                    response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
-                }
-            }
-            if (request->endpoints_size() != response->channels_size()) {
-                response->mutable_status()->set_status_code(1);
+            if (ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(endpoint)) {
+                response->add_channels(endpoint);
+            } else {
+                response->mutable_status()->add_error_msgs(endpoint + ": reset failed.");
             }
         }
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+        if (request->endpoints_size() != response->channels_size()) {
+            response->mutable_status()->set_status_code(1);
+        }
     }
 }
 
-void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* controller,
+void PInternalServiceImpl::hand_shake(google::protobuf::RpcController* cntl_base,
                                       const PHandShakeRequest* request,
                                       PHandShakeResponse* response,
                                       google::protobuf::Closure* done) {
-    bool ret = _light_work_pool.offer([request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        if (request->has_hello()) {
-            response->set_hello(request->hello());
-        }
-        response->mutable_status()->set_status_code(0);
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
+    brpc::ClosureGuard closure_guard(done);
+    if (request->has_hello()) {
+        response->set_hello(request->hello());
     }
+    response->mutable_status()->set_status_code(0);
 }
 
 void PInternalServiceImpl::request_slave_tablet_pull_rowset(
@@ -1061,8 +836,7 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
     int64_t brpc_port = request->brpc_port();
     std::string token = request->token();
     int64_t node_id = request->node_id();
-    bool ret = _heavy_work_pool.offer([rowset_meta_pb, host, brpc_port, node_id, segments_size,
-                                       http_port, token, rowset_path, this]() {
+    _slave_replica_worker_pool.offer([=]() {
         TabletSharedPtr tablet = StorageEngine::instance()->tablet_manager()->get_tablet(
                 rowset_meta_pb.tablet_id(), rowset_meta_pb.tablet_schema_hash());
         if (tablet == nullptr) {
@@ -1205,12 +979,6 @@ void PInternalServiceImpl::request_slave_tablet_pull_rowset(
         _response_pull_slave_rowset(host, brpc_port, rowset_meta->txn_id(),
                                     rowset_meta->tablet_id(), node_id, true);
     });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to 
the work pool");
-    }
     Status::OK().to_protobuf(response->mutable_status());
 }
 
@@ -1269,22 +1037,14 @@ void PInternalServiceImpl::_response_pull_slave_rowset(const std::string& remote
 void PInternalServiceImpl::response_slave_tablet_pull_rowset(
         google::protobuf::RpcController* controller, const PTabletWriteSlaveDoneRequest* request,
         PTabletWriteSlaveDoneResult* response, google::protobuf::Closure* done) {
-    bool ret = _heavy_work_pool.offer([request, response, done]() {
-        brpc::ClosureGuard closure_guard(done);
-        VLOG_CRITICAL << "receive the result of slave replica pull rowset from slave replica. "
-                         "slave server="
-                      << request->node_id() << ", is_succeed=" << request->is_succeed()
-                      << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
-        StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
-                request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
-        Status::OK().to_protobuf(response->mutable_status());
-    });
-    if (!ret) {
-        LOG(WARNING) << "fail to offer request to the work pool";
-        brpc::ClosureGuard closure_guard(done);
-        response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
-        response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
-    }
+    brpc::ClosureGuard closure_guard(done);
+    VLOG_CRITICAL
+            << "receive the result of slave replica pull rowset from slave replica. slave server="
+            << request->node_id() << ", is_succeed=" << request->is_succeed()
+            << ", tablet_id=" << request->tablet_id() << ", txn_id=" << request->txn_id();
+    StorageEngine::instance()->txn_manager()->finish_slave_tablet_pull_rowset(
+            request->txn_id(), request->tablet_id(), request->node_id(), request->is_succeed());
+    Status::OK().to_protobuf(response->mutable_status());
 }
 
 } // namespace doris
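
The pattern removed throughout internal_service.cpp above is "offer the RPC body to an
internal pthread pool, and answer CANCELLED when the task cannot be enqueued". A minimal
sketch of that shape, kept separate from the Doris sources (MyServiceImpl, PMyRequest,
PMyResponse, do_work and _pool are illustrative placeholders, not real Doris names):

    // Sketch: offload a brpc handler body to a worker pool so that any
    // std::mutex taken inside do_work() is held on a pthread, not a bthread.
    void MyServiceImpl::my_rpc(google::protobuf::RpcController* cntl_base,
                               const PMyRequest* request, PMyResponse* response,
                               google::protobuf::Closure* done) {
        bool ret = _pool.offer([request, response, done]() {
            brpc::ClosureGuard closure_guard(done); // always complete the RPC
            do_work(request, response);
        });
        if (!ret) {
            // Task could not be queued: fail fast instead of blocking the caller.
            brpc::ClosureGuard closure_guard(done);
            response->mutable_status()->set_status_code(TStatusCode::CANCELLED);
            response->mutable_status()->add_error_msgs("fail to offer request to the work pool");
        }
    }

After the revert the handlers run inline on the brpc workers again, which removes the
queueing layer entirely at the cost of doing all handler work on bthreads.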
diff --git a/be/src/service/internal_service.h b/be/src/service/internal_service.h
index e5855d98f3..3ea3655974 100644
--- a/be/src/service/internal_service.h
+++ b/be/src/service/internal_service.h
@@ -194,13 +194,8 @@ private:
 
 private:
     ExecEnv* _exec_env;
-
-    // every brpc service request should put into thread pool
-    // the reason see issue #16634
-    // define the interface for reading and writing data as heavy interface
-    // otherwise as light interface
-    PriorityThreadPool _heavy_work_pool;
-    PriorityThreadPool _light_work_pool;
+    PriorityThreadPool _tablet_worker_pool;
+    PriorityThreadPool _slave_replica_worker_pool;
 };
 
 } // namespace doris
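
With this change the service header goes back to the two dedicated pools used before the
heavy/light split. Their construction is not part of this hunk; a plausible sketch,
assuming the three-argument constructor visible in the priority_thread_pool.hpp hunk
below, with placeholder thread and queue sizes rather than the real config values:

    // Hypothetical wiring only; pool sizes here are illustrative placeholders.
    PInternalServiceImpl::PInternalServiceImpl(ExecEnv* exec_env)
            : _exec_env(exec_env),
              _tablet_worker_pool(16, 10240, "tablet_writer"),
              _slave_replica_worker_pool(64, 10240, "slave_replica_download") {}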
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 648414f3e1..da02613000 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -212,16 +212,6 @@ public:
     IntCounter* upload_rowset_count;
     IntCounter* upload_fail_count;
 
-    UIntGauge* light_work_pool_queue_size;
-    UIntGauge* heavy_work_pool_queue_size;
-    UIntGauge* heavy_work_active_threads;
-    UIntGauge* light_work_active_threads;
-
-    UIntGauge* heavy_work_pool_max_queue_size;
-    UIntGauge* light_work_pool_max_queue_size;
-    UIntGauge* heavy_work_max_threads;
-    UIntGauge* light_work_max_threads;
-
     static DorisMetrics* instance() {
         static DorisMetrics instance;
         return &instance;
diff --git a/be/src/util/priority_thread_pool.hpp b/be/src/util/priority_thread_pool.hpp
index 077b9757c8..32fc637970 100644
--- a/be/src/util/priority_thread_pool.hpp
+++ b/be/src/util/priority_thread_pool.hpp
@@ -54,7 +54,7 @@ public:
     //     queue exceeds this size, subsequent calls to Offer will block until there is
     //     capacity available.
     PriorityThreadPool(uint32_t num_threads, uint32_t queue_size, const std::string& name)
-            : _work_queue(queue_size), _shutdown(false), _name(name), _active_threads(0) {
+            : _work_queue(queue_size), _shutdown(false), _name(name) {
         for (int i = 0; i < num_threads; ++i) {
             _threads.create_thread(
                     std::bind<void>(std::mem_fn(&PriorityThreadPool::work_thread), this, i));
@@ -100,7 +100,6 @@ public:
     virtual void join() { _threads.join_all(); }
 
     virtual uint32_t get_queue_size() const { return _work_queue.get_size(); }
-    virtual uint32_t get_active_threads() const { return _active_threads; }
 
     // Blocks until the work queue is empty, and then calls shutdown to stop the worker
     // threads and Join to wait until they are finished.
@@ -136,9 +135,7 @@ private:
         while (!is_shutdown()) {
             Task task;
             if (_work_queue.blocking_get(&task)) {
-                _active_threads++;
                 task.work_function();
-                _active_threads--;
             }
             if (_work_queue.get_size() == 0) {
                 _empty_cv.notify_all();
@@ -153,7 +150,6 @@ private:
     // Set to true when threads should stop doing work and terminate.
     std::atomic<bool> _shutdown;
     std::string _name;
-    std::atomic<int> _active_threads;
 };
 
 } // namespace doris
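
The hunk above also drops the _active_threads counter that backed the
doris_be_*_work_active_threads gauges. The removed instrumentation is the usual
"atomic counter around the task" idiom; a standalone sketch of the same idea
(not Doris code, names are illustrative):

    #include <atomic>
    #include <functional>

    std::atomic<int> g_active_workers{0};

    // Count how many workers are inside a task at this instant, so a
    // monitoring gauge can sample the value cheaply.
    void run_counted(const std::function<void()>& task) {
        g_active_workers.fetch_add(1, std::memory_order_relaxed);
        task(); // user work; may block on I/O or locks
        g_active_workers.fetch_sub(1, std::memory_order_relaxed);
    }

Like the removed ++/-- pair, this sketch assumes task() does not throw; a small RAII
guard around the counter would keep it balanced under exceptions.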
diff --git a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
index 464c59c309..610125abe3 100644
--- a/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
+++ b/docs/zh-CN/docs/admin-manual/maint-monitor/monitor-metrics/metrics.md
@@ -295,12 +295,6 @@ curl http://be_host:webserver_port/metrics?type=json
 |`fragment_thread_pool_queue_size`| | Num | Current length of the query execution thread pool wait queue | If greater than zero, query threads are exhausted and queries will back up | P0 |
 |`doris_be_all_rowsets_num`| | Num | Current number of all rowsets | | P0 |
 |`doris_be_all_segments_num`| | Num | Current number of all segments | | P0 |
-|`doris_be_heavy_work_max_threads`| | Num | Number of threads in the brpc heavy thread pool | | p0 |
-|`doris_be_light_work_max_threads`| | Num | Number of threads in the brpc light thread pool | | p0 |
-|`doris_be_heavy_work_pool_queue_size`| | Num | Maximum queue length of the brpc heavy thread pool; submitting work blocks once it is exceeded | | p0 |
-|`doris_be_light_work_pool_queue_size`| | Num | Maximum queue length of the brpc light thread pool; submitting work blocks once it is exceeded | | p0 |
-|`doris_be_heavy_work_active_threads`| | Num | Number of active threads in the brpc heavy thread pool | | p0 |
-|`doris_be_light_work_active_threads`| | Num | Number of active threads in the brpc light thread pool | | p0 |
 
 ### Machine Monitoring
 
diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto
index be17516c64..0396e8944b 100644
--- a/gensrc/proto/internal_service.proto
+++ b/gensrc/proto/internal_service.proto
@@ -243,7 +243,6 @@ enum PCacheStatus {
     INVALID_KEY_RANGE = 6;
     DATA_OVERDUE = 7;
     EMPTY_DATA = 8;
-    CANCELED = 9;
 };
 
 enum CacheType {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
