This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7718451c8e4 [enhancement](limitreach) FE receives many limit reach
error messages and make very confused (#62842)
7718451c8e4 is described below
commit 7718451c8e44e2f10e55dcc4eb54ff977b9bbbaf
Author: yiguolei <[email protected]>
AuthorDate: Wed Jul 1 10:21:17 2026 +0800
[enhancement](limitreach) FE receives many limit reach error messages and
make very confused (#62842)
### What problem does this PR solve?
When a query has a LIMIT clause, FE sends a cancel message with the
LIMIT_REACH error code to BE, and BE should cancel the running query.
This is a normal case, so BE should not send an error message to FE.
However, FE still received many error messages:
2026-06-24 16:34:27,334 WARN (thrift-server-pool-1|157)
[AbstractJobProcessor.updateFragmentExecStatus():111] one instance
report fail, query_id=16a47df10b0a4802-9e8a6a28b925ba36 fragment_id=0
instance_id=0-0, be=1772676991047, error message: Status
[errorCode=LIMIT_REACH, errorMsg=(xxxx)[LIMIT_REACH]PStatus: query reach
limit]
This happens because BE does not handle the error code correctly, and
the relevant BE code is complex and difficult to understand. This change
refactors that code and fixes the bug.
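The reporting decision described above can be sketched as a small standalone C++ model. This is only an illustration under stated assumptions, not the Doris code: `Code`, `is_ok`, `is_normal_stop`, and `should_report_to_fe` are hypothetical names, and `INTERNAL_ERROR` stands in for any real failure.

```cpp
// Hypothetical, simplified model of the BE -> FE reporting decision.
// None of these names are taken from the Doris sources.
enum class Code { OK, FINISHED, LIMIT_REACH, INTERNAL_ERROR };

// Models the patched ok(): only OK counts as ok, FINISHED no longer does.
constexpr bool is_ok(Code code) {
    return code == Code::OK;
}

// A normal early stop (limit reached or finished) is not a failure.
constexpr bool is_normal_stop(Code code) {
    return code == Code::LIMIT_REACH || code == Code::FINISHED;
}

// is_report_success is true for load jobs, which always report so that FE
// can track progress. Query fragments report only on a real error.
constexpr bool should_report_to_fe(bool is_report_success, Code exec_status) {
    return is_report_success || !(is_ok(exec_status) || is_normal_stop(exec_status));
}

// Compile-time checks of the cases named in the description.
static_assert(!should_report_to_fe(false, Code::LIMIT_REACH), "limit reach is silent for queries");
static_assert(!should_report_to_fe(false, Code::FINISHED), "finished is silent for queries");
static_assert(!should_report_to_fe(false, Code::OK), "success is silent for queries");
static_assert(should_report_to_fe(false, Code::INTERNAL_ERROR), "real errors are reported");
static_assert(should_report_to_fe(true, Code::LIMIT_REACH), "load jobs always report");
```

In this model a LIMIT_REACH cancel on a query fragment produces no report to FE, while a load job still reports regardless of its status.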
### Release note
None
### Check List (For Author)
- Test <!-- At least one of them must be included. -->
- [ ] Regression test
- [ ] Unit Test
- [ ] Manual test (add detailed scripts or steps below)
- [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
- [ ] Previous test can cover this change.
- [ ] No code files have been changed.
- [ ] Other reason <!-- Add your reason? -->
- Behavior changed:
- [ ] No.
- [ ] Yes. <!-- Explain the behavior change -->
- Does this need documentation?
- [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
https://github.com/apache/doris-website/pull/1214 -->
### Check List (For Reviewer who merge this PR)
- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->
---
be/src/common/status.h | 2 +-
be/src/exec/pipeline/pipeline_fragment_context.cpp | 30 +++++++---------------
be/src/exec/pipeline/pipeline_fragment_context.h | 7 +++--
3 files changed, 13 insertions(+), 26 deletions(-)
diff --git a/be/src/common/status.h b/be/src/common/status.h
index b979fcbb19c..d29b66459a3 100644
--- a/be/src/common/status.h
+++ b/be/src/common/status.h
@@ -527,7 +527,7 @@ public:
void set_code(int code) { _code = code; }
- bool ok() const { return _code == ErrorCode::OK || _code == ErrorCode::FINISHED; }
+ bool ok() const { return _code == ErrorCode::OK; }
// Convert into TStatus.
void to_thrift(TStatus* status) const;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.cpp b/be/src/exec/pipeline/pipeline_fragment_context.cpp
index 125c6e18fdd..f8bd3120846 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/exec/pipeline/pipeline_fragment_context.cpp
@@ -145,7 +145,6 @@ PipelineFragmentContext::PipelineFragmentContext(
_exec_env(exec_env),
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
- _is_report_on_cancel(true),
_params(request),
_parallel_instances(_params.__isset.parallel_instances ?
_params.parallel_instances : 0),
_need_notify_close(request.__isset.need_notify_close ?
request.need_notify_close
@@ -240,9 +239,7 @@ void PipelineFragmentContext::cancel(const Status reason) {
}
_query_ctx->cancel(reason, _fragment_id);
- if (reason.is<ErrorCode::LIMIT_REACH>()) {
- _is_report_on_cancel = false;
- } else {
+ if (!reason.is<ErrorCode::LIMIT_REACH>() && !reason.is<ErrorCode::FINISHED>()) {
for (auto& id : _fragment_instance_ids) {
LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id);
}
@@ -2568,26 +2565,17 @@ void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& r
Status PipelineFragmentContext::send_report(bool done) {
Status exec_status = _query_ctx->exec_status();
- // If plan is done successfully, but _is_report_success is false,
- // no need to send report.
- // Load will set _is_report_success to true because load wants to know
- // the process.
- if (!_is_report_success && done && exec_status.ok()) {
- return Status::OK();
- }
- // If both _is_report_success and _is_report_on_cancel are false,
- // which means no matter query is success or failed, no report is needed.
- // This may happen when the query limit reached and
- // a internal cancellation being processed
- // When limit is reached the fragment is also cancelled, but _is_report_on_cancel will
- // be set to false, to avoid sending fault report to FE.
- if (!_is_report_success && !_is_report_on_cancel) {
- if (done) {
- // if done is true, which means the query is finished successfully, we can safely close the fragment instance without sending report to FE, and just return OK status here.
+ if (!_is_report_success) {
+ // _is_report_success means this is not a load job, do not need to report to fe periodically.
+ if (exec_status.is<ErrorCode::LIMIT_REACH>() || exec_status.is<ErrorCode::FINISHED>() ||
+ exec_status.ok()) {
return Status::OK();
+ } else {
+ // else it means there is some error in processing the query, and we need to send report to FE to let FE know the error.
}
- return Status::NeedSendAgain("");
+ } else {
+ // This is a load job, need report the process status to FE periodly, so that FE can know the process of the load job.
}
std::vector<RuntimeState*> runtime_states;
diff --git a/be/src/exec/pipeline/pipeline_fragment_context.h b/be/src/exec/pipeline/pipeline_fragment_context.h
index 4950da0a7b0..fe1d2a6c065 100644
--- a/be/src/exec/pipeline/pipeline_fragment_context.h
+++ b/be/src/exec/pipeline/pipeline_fragment_context.h
@@ -229,6 +229,9 @@ private:
std::atomic<int> _total_tasks = 0;
std::unique_ptr<RuntimeProfile> _fragment_level_profile;
+ // This is used by loading process to report Fragment exec status to FE, FE need fragment status to
+ // check if the loading process is finished. And during the report, BE will send the loading message to FE,
+ // for example the loading error, commit rows num etc.
bool _is_report_success = false;
std::unique_ptr<RuntimeState> _runtime_state;
@@ -246,10 +249,6 @@ private:
std::function<void(RuntimeState*, Status*)> _call_back;
std::atomic_bool _is_fragment_instance_closed = false;
- // If this is set to false, and '_is_report_success' is false as well,
- // This executor will not report status to FE on being cancelled.
- bool _is_report_on_cancel;
-
// 0 indicates reporting is in progress or not required
std::atomic_bool _disable_period_report = true;
std::atomic_uint64_t _previous_report_time = 0;