This is an automated email from the ASF dual-hosted git repository.
caiconghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new aeee738 Revert "[Refactor][agent_task] Remove etl mgr and etl job pool from be (#8635)" (#8666)
aeee738 is described below
commit aeee738af02ab9fdb1b84c6595287b2e919f6690
Author: yiguolei <[email protected]>
AuthorDate: Fri Mar 25 18:32:50 2022 +0800
Revert "[Refactor][agent_task] Remove etl mgr and etl job pool from be
(#8635)" (#8666)
This reverts commit 6bc982c37436acf288f566cf10e084731b80fa44.
---
be/src/agent/agent_server.cpp | 42 +++++
be/src/agent/agent_server.h | 6 +
be/src/runtime/CMakeLists.txt | 1 +
be/src/runtime/etl_job_mgr.cpp | 302 +++++++++++++++++++++++++++++++++++
be/src/runtime/etl_job_mgr.h | 102 ++++++++++++
be/src/runtime/exec_env.h | 5 +
be/src/runtime/exec_env_init.cpp | 12 ++
be/src/service/backend_service.h | 17 ++
be/src/util/doris_metrics.h | 1 +
be/test/runtime/CMakeLists.txt | 1 +
be/test/runtime/data_stream_test.cpp | 9 ++
be/test/runtime/etl_job_mgr_test.cpp | 232 +++++++++++++++++++++++++++
gensrc/thrift/BackendService.thrift | 7 +
13 files changed, 737 insertions(+)
diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index f2d0bdd..a3205f1 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -27,6 +27,7 @@
#include "common/status.h"
#include "gutil/strings/substitute.h"
#include "olap/snapshot_manager.h"
+#include "runtime/etl_job_mgr.h"
using std::string;
using std::vector;
@@ -251,4 +252,45 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}
+void AgentServer::submit_etl_task(TAgentResult& t_agent_result,
+ const TMiniLoadEtlTaskRequest& request) {
+ Status status = _exec_env->etl_job_mgr()->start_job(request);
+ auto fragment_instance_id = request.params.params.fragment_instance_id;
+ if (status.ok()) {
+ VLOG_RPC << "success to submit etl task. id=" << fragment_instance_id;
+ } else {
+ VLOG_RPC << "fail to submit etl task. id=" << fragment_instance_id
+ << ", err_msg=" << status.get_error_msg();
+ }
+ status.to_thrift(&t_agent_result.status);
+}
+
+void AgentServer::get_etl_status(TMiniLoadEtlStatusResult& t_agent_result,
+ const TMiniLoadEtlStatusRequest& request) {
+ Status status = _exec_env->etl_job_mgr()->get_job_state(request.mini_load_id, &t_agent_result);
+ if (!status.ok()) {
+ LOG(WARNING) << "fail to get job state. [id=" << request.mini_load_id
<< "]";
+ }
+
+ VLOG_RPC << "success to get job state. [id=" << request.mini_load_id
+ << ", status=" << t_agent_result.status.status_code
+ << ", etl_state=" << t_agent_result.etl_state << ", files=";
+ for (auto& item : t_agent_result.file_map) {
+ VLOG_RPC << item.first << ":" << item.second << ";";
+ }
+ VLOG_RPC << "]";
+}
+
+void AgentServer::delete_etl_files(TAgentResult& t_agent_result,
+ const TDeleteEtlFilesRequest& request) {
+ Status status = _exec_env->etl_job_mgr()->erase_job(request);
+ if (!status.ok()) {
+ LOG(WARNING) << "fail to delete etl files. because " <<
status.get_error_msg()
+ << " with request " << request;
+ }
+
+ VLOG_RPC << "success to delete etl files. request=" << request;
+ status.to_thrift(&t_agent_result.status);
+}
+
} // namespace doris
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index f4300f2..5ca15e5 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -47,6 +47,12 @@ public:
// TODO(lingbin): This method is deprecated, should be removed later.
void publish_cluster_state(TAgentResult& agent_result, const TAgentPublishRequest& request);
+ // Multi-Load will still use the following 3 methods for now.
+ void submit_etl_task(TAgentResult& agent_result, const TMiniLoadEtlTaskRequest& request);
+ void get_etl_status(TMiniLoadEtlStatusResult& agent_result,
+ const TMiniLoadEtlStatusRequest& request);
+ void delete_etl_files(TAgentResult& result, const TDeleteEtlFilesRequest& request);
+
private:
DISALLOW_COPY_AND_ASSIGN(AgentServer);
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index f9dafab..414f405 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -60,6 +60,7 @@ set(RUNTIME_FILES
qsorter.cpp
fragment_mgr.cpp
dpp_sink_internal.cpp
+ etl_job_mgr.cpp
load_path_mgr.cpp
types.cpp
tmp_file_mgr.cc
diff --git a/be/src/runtime/etl_job_mgr.cpp b/be/src/runtime/etl_job_mgr.cpp
new file mode 100644
index 0000000..ce4029b
--- /dev/null
+++ b/be/src/runtime/etl_job_mgr.cpp
@@ -0,0 +1,302 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/etl_job_mgr.h"
+
+#include <filesystem>
+#include <functional>
+
+#include "gen_cpp/FrontendService.h"
+#include "gen_cpp/HeartbeatService_types.h"
+#include "gen_cpp/MasterService_types.h"
+#include "gen_cpp/Status_types.h"
+#include "gen_cpp/Types_types.h"
+#include "runtime/client_cache.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "runtime/plan_fragment_executor.h"
+#include "runtime/runtime_state.h"
+#include "service/backend_options.h"
+#include "util/file_utils.h"
+#include "util/uid_util.h"
+
+namespace doris {
+
+#define VLOG_ETL VLOG_CRITICAL
+
+std::string EtlJobMgr::to_http_path(const std::string& file_name) {
+ std::stringstream url;
+ url << "http://" << BackendOptions::get_localhost() << ":" <<
config::webserver_port
+ << "/api/_download_load?"
+ << "token=" << _exec_env->token() << "&file=" << file_name;
+ return url.str();
+}
+
+std::string EtlJobMgr::to_load_error_http_path(const std::string& file_name) {
+ std::stringstream url;
+ url << "http://" << BackendOptions::get_localhost() << ":" <<
config::webserver_port
+ << "/api/_load_error_log?"
+ << "file=" << file_name;
+ return url.str();
+}
+
+const std::string DPP_NORMAL_ALL = "dpp.norm.ALL";
+const std::string DPP_ABNORMAL_ALL = "dpp.abnorm.ALL";
+const std::string ERROR_FILE_PREFIX = "error_log";
+
+EtlJobMgr::EtlJobMgr(ExecEnv* exec_env)
+ : _exec_env(exec_env), _success_jobs(5000), _failed_jobs(5000) {}
+
+EtlJobMgr::~EtlJobMgr() {}
+
+Status EtlJobMgr::init() {
+ return Status::OK();
+}
+
+Status EtlJobMgr::start_job(const TMiniLoadEtlTaskRequest& req) {
+ const TUniqueId& id = req.params.params.fragment_instance_id;
+ std::lock_guard<std::mutex> l(_lock);
+ auto it = _running_jobs.find(id);
+ if (it != _running_jobs.end()) {
+ // Already have this job, return what???
+ LOG(INFO) << "Duplicated etl job(" << id << ")";
+ return Status::OK();
+ }
+
+ // If already success, we return Status::OK()
+ // and wait master ask me success information
+ if (_success_jobs.exists(id)) {
+ // Already success
+ LOG(INFO) << "Already successful etl job(" << id << ")";
+ return Status::OK();
+ }
+
+ RETURN_IF_ERROR(_exec_env->fragment_mgr()->exec_plan_fragment(
+ req.params, std::bind<void>(&EtlJobMgr::finalize_job, this, std::placeholders::_1)));
+
+ // redo this job if failed before
+ if (_failed_jobs.exists(id)) {
+ _failed_jobs.erase(id);
+ }
+
+ VLOG_ETL << "Job id(" << id << ") insert to EtlJobMgr.";
+ _running_jobs.insert(id);
+
+ return Status::OK();
+}
+
+void EtlJobMgr::report_to_master(PlanFragmentExecutor* executor) {
+ TUpdateMiniEtlTaskStatusRequest request;
+ RuntimeState* state = executor->runtime_state();
+ request.protocolVersion = FrontendServiceVersion::V1;
+ request.etlTaskId = state->fragment_instance_id();
+ Status status = get_job_state(state->fragment_instance_id(), &request.etlTaskStatus);
+ if (!status.ok()) {
+ return;
+ }
+ const TNetworkAddress& master_address = _exec_env->master_info()->network_address;
+ FrontendServiceConnection client(_exec_env->frontend_client_cache(), master_address,
+ config::thrift_rpc_timeout_ms, &status);
+ if (!status.ok()) {
+ std::stringstream ss;
+ ss << "Connect master failed, with address(" <<
master_address.hostname << ":"
+ << master_address.port << ")";
+ LOG(WARNING) << ss.str();
+ return;
+ }
+ TFeResult res;
+ try {
+ try {
+ client->updateMiniEtlTaskStatus(res, request);
+ } catch (apache::thrift::transport::TTransportException& e) {
+ LOG(WARNING) << "Retrying report etl jobs status to master(" <<
master_address.hostname
+ << ":" << master_address.port << ") because: " <<
e.what();
+ status = client.reopen(config::thrift_rpc_timeout_ms);
+ if (!status.ok()) {
+ LOG(WARNING) << "Client repoen failed. with address(" <<
master_address.hostname
+ << ":" << master_address.port << ")";
+ return;
+ }
+ client->updateMiniEtlTaskStatus(res, request);
+ }
+ } catch (apache::thrift::TException& e) {
+ // failed when retry.
+ // reopen to disable this connection
+ client.reopen(config::thrift_rpc_timeout_ms);
+ std::stringstream ss;
+ ss << "Report etl task to master(" << master_address.hostname << ":"
<< master_address.port
+ << ") failed because: " << e.what();
+ LOG(WARNING) << ss.str();
+ }
+ // TODO(lingbin): check status of 'res' here.
+ // because there are some checks in updateMiniEtlTaskStatus, for example max_filter_ratio.
+ LOG(INFO) << "Successfully report elt job status to master.id=" <<
print_id(request.etlTaskId);
+}
+
+void EtlJobMgr::finalize_job(PlanFragmentExecutor* executor) {
+ EtlJobResult result;
+
+ RuntimeState* state = executor->runtime_state();
+ if (executor->status().ok()) {
+ // Get files
+ for (auto& it : state->output_files()) {
+ int64_t file_size = std::filesystem::file_size(it);
+ result.file_map[to_http_path(it)] = file_size;
+ }
+ // set statistics
+ result.process_normal_rows = state->num_rows_load_success();
+ result.process_abnormal_rows = state->num_rows_load_filtered();
+ } else {
+ // get debug path
+ result.process_normal_rows = state->num_rows_load_success();
+ result.process_abnormal_rows = state->num_rows_load_filtered();
+ }
+
+ result.debug_path = state->get_error_log_file_path();
+
+ finish_job(state->fragment_instance_id(), executor->status(), result);
+
+ // Try to report this finished task to master
+ report_to_master(executor);
+}
+
+Status EtlJobMgr::cancel_job(const TUniqueId& id) {
+ std::lock_guard<std::mutex> l(_lock);
+ auto it = _running_jobs.find(id);
+ if (it == _running_jobs.end()) {
+ // Nothing to do
+ LOG(INFO) << "No such job id, just print to info " << id;
+ return Status::OK();
+ }
+ _running_jobs.erase(it);
+ VLOG_ETL << "id(" << id << ") have been removed from EtlJobMgr.";
+ EtlJobCtx job_ctx;
+ job_ctx.finish_status = Status::Cancelled("Cancelled");
+ _failed_jobs.put(id, job_ctx);
+ return Status::OK();
+}
+
+Status EtlJobMgr::finish_job(const TUniqueId& id, const Status& finish_status,
+ const EtlJobResult& result) {
+ std::lock_guard<std::mutex> l(_lock);
+
+ auto it = _running_jobs.find(id);
+ if (it == _running_jobs.end()) {
+ std::stringstream ss;
+ ss << "Unknown job id(" << id << ").";
+ return Status::InternalError(ss.str());
+ }
+ _running_jobs.erase(it);
+
+ EtlJobCtx ctx;
+ ctx.finish_status = finish_status;
+ ctx.result = result;
+ if (finish_status.ok()) {
+ _success_jobs.put(id, ctx);
+ } else {
+ _failed_jobs.put(id, ctx);
+ }
+
+ VLOG_ETL << "Move job(" << id << ") from running to "
+ << (finish_status.ok() ? "success jobs" : "failed jobs");
+
+ return Status::OK();
+}
+
+Status EtlJobMgr::get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result) {
+ std::lock_guard<std::mutex> l(_lock);
+ auto it = _running_jobs.find(id);
+ if (it != _running_jobs.end()) {
+ result->status.__set_status_code(TStatusCode::OK);
+ result->__set_etl_state(TEtlState::RUNNING);
+ return Status::OK();
+ }
+ // Successful
+ if (_success_jobs.exists(id)) {
+ EtlJobCtx ctx;
+ _success_jobs.get(id, &ctx);
+ result->status.__set_status_code(TStatusCode::OK);
+ result->__set_etl_state(TEtlState::FINISHED);
+ result->__set_file_map(ctx.result.file_map);
+
+ // set counter
+ std::map<std::string, std::string> counter;
+ counter[DPP_NORMAL_ALL] = std::to_string(ctx.result.process_normal_rows);
+ counter[DPP_ABNORMAL_ALL] = std::to_string(ctx.result.process_abnormal_rows);
+ result->__set_counters(counter);
+
+ if (!ctx.result.debug_path.empty()) {
+ result->__set_tracking_url(to_load_error_http_path(ctx.result.debug_path));
+ }
+ return Status::OK();
+ }
+ // failed information
+ if (_failed_jobs.exists(id)) {
+ EtlJobCtx ctx;
+ _failed_jobs.get(id, &ctx);
+ result->status.__set_status_code(TStatusCode::OK);
+ result->__set_etl_state(TEtlState::CANCELLED);
+
+ if (!ctx.result.debug_path.empty()) {
+ result->__set_tracking_url(to_http_path(ctx.result.debug_path));
+ }
+ return Status::OK();
+ }
+ // No such job
+ result->status.__set_status_code(TStatusCode::OK);
+ result->__set_etl_state(TEtlState::CANCELLED);
+ return Status::OK();
+}
+
+Status EtlJobMgr::erase_job(const TDeleteEtlFilesRequest& req) {
+ std::lock_guard<std::mutex> l(_lock);
+ const TUniqueId& id = req.mini_load_id;
+ auto it = _running_jobs.find(id);
+ if (it != _running_jobs.end()) {
+ std::stringstream ss;
+ ss << "Job(" << id << ") is running, can not be deleted.";
+ return Status::InternalError(ss.str());
+ }
+ _success_jobs.erase(id);
+ _failed_jobs.erase(id);
+
+ return Status::OK();
+}
+
+void EtlJobMgr::debug(std::stringstream& ss) {
+ // Make things easy
+ std::lock_guard<std::mutex> l(_lock);
+
+ // Debug summary
+ ss << "we have " << _running_jobs.size() << " jobs Running\n";
+ ss << "we have " << _failed_jobs.size() << " jobs Failed\n";
+ ss << "we have " << _success_jobs.size() << " jobs Successful\n";
+ // Debug running jobs
+ for (auto& it : _running_jobs) {
+ ss << "running jobs: " << it << "\n";
+ }
+ // Debug success jobs
+ for (auto& it : _success_jobs) {
+ ss << "successful jobs: " << it.first << "\n";
+ }
+ // Debug failed jobs
+ for (auto& it : _failed_jobs) {
+ ss << "failed jobs: " << it.first << "\n";
+ }
+}
+
+} // namespace doris
diff --git a/be/src/runtime/etl_job_mgr.h b/be/src/runtime/etl_job_mgr.h
new file mode 100644
index 0000000..d930c73
--- /dev/null
+++ b/be/src/runtime/etl_job_mgr.h
@@ -0,0 +1,102 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef DORIS_BE_RUNTIME_ETL_JOB_MGR_H
+#define DORIS_BE_RUNTIME_ETL_JOB_MGR_H
+
+#include <pthread.h>
+
+#include <mutex>
+#include <string>
+#include <unordered_set>
+#include <vector>
+
+#include "common/status.h"
+#include "gen_cpp/Types_types.h"
+#include "http/rest_monitor_iface.h"
+#include "util/hash_util.hpp"
+#include "util/lru_cache.hpp"
+
+namespace doris {
+
+// used to report to master
+struct EtlJobResult {
+ EtlJobResult() : process_normal_rows(0), process_abnormal_rows(0) {}
+ std::string debug_path;
+ std::map<std::string, int64_t> file_map;
+ int64_t process_normal_rows;
+ int64_t process_abnormal_rows;
+};
+
+// used to report to master
+struct EtlJobCtx {
+ Status finish_status;
+ EtlJobResult result;
+};
+
+class TMiniLoadEtlStatusResult;
+class TMiniLoadEtlTaskRequest;
+class ExecEnv;
+class PlanFragmentExecutor;
+class TDeleteEtlFilesRequest;
+
+// Manager of all ETL jobs.
+// Kept because the master may repeatedly poll the BE to check whether a load job is finished.
+class EtlJobMgr : public RestMonitorIface {
+public:
+ EtlJobMgr(ExecEnv* exec_env);
+
+ virtual ~EtlJobMgr();
+
+ // make trash directory for collect
+ Status init();
+
+ // Make a job to running state
+ // If this job is successful, return OK
+ // If this job is failed, move this job from _failed_jobs to _running_jobs
+ // Otherwise, put it to _running_jobs
+ Status start_job(const TMiniLoadEtlTaskRequest& req);
+
+ // Make a running job to failed job
+ Status cancel_job(const TUniqueId& id);
+
+ Status finish_job(const TUniqueId& id, const Status& finish_status, const EtlJobResult& result);
+
+ Status get_job_state(const TUniqueId& id, TMiniLoadEtlStatusResult* result);
+
+ Status erase_job(const TDeleteEtlFilesRequest& req);
+
+ void finalize_job(PlanFragmentExecutor* executor);
+
+ virtual void debug(std::stringstream& ss);
+
+private:
+ std::string to_http_path(const std::string& file_path);
+ std::string to_load_error_http_path(const std::string& file_path);
+
+ void report_to_master(PlanFragmentExecutor* executor);
+
+ ExecEnv* _exec_env;
+ std::mutex _lock;
+ std::unordered_set<TUniqueId> _running_jobs;
+ LruCache<TUniqueId, EtlJobCtx> _success_jobs;
+ LruCache<TUniqueId, EtlJobCtx> _failed_jobs;
+};
+
+} // namespace doris
+
+#endif
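For reference, a minimal usage sketch of the EtlJobMgr interface restored above. It is illustrative only and not part of the patch: it assumes a fully initialized ExecEnv (fragment_mgr and friends populated) and that the TMiniLoad*/TDeleteEtlFiles types come from the usual generated AgentService header; it mirrors the lifecycle exercised by the test added later in this commit.

    #include "runtime/etl_job_mgr.h"
    #include "runtime/exec_env.h"
    #include "gen_cpp/AgentService_types.h"  // assumed location of the TMiniLoad*/TDeleteEtlFiles types

    void example_etl_flow(doris::ExecEnv* exec_env) {
        using namespace doris;
        EtlJobMgr mgr(exec_env);
        mgr.init();

        TUniqueId id;
        id.hi = 1;
        id.lo = 1;
        TMiniLoadEtlTaskRequest req;  // populated by the FE in practice
        req.params.params.fragment_instance_id = id;

        // Submit: the job enters _running_jobs and the plan fragment is executed.
        Status st = mgr.start_job(req);

        // The master polls the state; it stays RUNNING until finish_job()/finalize_job() runs.
        TMiniLoadEtlStatusResult res;
        mgr.get_job_state(id, &res);

        // Once the load is done, the master asks the BE to forget the job and its files.
        TDeleteEtlFilesRequest del_req;
        del_req.mini_load_id = id;
        mgr.erase_job(del_req);
    }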
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 5c79423..36c0528 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -38,6 +38,7 @@ class BufferPool;
class CgroupsMgr;
class DataStreamMgr;
class DiskIoMgr;
+class EtlJobMgr;
class EvHttpServer;
class ExternalScanContextMgr;
class FragmentMgr;
@@ -126,11 +127,13 @@ public:
ThreadResourceMgr* thread_mgr() { return _thread_mgr; }
PriorityThreadPool* scan_thread_pool() { return _scan_thread_pool; }
ThreadPool* limited_scan_thread_pool() { return _limited_scan_thread_pool.get(); }
+ PriorityThreadPool* etl_thread_pool() { return _etl_thread_pool; }
ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
CgroupsMgr* cgroups_mgr() { return _cgroups_mgr; }
FragmentMgr* fragment_mgr() { return _fragment_mgr; }
ResultCache* result_cache() { return _result_cache; }
TMasterInfo* master_info() { return _master_info; }
+ EtlJobMgr* etl_job_mgr() { return _etl_job_mgr; }
LoadPathMgr* load_path_mgr() { return _load_path_mgr; }
DiskIoMgr* disk_io_mgr() { return _disk_io_mgr; }
TmpFileMgr* tmp_file_mgr() { return _tmp_file_mgr; }
@@ -206,10 +209,12 @@ private:
std::unique_ptr<ThreadPool> _limited_scan_thread_pool;
std::unique_ptr<ThreadPool> _send_batch_thread_pool;
+ PriorityThreadPool* _etl_thread_pool = nullptr;
CgroupsMgr* _cgroups_mgr = nullptr;
FragmentMgr* _fragment_mgr = nullptr;
ResultCache* _result_cache = nullptr;
TMasterInfo* _master_info = nullptr;
+ EtlJobMgr* _etl_job_mgr = nullptr;
LoadPathMgr* _load_path_mgr = nullptr;
DiskIoMgr* _disk_io_mgr = nullptr;
TmpFileMgr* _tmp_file_mgr = nullptr;
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index cdd3417..0d483ea 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -35,6 +35,7 @@
#include "runtime/client_cache.h"
#include "runtime/data_stream_mgr.h"
#include "runtime/disk_io_mgr.h"
+#include "runtime/etl_job_mgr.h"
#include "runtime/exec_env.h"
#include "runtime/external_scan_context_mgr.h"
#include "runtime/fold_constant_executor.h"
@@ -68,6 +69,7 @@
namespace doris {
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(scanner_thread_pool_queue_size, MetricUnit::NOUNIT);
+DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(etl_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_thread_num, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(send_batch_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(query_mem_consumption, MetricUnit::BYTES, "", mem_consumption,
@@ -127,11 +129,14 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
.set_max_queue_size(config::send_batch_thread_pool_queue_size)
.build(&_send_batch_thread_pool);
+ _etl_thread_pool = new PriorityThreadPool(config::etl_thread_pool_size,
+ config::etl_thread_pool_queue_size);
_cgroups_mgr = new CgroupsMgr(this, config::doris_cgroups);
_fragment_mgr = new FragmentMgr(this);
_result_cache = new ResultCache(config::query_cache_max_size_mb,
config::query_cache_elasticity_size_mb);
_master_info = new TMasterInfo();
+ _etl_job_mgr = new EtlJobMgr(this);
_load_path_mgr = new LoadPathMgr(this);
_disk_io_mgr = new DiskIoMgr();
_tmp_file_mgr = new TmpFileMgr(this);
@@ -151,6 +156,7 @@ Status ExecEnv::_init(const std::vector<StorePath>& store_paths) {
_extdatasource_client_cache->init_metrics("extdatasource");
_result_mgr->init();
_cgroups_mgr->init_cgroups();
+ _etl_job_mgr->init();
Status status = _load_path_mgr->init();
if (!status.ok()) {
LOG(ERROR) << "load path mgr init failed." << status.get_error_msg();
@@ -281,6 +287,9 @@ void ExecEnv::_register_metrics() {
REGISTER_HOOK_METRIC(scanner_thread_pool_queue_size,
[this]() { return _scan_thread_pool->get_queue_size(); });
+ REGISTER_HOOK_METRIC(etl_thread_pool_queue_size,
+ [this]() { return _etl_thread_pool->get_queue_size(); });
+
REGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num,
[this]() { return _send_batch_thread_pool->num_threads(); });
@@ -290,6 +299,7 @@ void ExecEnv::_register_metrics() {
void ExecEnv::_deregister_metrics() {
DEREGISTER_HOOK_METRIC(scanner_thread_pool_queue_size);
+ DEREGISTER_HOOK_METRIC(etl_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_thread_num);
DEREGISTER_HOOK_METRIC(send_batch_thread_pool_queue_size);
}
@@ -309,9 +319,11 @@ void ExecEnv::_destroy() {
SAFE_DELETE(_tmp_file_mgr);
SAFE_DELETE(_disk_io_mgr);
SAFE_DELETE(_load_path_mgr);
+ SAFE_DELETE(_etl_job_mgr);
SAFE_DELETE(_master_info);
SAFE_DELETE(_fragment_mgr);
SAFE_DELETE(_cgroups_mgr);
+ SAFE_DELETE(_etl_thread_pool);
SAFE_DELETE(_scan_thread_pool);
SAFE_DELETE(_thread_mgr);
SAFE_DELETE(_broker_client_cache);
diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h
index ca481b5..3c9b3bd 100644
--- a/be/src/service/backend_service.h
+++ b/be/src/service/backend_service.h
@@ -98,6 +98,23 @@ public:
_agent_server->publish_cluster_state(result, request);
}
+ virtual void submit_etl_task(TAgentResult& result,
+ const TMiniLoadEtlTaskRequest& request) override {
+ VLOG_RPC << "submit_etl_task. request is "
+ << apache::thrift::ThriftDebugString(request).c_str();
+ _agent_server->submit_etl_task(result, request);
+ }
+
+ virtual void get_etl_status(TMiniLoadEtlStatusResult& result,
+ const TMiniLoadEtlStatusRequest& request) override {
+ _agent_server->get_etl_status(result, request);
+ }
+
+ virtual void delete_etl_files(TAgentResult& result,
+ const TDeleteEtlFilesRequest& request) override {
+ _agent_server->delete_etl_files(result, request);
+ }
+
// DorisServer service
virtual void exec_plan_fragment(TExecPlanFragmentResult& return_val,
const TExecPlanFragmentParams& params) override;
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 8f96046..ca5d05c 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -198,6 +198,7 @@ public:
UIntGauge* query_cache_partition_total_count;
UIntGauge* scanner_thread_pool_queue_size;
+ UIntGauge* etl_thread_pool_queue_size;
UIntGauge* add_batch_task_queue_size;
UIntGauge* send_batch_thread_pool_thread_num;
UIntGauge* send_batch_thread_pool_queue_size;
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index d2a0320..06a1214 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -38,6 +38,7 @@ ADD_BE_TEST(fragment_mgr_test)
#ADD_BE_TEST(dpp_sink_internal_test)
#ADD_BE_TEST(dpp_sink_test)
#ADD_BE_TEST(data_spliter_test)
+#ADD_BE_TEST(etl_job_mgr_test)
#ADD_BE_TEST(tmp_file_mgr_test)
#ADD_BE_TEST(disk_io_mgr_test)
diff --git a/be/test/runtime/data_stream_test.cpp b/be/test/runtime/data_stream_test.cpp
index e236666..e6d4a7a 100644
--- a/be/test/runtime/data_stream_test.cpp
+++ b/be/test/runtime/data_stream_test.cpp
@@ -94,6 +94,15 @@ public:
virtual void publish_cluster_state(TAgentResult& return_val,
const TAgentPublishRequest& request) {}
+ virtual void submit_etl_task(TAgentResult& return_val, const TMiniLoadEtlTaskRequest& request) {
+ }
+
+ virtual void get_etl_status(TMiniLoadEtlStatusResult& return_val,
+ const TMiniLoadEtlStatusRequest& request) {}
+
+ virtual void delete_etl_files(TAgentResult& return_val, const TDeleteEtlFilesRequest& request) {
+ }
+
virtual void register_pull_load_task(TStatus& _return, const TUniqueId& id,
const int32_t num_senders) {}
diff --git a/be/test/runtime/etl_job_mgr_test.cpp b/be/test/runtime/etl_job_mgr_test.cpp
new file mode 100644
index 0000000..e8bcec1
--- /dev/null
+++ b/be/test/runtime/etl_job_mgr_test.cpp
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/etl_job_mgr.h"
+
+#include <gtest/gtest.h>
+
+#include "gen_cpp/Types_types.h"
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h"
+#include "util/cpu_info.h"
+
+namespace doris {
+// Mock fragment mgr
+Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
+ return Status::OK();
+}
+
+FragmentMgr::FragmentMgr(ExecEnv* exec_env) : _thread_pool(10, 128) {}
+
+FragmentMgr::~FragmentMgr() {}
+
+void FragmentMgr::debug(std::stringstream& ss) {}
+
+class EtlJobMgrTest : public testing::Test {
+public:
+ EtlJobMgrTest() {}
+
+private:
+ ExecEnv _exec_env;
+};
+
+TEST_F(EtlJobMgrTest, NormalCase) {
+ EtlJobMgr mgr(&_exec_env);
+ TUniqueId id;
+ id.lo = 1;
+ id.hi = 1;
+
+ TMiniLoadEtlStatusResult res;
+ TMiniLoadEtlTaskRequest req;
+ TDeleteEtlFilesRequest del_req;
+ del_req.mini_load_id = id;
+ req.params.params.fragment_instance_id = id;
+
+ // make it running
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // make it finishing
+ EtlJobResult job_result;
+ job_result.file_map["abc"] = 100L;
+ ASSERT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::FINISHED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+ ASSERT_EQ(1, res.file_map.size());
+ ASSERT_EQ(100, res.file_map["abc"]);
+
+ // erase it
+ ASSERT_TRUE(mgr.erase_job(del_req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+TEST_F(EtlJobMgrTest, DuplicateCase) {
+ EtlJobMgr mgr(&_exec_env);
+ TUniqueId id;
+ id.lo = 1;
+ id.hi = 1;
+
+ TMiniLoadEtlStatusResult res;
+ TMiniLoadEtlTaskRequest req;
+ req.params.params.fragment_instance_id = id;
+
+ // make it running
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // Put it twice
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+TEST_F(EtlJobMgrTest, RunAfterSuccess) {
+ EtlJobMgr mgr(&_exec_env);
+ TUniqueId id;
+ id.lo = 1;
+ id.hi = 1;
+
+ TMiniLoadEtlStatusResult res;
+ TMiniLoadEtlTaskRequest req;
+ TDeleteEtlFilesRequest del_req;
+ del_req.mini_load_id = id;
+ req.params.params.fragment_instance_id = id;
+
+ // make it running
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // make it finishing
+ EtlJobResult job_result;
+ job_result.file_map["abc"] = 100L;
+ ASSERT_TRUE(mgr.finish_job(id, Status::OK(), job_result).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::FINISHED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+ ASSERT_EQ(1, res.file_map.size());
+ ASSERT_EQ(100, res.file_map["abc"]);
+
+ // Put it twice
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::FINISHED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+ ASSERT_EQ(1, res.file_map.size());
+ ASSERT_EQ(100, res.file_map["abc"]);
+}
+
+TEST_F(EtlJobMgrTest, RunAfterFail) {
+ EtlJobMgr mgr(&_exec_env);
+ TUniqueId id;
+ id.lo = 1;
+ id.hi = 1;
+
+ TMiniLoadEtlStatusResult res;
+ TMiniLoadEtlTaskRequest req;
+ req.params.params.fragment_instance_id = id;
+
+ // make it running
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // make it finishing
+ EtlJobResult job_result;
+ job_result.debug_path = "abc";
+ ASSERT_TRUE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // Put it twice
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+TEST_F(EtlJobMgrTest, CancelJob) {
+ EtlJobMgr mgr(&_exec_env);
+ TUniqueId id;
+ id.lo = 1;
+ id.hi = 1;
+
+ TMiniLoadEtlStatusResult res;
+ TMiniLoadEtlTaskRequest req;
+ req.params.params.fragment_instance_id = id;
+
+ // make it running
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // make it finishing
+ EtlJobResult job_result;
+ job_result.debug_path = "abc";
+ ASSERT_TRUE(mgr.cancel_job(id).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+
+ // Put it twice
+ ASSERT_TRUE(mgr.start_job(req).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::RUNNING, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+TEST_F(EtlJobMgrTest, FinishUnknownJob) {
+ EtlJobMgr mgr(&_exec_env);
+ TUniqueId id;
+ id.lo = 1;
+ id.hi = 1;
+
+ TMiniLoadEtlStatusResult res;
+
+ // make it finishing
+ EtlJobResult job_result;
+ job_result.debug_path = "abc";
+ ASSERT_FALSE(mgr.finish_job(id, Status::ThriftRpcError("Thrift rpc error"), job_result).ok());
+ ASSERT_TRUE(mgr.get_job_state(id, &res).ok());
+ ASSERT_EQ(TEtlState::CANCELLED, res.etl_state);
+ ASSERT_EQ(TStatusCode::OK, res.status.status_code);
+}
+
+} // namespace doris
+
+int main(int argc, char** argv) {
+ std::string conffile = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
+ if (!doris::config::init(conffile.c_str(), false)) {
+ fprintf(stderr, "error read config file. \n");
+ return -1;
+ }
+ doris::CpuInfo::init();
+ ::testing::InitGoogleTest(&argc, argv);
+ return RUN_ALL_TESTS();
+}
diff --git a/gensrc/thrift/BackendService.thrift b/gensrc/thrift/BackendService.thrift
index 47b5997..4517076 100644
--- a/gensrc/thrift/BackendService.thrift
+++ b/gensrc/thrift/BackendService.thrift
@@ -141,6 +141,13 @@ service BackendService {
AgentService.TAgentResult publish_cluster_state(1:AgentService.TAgentPublishRequest request);
+ AgentService.TAgentResult submit_etl_task(1:AgentService.TMiniLoadEtlTaskRequest request);
+
+ AgentService.TMiniLoadEtlStatusResult get_etl_status(
+ 1:AgentService.TMiniLoadEtlStatusRequest request);
+
+ AgentService.TAgentResult delete_etl_files(1:AgentService.TDeleteEtlFilesRequest request);
+
Status.TStatus submit_export_task(1:TExportTaskRequest request);
PaloInternalService.TExportStatusResult get_export_status(1:Types.TUniqueId task_id);
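For context, the three restored RPCs can be exercised from any thrift client generated from this file. The sketch below is illustrative only and not part of the patch: the generated header path, the BackendServiceClient class name in the doris namespace, the host/port, and a shared_ptr-based thrift (0.11+) API are all assumptions.

    #include <memory>
    #include <thrift/protocol/TBinaryProtocol.h>
    #include <thrift/transport/TBufferTransports.h>
    #include <thrift/transport/TSocket.h>
    #include "gen_cpp/BackendService.h"  // assumed generated header

    using apache::thrift::protocol::TBinaryProtocol;
    using apache::thrift::transport::TBufferedTransport;
    using apache::thrift::transport::TSocket;

    int main() {
        auto socket = std::make_shared<TSocket>("be-host", 9060);  // host/port are deployment-specific
        auto transport = std::make_shared<TBufferedTransport>(socket);
        auto protocol = std::make_shared<TBinaryProtocol>(transport);
        doris::BackendServiceClient client(protocol);
        transport->open();

        doris::TMiniLoadEtlTaskRequest task_req;       // populated by the FE in practice
        doris::TAgentResult submit_res;
        client.submit_etl_task(submit_res, task_req);  // starts the ETL plan fragment on the BE

        doris::TMiniLoadEtlStatusRequest status_req;   // carries mini_load_id
        doris::TMiniLoadEtlStatusResult status_res;
        client.get_etl_status(status_res, status_req); // RUNNING / FINISHED / CANCELLED

        doris::TDeleteEtlFilesRequest del_req;
        doris::TAgentResult del_res;
        client.delete_etl_files(del_res, del_req);     // clean up once the load is finished

        transport->close();
        return 0;
    }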
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]