This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7718451c8e4 [enhancement](limitreach) FE receives many limit reach 
error messages and make very confused (#62842)
7718451c8e4 is described below

commit 7718451c8e44e2f10e55dcc4eb54ff977b9bbbaf
Author: yiguolei <[email protected]>
AuthorDate: Wed Jul 1 10:21:17 2026 +0800

    [enhancement](limitreach) FE receives many limit reach error messages and 
make very confused (#62842)
    
    ### What problem does this PR solve?
    
    When a query has a limit clause, FE will send a cancel message with the
    LIMIT_REACH error code to BE, and BE should cancel the running query.
    This is a normal case, so BE should not send an error message to FE any
    more. However, I found that FE still received many error messages:
    
    2026-06-24 16:34:27,334 WARN (thrift-server-pool-1|157)
    [AbstractJobProcessor.updateFragmentExecStatus():111] one instance
    report fail, query_id=16a47df10b0a4802-9e8a6a28b925ba36 fragment_id=0
    instance_id=0-0, be=1772676991047, error message: Status
    [errorCode=LIMIT_REACH, errorMsg=(xxxx)[LIMIT_REACH]PStatus: query reach
    limit]
    
    This is because BE does not handle the error code correctly, and the
    related code in BE is very complex and difficult to understand. This PR
    refactors the code and fixes the bug.
    
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test <!-- At least one of them must be included. -->
        - [ ] Regression test
        - [ ] Unit Test
        - [ ] Manual test (add detailed scripts or steps below)
        - [ ] No need to test or manual test. Explain why:
            - [ ] This is a refactor/code format and no logic has been changed.
            - [ ] Previous test can cover this change.
            - [ ] No code files have been changed.
            - [ ] Other reason <!-- Add your reason?  -->
    
    - Behavior changed:
        - [ ] No.
        - [ ] Yes. <!-- Explain the behavior change -->
    
    - Does this need documentation?
        - [ ] No.
        - [ ] Yes. <!-- Add document PR link here. eg:
        https://github.com/apache/doris-website/pull/1214 -->
    
    ### Check List (For Reviewer who merge this PR)
    
    - [ ] Confirm the release note
    - [ ] Confirm test cases
    - [ ] Confirm document
    - [ ] Add branch pick label <!-- Add branch pick label that this PR
    should merge into -->
---
 be/src/common/status.h                             |  2 +-
 be/src/exec/pipeline/pipeline_fragment_context.cpp | 30 +++++++---------------
 be/src/exec/pipeline/pipeline_fragment_context.h   |  7 +++--
 3 files changed, 13 insertions(+), 26 deletions(-)

diff --git a/be/src/common/status.h b/be/src/common/status.h
index b979fcbb19c..d29b66459a3 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -527,7 +527,7 @@ public:
 
     void set_code(int code) { _code = code; }
 
-    bool ok() const { return _code == ErrorCode::OK || _code == 
ErrorCode::FINISHED; }
+    bool ok() const { return _code == ErrorCode::OK; }
 
     // Convert into TStatus.
     void to_thrift(TStatus* status) const;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp 
b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 125c6e18fdd..f8bd3120846 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -145,7 +145,6 @@ PipelineFragmentContext::PipelineFragmentContext(
           _exec_env(exec_env),
           _query_ctx(std::move(query_ctx)),
           _call_back(call_back),
-          _is_report_on_cancel(true),
           _params(request),
           _parallel_instances(_params.__isset.parallel_instances ? 
_params.parallel_instances : 0),
           _need_notify_close(request.__isset.need_notify_close ? 
request.need_notify_close
@@ -240,9 +239,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
     }
 
     _query_ctx->cancel(reason, _fragment_id);
-    if (reason.is<ErrorCode::LIMIT_REACH>()) {
-        _is_report_on_cancel = false;
-    } else {
+    if (!reason.is<ErrorCode::LIMIT_REACH>() && 
!reason.is<ErrorCode::FINISHED>()) {
         for (auto& id : _fragment_instance_ids) {
             LOG(WARNING) << "PipelineFragmentContext cancel instance: " << 
print_id(id);
         }
@@ -2568,26 +2565,17 @@ void 
PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
 
 Status PipelineFragmentContext::send_report(bool done) {
     Status exec_status = _query_ctx->exec_status();
-    // If plan is done successfully, but _is_report_success is false,
-    // no need to send report.
-    // Load will set _is_report_success to true because load wants to know
-    // the process.
-    if (!_is_report_success && done && exec_status.ok()) {
-        return Status::OK();
-    }
 
-    // If both _is_report_success and _is_report_on_cancel are false,
-    // which means no matter query is success or failed, no report is needed.
-    // This may happen when the query limit reached and
-    // a internal cancellation being processed
-    // When limit is reached the fragment is also cancelled, but 
_is_report_on_cancel will
-    // be set to false, to avoid sending fault report to FE.
-    if (!_is_report_success && !_is_report_on_cancel) {
-        if (done) {
-            // if done is true, which means the query is finished 
successfully, we can safely close the fragment instance without sending report 
to FE, and just return OK status here.
+    if (!_is_report_success) {
+        // _is_report_success means this is not a load job, do not need to 
report to fe periodically.
+        if (exec_status.is<ErrorCode::LIMIT_REACH>() || 
exec_status.is<ErrorCode::FINISHED>() ||
+            exec_status.ok()) {
             return Status::OK();
+        } else {
+            // else it means there is some error in processing the query, and 
we need to send report to FE to let FE know the error.
         }
-        return Status::NeedSendAgain("");
+    } else {
+        // This is a load job, need report the process status to FE periodly, 
so that FE can know the process of the load job.
     }
 
     std::vector<RuntimeState*> runtime_states;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h 
b/be/src/exec/pipeline/pipeline_fragment_context.h
index 4950da0a7b0..fe1d2a6c065 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -229,6 +229,9 @@ private:
     std::atomic<int> _total_tasks = 0;
 
     std::unique_ptr<RuntimeProfile> _fragment_level_profile;
+    // This is used by loading process to report Fragment exec status to FE, 
FE need fragment status to
+    // check if the loading process is finished. And during the report, BE 
will send the loading message to FE,
+    // for example the loading error, commit rows num etc.
     bool _is_report_success = false;
 
     std::unique_ptr<RuntimeState> _runtime_state;
@@ -246,10 +249,6 @@ private:
     std::function<void(RuntimeState*, Status*)> _call_back;
     std::atomic_bool _is_fragment_instance_closed = false;
 
-    // If this is set to false, and '_is_report_success' is false as well,
-    // This executor will not report status to FE on being cancelled.
-    bool _is_report_on_cancel;
-
     // 0 indicates reporting is in progress or not required
     std::atomic_bool _disable_period_report = true;
     std::atomic_uint64_t _previous_report_time = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to