This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 2acf0938183 branch-3.0: [fix](load) Fix the issue of insert into select concurrent reporting causing no error url #49669 (#49687)
2acf0938183 is described below

commit 2acf09381836e80097b31a21f13d53819987742d
Author: Xin Liao <[email protected]>
AuthorDate: Mon Mar 31 19:47:09 2025 +0800

    branch-3.0: [fix](load) Fix the issue of insert into select concurrent reporting causing no error url #49669 (#49687)
    
    cherry pick from #49669
---
 be/src/pipeline/pipeline_fragment_context.cpp      | 26 +++++++++
 be/src/pipeline/pipeline_fragment_context.h        |  2 +
 be/src/runtime/fragment_mgr.cpp                    | 27 +++------
 be/src/runtime/query_context.cpp                   | 10 ++++
 be/src/runtime/query_context.h                     |  7 +++
 be/src/runtime/runtime_state.cpp                   | 10 ++--
 .../test_insert_error_url.groovy                   | 67 ++++++++++++++++++++++
 7 files changed, 125 insertions(+), 24 deletions(-)

diff --git a/be/src/pipeline/pipeline_fragment_context.cpp b/be/src/pipeline/pipeline_fragment_context.cpp
index 0c38cc7f0d6..220474c91e4 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -189,6 +189,10 @@ void PipelineFragmentContext::cancel(const Status reason) {
                     debug_string());
     }
 
+    // Publish this fragment's error URL on the query context; send_report() prefers it.
+    if (auto error_url = get_load_error_url(); !error_url.empty()) {
+        _query_ctx->set_load_error_url(std::move(error_url));
+    }
     _query_ctx->cancel(reason, _fragment_id);
     if (reason.is<ErrorCode::LIMIT_REACH>()) {
         _is_report_on_cancel = false;
@@ -1749,6 +1753,23 @@ void PipelineFragmentContext::close_a_pipeline(PipelineId pipeline_id) {
     }
 }
 
+std::string PipelineFragmentContext::get_load_error_url() {
+    if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) {
+        return to_load_error_http_path(str);
+    }
+    for (const auto& task_states : _task_runtime_states) {
+        for (const auto& task_state : task_states) {
+            if (!task_state) {
+                continue;
+            }
+            if (const auto& str = task_state->get_error_log_file_path(); !str.empty()) {
+                return to_load_error_http_path(str);
+            }
+        }
+    }
+    return "";
+}
+
 Status PipelineFragmentContext::send_report(bool done) {
     Status exec_status = _query_ctx->exec_status();
     // If plan is done successfully, but _is_report_success is false,
@@ -1775,6 +1796,10 @@ Status PipelineFragmentContext::send_report(bool done) {
         }
     }
 
+    std::string load_error_url = _query_ctx->get_load_error_url();
+    if (load_error_url.empty()) {
+        load_error_url = get_load_error_url();
+    }
     ReportStatusRequest req {exec_status,
                              runtime_states,
                              done || !exec_status.ok(),
@@ -1784,6 +1809,7 @@ Status PipelineFragmentContext::send_report(bool done) {
                              TUniqueId(),
                              -1,
                              _runtime_state.get(),
+                             std::move(load_error_url),
                              [this](const Status& reason) { cancel(reason); }};
 
     return _report_status_cb(
diff --git a/be/src/pipeline/pipeline_fragment_context.h b/be/src/pipeline/pipeline_fragment_context.h
index 1674afa886d..f119384b2fb 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -123,6 +123,8 @@ public:
         }
     }
 
+    std::string get_load_error_url(); // HTTP URL of the first error log found, or "" if none.
+
 private:
     Status _build_pipelines(ObjectPool* pool, const doris::TPipelineFragmentParams& request,
                             const DescriptorTbl& descs, OperatorPtr* root, PipelinePtr cur_pipe);
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index ce37faf91eb..97fef234331 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -456,26 +456,13 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
     params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
     params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
 
-    if (!req.runtime_state->get_error_log_file_path().empty()) {
-        std::string error_log_url =
-                to_load_error_http_path(req.runtime_state->get_error_log_file_path());
-        LOG(INFO) << "error log file path: " << error_log_url
-                  << ", query id: " << print_id(req.query_id)
-                  << ", fragment instance id: " << print_id(req.fragment_instance_id);
-        params.__set_tracking_url(error_log_url);
-    } else if (!req.runtime_states.empty()) {
-        for (auto* rs : req.runtime_states) {
-            if (!rs->get_error_log_file_path().empty()) {
-                std::string error_log_url = to_load_error_http_path(rs->get_error_log_file_path());
-                LOG(INFO) << "error log file path: " << error_log_url
-                          << ", query id: " << print_id(req.query_id)
-                          << ", fragment instance id: " << print_id(rs->fragment_instance_id());
-                params.__set_tracking_url(error_log_url);
-            }
-            if (rs->wal_id() > 0) {
-                params.__set_txn_id(rs->wal_id());
-                params.__set_label(rs->import_label());
-            }
+    if (!req.load_error_url.empty()) {
+        params.__set_tracking_url(req.load_error_url);
+    }
+    for (auto* rs : req.runtime_states) {
+        if (rs->wal_id() > 0) {
+            params.__set_txn_id(rs->wal_id());
+            params.__set_label(rs->import_label());
         }
     }
     if (!req.runtime_state->export_output_files().empty()) {
diff --git a/be/src/runtime/query_context.cpp b/be/src/runtime/query_context.cpp
index 29a93aa40c1..c2a4cfca565 100644
--- a/be/src/runtime/query_context.cpp
+++ b/be/src/runtime/query_context.cpp
@@ -225,6 +225,16 @@ void QueryContext::cancel(Status new_status, int fragment_id) {
     cancel_all_pipeline_context(new_status, fragment_id);
 }
 
+void QueryContext::set_load_error_url(std::string error_url) {
+    std::lock_guard<std::mutex> lock(_error_url_lock);
+    _load_error_url = std::move(error_url);
+}
+
+std::string QueryContext::get_load_error_url() {
+    std::lock_guard<std::mutex> lock(_error_url_lock);
+    return _load_error_url;
+}
+
 void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
     std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
     {
diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h
index f7d5a515c52..a89ccc461bd 100644
--- a/be/src/runtime/query_context.h
+++ b/be/src/runtime/query_context.h
@@ -57,6 +57,7 @@ struct ReportStatusRequest {
     TUniqueId fragment_instance_id;
     int backend_num;
     RuntimeState* runtime_state;
+    std::string load_error_url;
     std::function<void(const Status&)> cancel_fn;
 };
 
@@ -244,6 +245,9 @@ public:
         }
     }
 
+    void set_load_error_url(std::string error_url);
+    std::string get_load_error_url();
+
 private:
     int _timeout_second;
     TUniqueId _query_id;
@@ -321,6 +325,9 @@ private:
     std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
     _collect_realtime_query_profile() const;
 
+    std::mutex _error_url_lock; // guards _load_error_url
+    std::string _load_error_url;
+
 public:
     // when fragment of pipeline is closed, it will register its profile to this map by using add_fragment_profile
     void add_fragment_profile(
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 6c495fa79c7..5aa25e133d2 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -442,6 +442,11 @@ Status RuntimeState::append_error_msg_to_file(std::function<std::string()> line,
 }
 
 std::string RuntimeState::get_error_log_file_path() {
+    DBUG_EXECUTE_IF("RuntimeState::get_error_log_file_path.block", {
+        if (!_error_log_file_path.empty()) {
+            std::this_thread::sleep_for(std::chrono::seconds(1));
+        }
+    });
     std::lock_guard<std::mutex> l(_s3_error_log_file_lock);
     if (_s3_error_fs && _error_log_file && _error_log_file->is_open()) {
         // close error log file
@@ -450,10 +455,7 @@ std::string RuntimeState::get_error_log_file_path() {
                 _exec_env->load_path_mgr()->get_load_error_absolute_path(_error_log_file_path);
         // upload error log file to s3
         Status st = _s3_error_fs->upload(error_log_absolute_path, _s3_error_log_file_path);
-        if (st.ok()) {
-            // remove local error log file
-            std::filesystem::remove(error_log_absolute_path);
-        } else {
+        if (!st.ok()) {
             // upload failed and return local error log file path
             LOG(WARNING) << "Fail to upload error file to s3, error_log_file_path="
                          << _error_log_file_path << ", error=" << st;
diff --git a/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy b/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy
new file mode 100644
index 00000000000..aa96642ac80
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_insert_error_url.groovy
@@ -0,0 +1,67 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_insert_error_url", "nonConcurrent") {
+    def tableName = "test_insert_error_url_tbl"
+    sql """drop table if exists ${tableName}"""
+    sql """
+    CREATE TABLE IF NOT EXISTS ${tableName} (
+         L_ORDERKEY    INTEGER NOT NULL,
+         L_PARTKEY     INTEGER NOT NULL,
+         L_SUPPKEY     INTEGER NOT NULL,
+         L_LINENUMBER  INTEGER NOT NULL,
+         L_QUANTITY    DECIMAL(15,2) NOT NULL,
+         L_EXTENDEDPRICE  DECIMAL(15,2) NOT NULL,
+         L_DISCOUNT    DECIMAL(15,2) NOT NULL,
+         L_TAX         DECIMAL(15,2) NOT NULL,
+         L_RETURNFLAG  CHAR(1) NOT NULL,
+         L_LINESTATUS  CHAR(1) NOT NULL,
+         L_SHIPDATE    DATE NOT NULL,
+         L_COMMITDATE  DATE NOT NULL,
+         L_RECEIPTDATE DATE NOT NULL,
+         L_SHIPINSTRUCT CHAR(25) NOT NULL,
+         L_SHIPMODE     CHAR(10) NOT NULL,
+         L_COMMENT      VARCHAR(44) NOT NULL
+       )
+       UNIQUE KEY(L_ORDERKEY, L_PARTKEY, L_SUPPKEY, L_LINENUMBER)
+       PARTITION BY RANGE(L_ORDERKEY) (
+           PARTITION p2023 VALUES LESS THAN ("5000000")
+       )
+       DISTRIBUTED BY HASH(L_ORDERKEY) BUCKETS 3
+       PROPERTIES (
+         "replication_num" = "1"
+       );
+     """
+
+    try {
+        GetDebugPoint().enableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block")
+        expectExceptionLike({
+            sql """
+                 insert into ${tableName} select * from S3(
+                     "uri" = "http://${getS3BucketName()}.${getS3Endpoint()}/regression/tpch/sf1/lineitem.csv.split01.gz",
+                     "s3.access_key"= "${getS3AK()}",
+                     "s3.secret_key" = "${getS3SK()}",
+                     "s3.region" = "${getS3Region()}",
+                     "format" = "csv",
+                     "column_separator" = "|"
+                    );
+            """
+        }, "error_log")
+    } finally {
+        GetDebugPoint().disableDebugPointForAllBEs("RuntimeState::get_error_log_file_path.block")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to