github-actions[bot] commented on code in PR #64784:
URL: https://github.com/apache/doris/pull/64784#discussion_r3466327306
##########
be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp:
##########
@@ -818,14 +837,27 @@ Result<std::vector<SharedListenableFuture<Void>>> TimeSharingTaskExecutor::enque
RETURN_IF_ERROR_RESULT(prioritized_split->init());
if (intermediate) {
if (handle->record_intermediate_split(prioritized_split)) {
- _start_intermediate_split(prioritized_split, lock);
+ RETURN_IF_ERROR_RESULT(_start_intermediate_split(prioritized_split, lock));
} else {
splits_to_destroy.push_back(prioritized_split);
}
} else {
if (handle->enqueue_split(prioritized_split)) {
- _schedule_task_if_necessary(handle, lock);
- _add_new_entrants(lock);
+ std::shared_ptr<PrioritizedSplitRunner> failed_split;
+ Status status = _schedule_task_if_necessary(handle, lock, &failed_split);
+ if (!status.ok()) {
+ if (failed_split == prioritized_split) {
+ return unexpected(std::move(status));
+ }
+ LOG(WARNING)
+ << "_schedule_task_if_necessary failed after enqueueing split: "
+ << status;
+ }
Review Comment:
This branch can still hide a synchronous submit failure from the caller. If
per-task guaranteed concurrency is zero but global min concurrency is positive,
`_schedule_task_if_necessary()` does not start the current split;
`_add_new_entrants()` can then pick that same split, `_start_split()` can close
it with the `_do_submit()` error, and this code only logs the error before
returning a successful `Result` containing the split's failed future. Callers
such as `submit_scan_task()` check only `has_value()` to decide whether the
split was accepted, so they can miss the failure. Please return the error when
`_add_new_entrants()` failed a split submitted by this call, or otherwise make
the enqueue API consistently report synchronous submit failures.
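A minimal sketch of the first option, under simplified assumptions: `Status`, `Split`, `add_new_entrants()` and `enqueue_split()` below are hypothetical stand-ins, not the Doris types or signatures. It also assumes `_add_new_entrants()` is changed to report which split each synchronous submit failure belongs to (the diff above stops at the first error via `RETURN_IF_ERROR`). The enqueue path then returns the error only when the failed split is the one submitted by this call:

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for doris::Status: an empty message means OK.
struct Status {
    std::string msg;
    bool ok() const { return msg.empty(); }
};

// Hypothetical stand-in for PrioritizedSplitRunner.
struct Split {
    int id;
    bool submit_fails;  // simulates _do_submit() failing for this split
};
using SplitPtr = std::shared_ptr<Split>;
using Failure = std::pair<SplitPtr, Status>;

// Models a changed _add_new_entrants(): tries to start every waiting split
// and records each synchronous submit failure with the split it belongs to.
std::vector<Failure> add_new_entrants(std::vector<SplitPtr>& waiting) {
    std::vector<Failure> failures;
    for (const SplitPtr& split : waiting) {
        if (split->submit_fails) {
            failures.emplace_back(
                    split, Status{"submit failed for split " + std::to_string(split->id)});
        }
    }
    waiting.clear();
    return failures;
}

// Models the enqueue path: the caller sees an error only if the split it
// submitted in this call failed synchronously.
Status enqueue_split(const SplitPtr& mine, std::vector<SplitPtr>& waiting) {
    waiting.push_back(mine);
    for (const Failure& failure : add_new_entrants(waiting)) {
        if (failure.first == mine) {
            return failure.second;  // report this call's own failure
        }
        // Another task's split failed here; real code would LOG(WARNING),
        // and that split's own future already carries the error.
    }
    return Status{};
}
```

Pairing each failure with its split keeps a failure of some other task's split, started opportunistically by this call, from being reported to the wrong caller.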
##########
be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp:
##########
@@ -920,26 +967,38 @@ void TimeSharingTaskExecutor::_add_new_entrants(std::unique_lock<std::mutex>& lo
if (!split) {
break;
}
- _start_split(split, lock);
+ RETURN_IF_ERROR(_start_split(split, lock));
}
+ return Status::OK();
}
-void TimeSharingTaskExecutor::_start_intermediate_split(
+Status TimeSharingTaskExecutor::_start_intermediate_split(
std::shared_ptr<PrioritizedSplitRunner> split,
std::unique_lock<std::mutex>& lock) {
- _start_split(split, lock);
+ Status status = _start_split(split, lock);
+ if (!status.ok()) {
+ return status;
+ }
_intermediate_splits.insert(split);
+ return Status::OK();
}
-void TimeSharingTaskExecutor::_start_split(std::shared_ptr<PrioritizedSplitRunner> split,
Review Comment:
The new submit-failure cleanup only covers `_start_split()` callers, but
resubmitted scan splits still bypass it. `re_enqueue_split()` fetches the
already-running `PrioritizedSplitRunner` from the task handle and returns
`_do_submit(prioritized_split)` directly; if this is a cold restart
(`min_thread_num == 0` and the previous worker idled out) and first-worker
creation fails, `_do_submit()` now returns before queueing, but that path does
not erase the split from `_all_splits`, does not call `task_handle->split_finished(split)`,
and does not close the split future. The scan task submit returns an error to
the caller while the executor/task handle still think the split is running, so
later scheduling/removal can observe stale retained work. Please route
`re_enqueue_split()` through the same rollback helper, or add equivalent
cleanup/close logic for `_do_submit()` failure on resubmission, with a
failure-injection test where a split runs once, all workers exit, and
`re_enqueue_split()` hits `TimeSharingTaskExecutor.create_thread.inject_error`.
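One possible shape for the shared rollback, as a hedged sketch: `Executor`, `TaskHandle`, `remove_split()`, `submit_or_rollback()` and `fail_thread_creation` are hypothetical names, the model uses a single executor mutex, and thread-creation failure is simulated with a flag rather than the `TimeSharingTaskExecutor.create_thread.inject_error` debug point. Both the first start and `re_enqueue_split()` go through the same helper, so a failed resubmission removes the split from the executor's set and the task handle, and closes its future:

```cpp
#include <cassert>
#include <memory>
#include <mutex>
#include <set>
#include <string>

struct Status {
    std::string msg;  // empty means OK
    bool ok() const { return msg.empty(); }
};

// Hypothetical split: close() records the error its future would carry.
struct Split {
    bool future_closed = false;
    std::string close_reason;
    void close(const Status& status) {
        future_closed = true;
        close_reason = status.msg;
    }
};
using SplitPtr = std::shared_ptr<Split>;

// Hypothetical task handle: tracks the splits it believes are running.
struct TaskHandle {
    std::set<SplitPtr> running;
    // Bookkeeping-only removal, not a completion event.
    void remove_split(const SplitPtr& split) { running.erase(split); }
};

class Executor {
public:
    bool fail_thread_creation = false;  // simulates the injected thread-creation error
    std::set<SplitPtr> all_splits;

    // First start of a split (models _start_split()).
    Status start_split(TaskHandle& handle, const SplitPtr& split) {
        {
            std::lock_guard<std::mutex> guard(_mutex);
            all_splits.insert(split);
            handle.running.insert(split);
        }
        return submit_or_rollback(handle, split);
    }

    // Resubmission of an already-registered split (models re_enqueue_split()),
    // sharing the rollback instead of returning do_submit() directly.
    Status re_enqueue_split(TaskHandle& handle, const SplitPtr& split) {
        return submit_or_rollback(handle, split);
    }

private:
    // Models _do_submit(): fails when no worker thread can be created.
    Status do_submit(const SplitPtr& /*split*/) {
        if (fail_thread_creation) {
            return Status{"failed to create worker thread"};
        }
        return Status{};
    }

    // Shared helper: on submit failure, undo every registration and close
    // the split's future so nothing waits on it.
    Status submit_or_rollback(TaskHandle& handle, const SplitPtr& split) {
        Status status = do_submit(split);
        if (!status.ok()) {
            {
                std::lock_guard<std::mutex> guard(_mutex);
                all_splits.erase(split);
                handle.remove_split(split);
            }
            split->close(status);
        }
        return status;
    }

    std::mutex _mutex;
};
```

The handle-side removal here is a plain bookkeeping erase rather than a completion event, so it does not feed any adaptive concurrency logic.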
##########
be/src/exec/scan/task_executor/time_sharing/time_sharing_task_executor.cpp:
##########
@@ -920,26 +967,38 @@ void TimeSharingTaskExecutor::_add_new_entrants(std::unique_lock<std::mutex>& lo
if (!split) {
break;
}
- _start_split(split, lock);
+ RETURN_IF_ERROR(_start_split(split, lock));
}
+ return Status::OK();
}
-void TimeSharingTaskExecutor::_start_intermediate_split(
+Status TimeSharingTaskExecutor::_start_intermediate_split(
std::shared_ptr<PrioritizedSplitRunner> split,
std::unique_lock<std::mutex>& lock) {
- _start_split(split, lock);
+ Status status = _start_split(split, lock);
+ if (!status.ok()) {
+ return status;
+ }
_intermediate_splits.insert(split);
+ return Status::OK();
}
-void TimeSharingTaskExecutor::_start_split(std::shared_ptr<PrioritizedSplitRunner> split,
- std::unique_lock<std::mutex>& lock) {
+Status TimeSharingTaskExecutor::_start_split(std::shared_ptr<PrioritizedSplitRunner> split,
+ std::unique_lock<std::mutex>& lock) {
_all_splits.insert(split);
lock.unlock();
Status submit_status = _do_submit(split);
Review Comment:
This rollback path treats a submit failure as a completed split. For leaf
splits, `poll_next_split()` has already put the split in
`_running_leaf_splits`, so calling `split_finished()` here enters
`SplitConcurrencyController::split_finished()` before removing it. Since a
split that never reached `process()` has `scheduled_nanos() == 0`,
`_should_adjust(0)` is immediately true, and with the scanner task handle's
`[] { return 0.0; }` utilization supplier this can increase the task's target
concurrency even though the only event was thread creation failure. Please
remove the split from the task handle without feeding the adaptive controller
on submit failure, or make `split_finished()` distinguish never-started submit
rollback from real split completion.
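To illustrate the second option, here is a simplified, hypothetical model, not the Doris `SplitConcurrencyController`: the target-utilization value of 0.5 and the `current_concurrency >= target` condition are assumptions. As described above, `_should_adjust()` treats a split with zero scheduled time as due for adjustment, and a utilization supplier returning `0.0` then raises the target. `split_finished()` is the real-completion path; `remove_unstarted_split()` is a hypothetical rollback path that only drops the bookkeeping:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <set>

// Simplified, hypothetical adaptive controller.
class ConcurrencyController {
public:
    ConcurrencyController(int initial_target, int64_t adjust_interval_nanos)
            : _target(initial_target), _interval_nanos(adjust_interval_nanos) {}

    // Called on split completion: low utilization raises the target.
    void split_finished(int64_t split_nanos, double utilization, int current_concurrency) {
        if (_should_adjust(split_nanos) && utilization < kTargetUtilization &&
            current_concurrency >= _target) {
            ++_target;
            _nanos_since_adjust = 0;
        }
    }

    int target() const { return _target; }

private:
    // With split_nanos == 0 the second clause is always true.
    bool _should_adjust(int64_t split_nanos) const {
        return _nanos_since_adjust >= _interval_nanos || _nanos_since_adjust >= split_nanos;
    }

    static constexpr double kTargetUtilization = 0.5;  // assumed value
    int _target;
    int64_t _interval_nanos;
    int64_t _nanos_since_adjust = 0;  // accumulation from running splits omitted here
};

struct Split {
    int64_t scheduled_nanos = 0;  // never reached process(), so still 0
};
using SplitPtr = std::shared_ptr<Split>;

class TaskHandle {
public:
    explicit TaskHandle(ConcurrencyController controller) : _controller(controller) {}

    void add_running(const SplitPtr& split) { _running.insert(split); }

    // Real completion: feeds the controller, then removes the split.
    void split_finished(const SplitPtr& split) {
        _controller.split_finished(split->scheduled_nanos, _utilization(),
                                   static_cast<int>(_running.size()));
        _running.erase(split);
    }

    // Submit rollback: the split never ran, so only the bookkeeping changes.
    void remove_unstarted_split(const SplitPtr& split) { _running.erase(split); }

    int target_concurrency() const { return _controller.target(); }
    std::size_t running_count() const { return _running.size(); }

private:
    // Mirrors the scanner handle's `[] { return 0.0; }` supplier.
    double _utilization() const { return 0.0; }

    ConcurrencyController _controller;
    std::set<SplitPtr> _running;
};
```

In this model, routing a never-started split through `split_finished()` bumps the target from 1 to 2, while `remove_unstarted_split()` leaves it at 1.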
--
This is an automated message from the Apache Git Service.