This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 54fe1a166b9 [Refactor](scan) refactor scan scheduler to improve performance (#27948)
54fe1a166b9 is described below
commit 54fe1a166b984fa349c76d5becad94859a2d7fc7
Author: HappenLee <[email protected]>
AuthorDate: Tue Dec 5 13:03:16 2023 +0800
[Refactor](scan) refactor scan scheduler to improve performance (#27948)
* [Refactor](scan) refactor scan scheduler to improve performance
* fix pipeline x core
---
be/src/runtime/runtime_state.h | 3 +
be/src/vec/exec/scan/pip_scanner_context.h | 22 ++++
be/src/vec/exec/scan/scanner_context.cpp | 125 ++++++++++++---------
be/src/vec/exec/scan/scanner_scheduler.cpp | 19 +++-
.../main/java/org/apache/doris/qe/Coordinator.java | 1 +
.../java/org/apache/doris/qe/SessionVariable.java | 9 ++
gensrc/thrift/PaloInternalService.thrift | 2 +
7 files changed, 125 insertions(+), 56 deletions(-)
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index b8ab49ff276..e37883abbe1 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -107,6 +107,9 @@ public:
const DescriptorTbl& desc_tbl() const { return *_desc_tbl; }
void set_desc_tbl(const DescriptorTbl* desc_tbl) { _desc_tbl = desc_tbl; }
int batch_size() const { return _query_options.batch_size; }
+ int wait_full_block_schedule_times() const {
+ return _query_options.wait_full_block_schedule_times;
+ }
bool abort_on_error() const { return _query_options.abort_on_error; }
bool abort_on_default_limit_exceeded() const {
return _query_options.abort_on_default_limit_exceeded;
diff --git a/be/src/vec/exec/scan/pip_scanner_context.h b/be/src/vec/exec/scan/pip_scanner_context.h
index 681fc09739a..57d632a03ea 100644
--- a/be/src/vec/exec/scan/pip_scanner_context.h
+++ b/be/src/vec/exec/scan/pip_scanner_context.h
@@ -63,6 +63,7 @@ public:
}
}
+ std::vector<vectorized::BlockUPtr> merge_blocks;
{
std::unique_lock<std::mutex> l(*_queue_mutexs[id]);
if (_blocks_queues[id].empty()) {
@@ -76,6 +77,18 @@ public:
*block = std::move(_blocks_queues[id].front());
_blocks_queues[id].pop_front();
+ auto rows = (*block)->rows();
+ while (!_blocks_queues[id].empty()) {
+ const auto add_rows = (*_blocks_queues[id].front()).rows();
+ if (rows + add_rows < state->batch_size()) {
+ rows += add_rows;
+ merge_blocks.emplace_back(std::move(_blocks_queues[id].front()));
+ _blocks_queues[id].pop_front();
+ } else {
+ break;
+ }
+ }
+
if (_blocks_queues[id].empty()) {
this->reschedule_scanner_ctx();
if (_dependency) {
@@ -83,7 +96,16 @@ public:
}
}
}
+
_current_used_bytes -= (*block)->allocated_bytes();
+ if (!merge_blocks.empty()) {
+ vectorized::MutableBlock m(block->get());
+ for (auto& merge_block : merge_blocks) {
+ _current_used_bytes -= merge_block->allocated_bytes();
+ static_cast<void>(m.merge(*merge_block));
+ return_free_block(std::move(merge_block));
+ }
+ }
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp b/be/src/vec/exec/scan/scanner_context.cpp
index 0d9b0351dd8..7cad8242c1f 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -209,65 +209,90 @@ bool ScannerContext::empty_in_queue(int id) {
Status ScannerContext::get_block_from_queue(RuntimeState* state,
vectorized::BlockUPtr* block,
bool* eos, int id, bool wait) {
- std::unique_lock l(_transfer_lock);
- // Normally, the scanner scheduler will schedule ctx.
- // But when the amount of data in the blocks queue exceeds the upper limit,
- // the scheduler will stop scheduling.
- // (if the scheduler continues to schedule, it will cause a lot of busy running).
- // At this point, consumers are required to trigger new scheduling to ensure that
- // data can be continuously fetched.
- int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
- int32_t serving_blocks_num = _serving_blocks_num;
- bool to_be_schedule = should_be_scheduled();
- int num_running_scanners = _num_running_scanners;
-
- bool is_scheduled = false;
- if (to_be_schedule && _num_running_scanners == 0) {
- is_scheduled = true;
- auto state = _scanner_scheduler->submit(this);
- if (state.ok()) {
- _num_scheduling_ctx++;
- } else {
- set_status_on_error(state, false);
+ std::vector<vectorized::BlockUPtr> merge_blocks;
+ {
+ std::unique_lock l(_transfer_lock);
+ // Normally, the scanner scheduler will schedule ctx.
+ // But when the amount of data in the blocks queue exceeds the upper limit,
+ // the scheduler will stop scheduling.
+ // (if the scheduler continues to schedule, it will cause a lot of busy running).
+ // At this point, consumers are required to trigger new scheduling to ensure that
+ // data can be continuously fetched.
+ int64_t cur_bytes_in_queue = _cur_bytes_in_queue;
+ int32_t serving_blocks_num = _serving_blocks_num;
+ bool to_be_schedule = should_be_scheduled();
+ int num_running_scanners = _num_running_scanners;
+
+ bool is_scheduled = false;
+ if (to_be_schedule && _num_running_scanners == 0) {
+ is_scheduled = true;
+ auto state = _scanner_scheduler->submit(this);
+ if (state.ok()) {
+ _num_scheduling_ctx++;
+ } else {
+ set_status_on_error(state, false);
+ }
}
- }
- // Wait for block from queue
- if (wait) {
- // scanner batch wait time
- SCOPED_TIMER(_scanner_wait_batch_timer);
- while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
- state->is_cancelled())) {
- if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
- LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
- << ", serving_blocks_num " << serving_blocks_num
- << ", num_running_scanners " << num_running_scanners
- << ", to_be_scheudle " << to_be_schedule << (void*)this;
+ // Wait for block from queue
+ if (wait) {
+ // scanner batch wait time
+ SCOPED_TIMER(_scanner_wait_batch_timer);
+ while (!(!_blocks_queue.empty() || _is_finished || !status().ok() ||
+ state->is_cancelled())) {
+ if (!is_scheduled && _num_running_scanners == 0 && should_be_scheduled()) {
+ LOG(INFO) << "fatal, cur_bytes_in_queue " << cur_bytes_in_queue
+ << ", serving_blocks_num " << serving_blocks_num
+ << ", num_running_scanners " << num_running_scanners
+ << ", to_be_scheudle " << to_be_schedule << (void*)this;
+ }
+ _blocks_queue_added_cv.wait_for(l, 1s);
}
- _blocks_queue_added_cv.wait_for(l, 1s);
}
- }
- if (state->is_cancelled()) {
- set_status_on_error(Status::Cancelled("cancelled"), false);
- }
-
- if (!status().ok()) {
- return status();
- }
+ if (state->is_cancelled()) {
+ set_status_on_error(Status::Cancelled("cancelled"), false);
+ }
- if (!_blocks_queue.empty()) {
- *block = std::move(_blocks_queue.front());
- _blocks_queue.pop_front();
+ if (!status().ok()) {
+ return status();
+ }
- auto block_bytes = (*block)->allocated_bytes();
- _cur_bytes_in_queue -= block_bytes;
+ if (!_blocks_queue.empty()) {
+ *block = std::move(_blocks_queue.front());
+ _blocks_queue.pop_front();
+ auto block_bytes = (*block)->allocated_bytes();
+ _cur_bytes_in_queue -= block_bytes;
+ _queued_blocks_memory_usage->add(-block_bytes);
+
+ auto rows = (*block)->rows();
+ while (!_blocks_queue.empty()) {
+ auto& add_block = _blocks_queue.front();
+ const auto add_rows = (*add_block).rows();
+ if (rows + add_rows < state->batch_size()) {
+ rows += add_rows;
+ block_bytes = (*add_block).allocated_bytes();
+ _cur_bytes_in_queue -= block_bytes;
+ _queued_blocks_memory_usage->add(-block_bytes);
+ merge_blocks.emplace_back(std::move(add_block));
+ _blocks_queue.pop_front();
+ } else {
+ break;
+ }
+ }
+ } else {
+ *eos = _is_finished;
+ }
+ }
- _queued_blocks_memory_usage->add(-block_bytes);
- return Status::OK();
- } else {
- *eos = _is_finished;
+ if (!merge_blocks.empty()) {
+ vectorized::MutableBlock m(block->get());
+ for (auto& merge_block : merge_blocks) {
+ static_cast<void>(m.merge(*merge_block));
+ return_free_block(std::move(merge_block));
+ }
}
+
return Status::OK();
}
diff --git a/be/src/vec/exec/scan/scanner_scheduler.cpp b/be/src/vec/exec/scan/scanner_scheduler.cpp
index b3cfc2e48a6..6b7bc232a2e 100644
--- a/be/src/vec/exec/scan/scanner_scheduler.cpp
+++ b/be/src/vec/exec/scan/scanner_scheduler.cpp
@@ -347,8 +347,18 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
bool should_stop = false;
// Has to wait at least one full block, or it will cause a lot of schedule task in priority
// queue, it will affect query latency and query concurrency for example ssb 3.3.
- while (!eos && raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold &&
- num_rows_in_block < state->batch_size()) {
+ auto should_do_scan = [&, batch_size = state->batch_size(),
+ time = state->wait_full_block_schedule_times()]() {
+ if (raw_bytes_read < raw_bytes_threshold && raw_rows_read < raw_rows_threshold) {
+ return true;
+ } else if (num_rows_in_block < batch_size) {
+ return raw_bytes_read < raw_bytes_threshold * time &&
+ raw_rows_read < raw_rows_threshold * time;
+ }
+ return false;
+ };
+
+ while (!eos && should_do_scan()) {
// TODO llj task group should should_yield?
if (UNLIKELY(ctx->done())) {
// No need to set status on error here.
@@ -384,10 +394,7 @@ void ScannerScheduler::_scanner_scan(ScannerScheduler* scheduler, ScannerContext
ctx->return_free_block(std::move(block));
} else {
if (!blocks.empty() && blocks.back()->rows() + block->rows() <=
state->batch_size()) {
- status = vectorized::MutableBlock(blocks.back().get()).merge(*block);
- if (!status.ok()) {
- break;
- }
+ static_cast<void>(vectorized::MutableBlock(blocks.back().get()).merge(*block));
ctx->return_free_block(std::move(block));
} else {
blocks.push_back(std::move(block));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index f3622b62fbb..1ca37d7e70d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -374,6 +374,7 @@ public class Coordinator implements CoordInterface {
this.queryOptions.setExecutionTimeout(context.getExecTimeout());
this.queryOptions.setEnableScanNodeRunSerial(context.getSessionVariable().isEnableScanRunSerial());
this.queryOptions.setFeProcessUuid(ExecuteEnv.getInstance().getProcessUUID());
+ this.queryOptions.setWaitFullBlockScheduleTimes(context.getSessionVariable().getWaitFullBlockScheduleTimes());
}
public ConnectContext getConnectContext() {
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c8ba8f4943b..ff957244c92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -468,6 +468,8 @@ public class SessionVariable implements Serializable, Writable {
// this session variable is set to true.
public static final String FALLBACK_OTHER_REPLICA_WHEN_FIXED_CORRUPT =
"fallback_other_replica_when_fixed_corrupt";
+ public static final String WAIT_FULL_BLOCK_SCHEDULE_TIMES = "wait_full_block_schedule_times";
+
public static final List<String> DEBUG_VARIABLES = ImmutableList.of(
SKIP_DELETE_PREDICATE,
SKIP_DELETE_BITMAP,
@@ -834,6 +836,9 @@ public class SessionVariable implements Serializable, Writable {
@VariableMgr.VarAttr(name = USE_RF_DEFAULT)
public boolean useRuntimeFilterDefaultSize = false;
+ @VariableMgr.VarAttr(name = WAIT_FULL_BLOCK_SCHEDULE_TIMES)
+ public int waitFullBlockScheduleTimes = 2;
+
public int getBeNumberForTest() {
return beNumberForTest;
}
@@ -2168,6 +2173,10 @@ public class SessionVariable implements Serializable, Writable {
return sqlDialect;
}
+ public int getWaitFullBlockScheduleTimes() {
+ return waitFullBlockScheduleTimes;
+ }
+
public ParseDialect.Dialect getSqlParseDialect() {
return ParseDialect.Dialect.getByName(sqlDialect);
}
diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift
index d1a779e285a..401eb548a01 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -257,6 +257,8 @@ struct TQueryOptions {
90: optional bool skip_missing_version = false;
91: optional bool runtime_filter_wait_infinitely = false;
+
+ 92: optional i32 wait_full_block_schedule_times = 1;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]