This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 2acf0938183 branch-3.0: [fix](load) Fix the issue of insert into
select concurrent reporting causing no error url #49669 (#49687)
2acf0938183 is described below
commit 2acf09381836e80097b31a21f13d53819987742d
Author: Xin Liao <[email protected]>
AuthorDate: Mon Mar 31 19:47:09 2025 +0800
branch-3.0: [fix](load) Fix the issue of insert into select concurrent
reporting causing no error url #49669 (#49687)
cherry pick from #49669
---
be/src/pipeline/pipeline_fragment_context.cpp | 26 +++++++++
be/src/pipeline/pipeline_fragment_context.h | 2 +
be/src/runtime/fragment_mgr.cpp | 27 +++------
be/src/runtime/query_context.cpp | 10 ++++
be/src/runtime/query_context.h | 7 +++
be/src/runtime/runtime_state.cpp | 10 ++--
.../test_insert_error_url.groovy | 67 ++++++++++++++++++++++
7 files changed, 125 insertions(+), 24 deletions(-)
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index 0c38cc7f0d6..220474c91e4 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,6 +189,10 @@ void PipelineFragmentContext::cancel(const Status reason) {
debug_string());
}
+ if (auto error_url = get_load_error_url(); !error_url.empty()) {
+ _query_ctx->set_load_error_url(error_url);
+ }
+
_query_ctx->cancel(reason, _fragment_id);
if (reason.is<ErrorCode::LIMIT_REACH>()) {
_is_report_on_cancel = false;
@@ -1749,6 +1753,23 @@ void
PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
}
}
+std::string PipelineFragmentContext::get_load_error_url() {
+ if (const auto& str = _runtime_state->get_error_log_file_path();
!str.empty()) {
+ return to_load_error_http_path(str);
+ }
+ for (auto& task_states : _task_runtime_states) {
+ for (auto& task_state : task_states) {
+ if (!task_state) {
+ continue;
+ }
+ if (const auto& str = task_state->get_error_log_file_path();
!str.empty()) {
+ return to_load_error_http_path(str);
+ }
+ }
+ }
+ return "";
+}
+
Status PipelineFragmentContext::send_report(bool done) {
Status exec_status = _query_ctx->exec_status();
// If plan is done successfully, but _is_report_success is false,
@@ -1775,6 +1796,10 @@ Status PipelineFragmentContext::send_report(bool done) {
}
}
+ std::string load_error_url = _query_ctx->get_load_error_url().empty()
+ ? get_load_error_url()
+ : _query_ctx->get_load_error_url();
+
ReportStatusRequest req {exec_status,
runtime_states,
done || !exec_status.ok(),
@@ -1784,6 +1809,7 @@ Status PipelineFragmentContext::send_report(bool done) {
TUniqueId(),
-1,
_runtime_state.get(),
+ load_error_url,
[this](const Status& reason) { cancel(reason); }};
return _report_status_cb(
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d..f119384b2fb 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -123,6 +123,8 @@ public:
}
}
+ std::string get_load_error_url();
+
private:
Status _build_pipelines(ObjectPool* pool, const
doris::TPipelineFragmentParams& request,
const DescriptorTbl& descs, OperatorPtr* root,
PipelinePtr cur_pipe);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ce37faf91eb..97fef234331 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -456,26 +456,13 @@ void FragmentMgr::coordinator_callback(const
ReportStatusRequest& req) {
params.load_counters.emplace(s_dpp_abnormal_all,
std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows,
std::to_string(num_rows_load_unselected));
- if (!req.runtime_state->get_error_log_file_path().empty()) {
- std::string error_log_url =
-
to_load_error_http_path(req.runtime_state->get_error_log_file_path());
- LOG(INFO) << "error log file path: " << error_log_url
- << ", query id: " << print_id(req.query_id)
- << ", fragment instance id: " <<
print_id(req.fragment_instance_id);
- params.__set_tracking_url(error_log_url);
- } else if (!req.runtime_states.empty()) {
- for (auto* rs : req.runtime_states) {
- if (!rs->get_error_log_file_path().empty()) {
- std::string error_log_url =
to_load_error_http_path(rs->get_error_log_file_path());
- LOG(INFO) << "error log file path: " << error_log_url
- << ", query id: " << print_id(req.query_id)
- << ", fragment instance id: " <<
print_id(rs->fragment_instance_id());
- params.__set_tracking_url(error_log_url);
- }
- if (rs->wal_id() > 0) {
- params.__set_txn_id(rs->wal_id());
- params.__set_label(rs->import_label());
- }
+ if (!req.load_error_url.empty()) {
+ params.__set_tracking_url(req.load_error_url);
+ }
+ for (auto* rs : req.runtime_states) {
+ if (rs->wal_id() > 0) {
+ params.__set_txn_id(rs->wal_id());
+ params.__set_label(rs->import_label());
}
}
if (!req.runtime_state->export_output_files().empty()) {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 29a93aa40c1..c2a4cfca565 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -225,6 +225,16 @@ void QueryContext::cancel(Status new_status, int
fragment_id) {
cancel_all_pipeline_context(new_status, fragment_id);
}
+void QueryContext::set_load_error_url(std::string error_url) {
+ std::lock_guard<std::mutex> lock(_error_url_lock);
+ _load_error_url = error_url;
+}
+
+std::string QueryContext::get_load_error_url() {
+ std::lock_guard<std::mutex> lock(_error_url_lock);
+ return _load_error_url;
+}
+
void QueryContext::cancel_all_pipeline_context(const Status& reason, int
fragment_id) {
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>>
ctx_to_cancel;
{
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index f7d5a515c52..a89ccc461bd 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -57,6 +57,7 @@ struct ReportStatusRequest {
TUniqueId fragment_instance_id;
int backend_num;
RuntimeState* runtime_state;
+ std::string load_error_url;
std::function<void(const Status&)> cancel_fn;
};
@@ -244,6 +245,9 @@ public:
}
}
+ void set_load_error_url(std::string error_url);
+ std::string get_load_error_url();
+
private:
int _timeout_second;
TUniqueId _query_id;
@@ -321,6 +325,9 @@ private:
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
_collect_realtime_query_profile() const;
+ std::mutex _error_url_lock;
+ std::string _load_error_url;
+
public:
// when fragment of pipeline is closed, it will register its profile to
this map by using add_fragment_profile
void add_fragment_profile(
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 6c495fa79c7..5aa25e133d2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -442,6 +442,11 @@ Status
RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
}
std::string RuntimeState::get_error_log_file_path() {
+ DBUG_EXECUTE_IF("RuntimeState::get_error_log_file_path.block", {
+ if (!_error_log_file_path.empty()) {
+ std::this_thread::sleep_for(std::chrono::seconds(1));
+ }
+ });
std::lock_guard<std::mutex> l(_s3_error_log_file_lock);
if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) {
// close error log file
@@ -450,10 +455,7 @@ std::string RuntimeState::get_error_log_file_path() {
_exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
// upload error log file to s3
Status st = _s3_error_fs->upload(error_log_absolute_path,
_s3_error_log_file_path);
- if (st.ok()) {
- // remove local error log file
- std::filesystem::remove(error_log_absolute_path);
- } else {
+ if (!st.ok()) {
// upload failed and return local error log file path
LOG(WARNING) << "Fail to upload error file to s3,
error_log_file_path="
<< _error_log_file_path << ", error=" << st;
diff --git
a/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy
b/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy
new file mode 100644
index 00000000000..aa96642ac80
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_error_url", "nonConcurrent") {
+ def tableName = "test_insert_error_url_tbl"
+ sql """drop table if exists ${tableName}"""
+ sql """
+ CREATE TABLE IF NOT EXISTS ${tableName} (
+ L_ORDERKEY INTEGER NOT NULL,
+ L_PARTKEY INTEGER NOT NULL,
+ L_SUPPKEY INTEGER NOT NULL,
+ L_LINENUMBER INTEGER NOT NULL,
+ L_QUANTITY DECIMAL(15,2) NOT NULL,
+ L_EXTENDEDPRICE DECIMAL(15,2) NOT NULL,
+ L_DISCOUNT DECIMAL(15,2) NOT NULL,
+ L_TAX DECIMAL(15,2) NOT NULL,
+ L_RETURNFLAG CHAR(1) NOT NULL,
+ L_LINESTATUS CHAR(1) NOT NULL,
+ L_SHIPDATE DATE NOT NULL,
+ L_COMMITDATE DATE NOT NULL,
+ L_RECEIPTDATE DATE NOT NULL,
+ L_SHIPINSTRUCT CHAR(25) NOT NULL,
+ L_SHIPMODE CHAR(10) NOT NULL,
+ L_COMMENT VARCHAR(44) NOT NULL
+ )
+ UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+ PARTITION BY RANGE(L_ORDERKEY) (
+ PARTITION p2023 VALUES LESS THAN ("5000000")
+ )
+ DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+ PROPERTIES (
+ "replication_num" = "1"
+ );
+ """
+
+ try {
+
GetDebugPoint().enableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block")
+ expectExceptionLike({
+ sql """
+ insert into ${tableName} select * from S3(
+ "uri" =
"http://${getS3BucketName()}.${getS3Endpoint()}/regression/tpch/sf1/lineitem.csv.split01.gz",
+ "s3.access_key"= "${getS3AK()}",
+ "s3.secret_key" = "${getS3SK()}",
+ "s3.region" = "${getS3Region()}",
+ "format" = "csv",
+ "column_separator" = "|"
+ );
+ """
+ }, "error_log")
+ } finally {
+
GetDebugPoint().disableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block")
+ }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]