This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new fcfff0d5919 [fix](routine-load) fix be core when partial table load failed #34712 (#35621)
fcfff0d5919 is described below
commit fcfff0d59196d40ba27723ce1295e96247b1ecce
Author: HHoflittlefish777 <[email protected]>
AuthorDate: Thu May 30 12:59:36 2024 +0800
[fix](routine-load) fix be core when partial table load failed #34712 (#35621)
---
be/src/io/fs/multi_table_pipe.cpp | 2 +-
.../routine_load/routine_load_task_executor.cpp | 34 ++++++++++++++++++----
2 files changed, 30 insertions(+), 6 deletions(-)
diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp
index 97f88161bc9..35fa8b02cf9 100644
--- a/be/src/io/fs/multi_table_pipe.cpp
+++ b/be/src/io/fs/multi_table_pipe.cpp
@@ -222,7 +222,6 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
_unplanned_pipes.size(), _planned_pipes.size(),
params.size());
_unplanned_pipes.clear();
- _inflight_cnt += params.size();
for (auto& plan : params) {
if (!plan.__isset.table_name ||
_planned_pipes.find(plan.table_name) == _planned_pipes.end()) {
@@ -243,6 +242,7 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para
CHECK(false);
}
+ _inflight_cnt++;
exec_env->fragment_mgr()->exec_plan_fragment(plan,
[this](RuntimeState* state,
Status*
status) {
{
diff --git a/be/src/runtime/routine_load/routine_load_task_executor.cpp b/be/src/runtime/routine_load/routine_load_task_executor.cpp
index 7226dcfa484..c61b81cdfc8 100644
--- a/be/src/runtime/routine_load/routine_load_task_executor.cpp
+++ b/be/src/runtime/routine_load/routine_load_task_executor.cpp
@@ -284,6 +284,20 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
} \
} while (false);
+#define HANDLE_MULTI_TABLE_ERROR(stmt, err_msg) \
+ do { \
+ Status _status_ = (stmt); \
+ if (UNLIKELY(!_status_.ok() && !_status_.is<PUBLISH_TIMEOUT>())) { \
+ err_handler(ctx, _status_, err_msg); \
+ cb(ctx); \
+ _status_ = ctx->future.get(); \
+ if (!_status_.ok()) { \
+ LOG(ERROR) << "failed to get future, " << ctx->brief(); \
+ } \
+ return; \
+ } \
+ } while (false);
+
LOG(INFO) << "begin to execute routine load task: " << ctx->brief();
// create data consumer group
@@ -338,17 +352,27 @@ void RoutineLoadTaskExecutor::exec_task(std::shared_ptr<StreamLoadContext> ctx,
std::shared_ptr<io::KafkaConsumerPipe> kafka_pipe =
std::static_pointer_cast<io::KafkaConsumerPipe>(ctx->body_sink);
- // start to consume, this may block a while
- HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
-
if (ctx->is_multi_table) {
+ Status st;
// plan the rest of unplanned data
auto multi_table_pipe =
std::static_pointer_cast<io::MultiTablePipe>(ctx->body_sink);
- HANDLE_ERROR(multi_table_pipe->request_and_exec_plans(),
- "multi tables task executes plan error");
+ // start to consume, this may block a while
+ st = consumer_grp->start_all(ctx, kafka_pipe);
+ if (!st.ok()) {
+ multi_table_pipe->handle_consume_finished();
+ HANDLE_MULTI_TABLE_ERROR(st, "consuming failed");
+ }
+ st = multi_table_pipe->request_and_exec_plans();
+ if (!st.ok()) {
+ multi_table_pipe->handle_consume_finished();
+ HANDLE_MULTI_TABLE_ERROR(st, "multi tables task executes plan error");
+ }
// need memory order
multi_table_pipe->handle_consume_finished();
HANDLE_ERROR(kafka_pipe->finish(), "finish multi table task failed");
+ } else {
+ // start to consume, this may block a while
+ HANDLE_ERROR(consumer_grp->start_all(ctx, kafka_pipe), "consuming failed");
}
// wait for all consumers finished
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]