This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 09a7a355478 [fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) (#49666)
09a7a355478 is described below
commit 09a7a355478bd5f18572da508b6780f28817c02b
Author: Xin Liao <[email protected]>
AuthorDate: Mon Mar 31 11:18:42 2025 +0800
[fix] (streamload) fixed the issue of data loss due to concurrency when importing data from streamload (#48948) (#49666)
Cherry-picked from #48948
Co-authored-by: kang <[email protected]>
Co-authored-by: lik40 <[email protected]>
---
be/src/vec/sink/writer/async_result_writer.cpp | 54 +++++++++++++-------------
1 file changed, 28 insertions(+), 26 deletions(-)
diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp
index 29877cb268a..0cc37e3458b 100644
--- a/be/src/vec/sink/writer/async_result_writer.cpp
+++ b/be/src/vec/sink/writer/async_result_writer.cpp
@@ -108,41 +108,43 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
}
DCHECK(_dependency);
- if (_writer_status.ok()) {
- while (true) {
- ThreadCpuStopWatch cpu_time_stop_watch;
- cpu_time_stop_watch.start();
- Defer defer {[&]() {
- if (state && state->get_query_ctx()) {
-
state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
- }
- }};
- if (!_eos && _data_queue.empty() && _writer_status.ok()) {
- std::unique_lock l(_m);
- while (!_eos && _data_queue.empty() && _writer_status.ok()) {
- // Add 1s to check to avoid lost signal
- _cv.wait_for(l, std::chrono::seconds(1));
- }
+ while (_writer_status.ok()) {
+ ThreadCpuStopWatch cpu_time_stop_watch;
+ cpu_time_stop_watch.start();
+ Defer defer {[&]() {
+ if (state && state->get_query_ctx()) {
+
state->get_query_ctx()->update_cpu_time(cpu_time_stop_watch.elapsed_time());
+ }
+ }};
+
+ //1) wait scan operator write data
+ {
+ std::unique_lock l(_m);
+ while (!_eos && _data_queue.empty() && _writer_status.ok()) {
+ // Add 1s to check to avoid lost signal
+ _cv.wait_for(l, std::chrono::seconds(1));
}
+ //check if eos or writer error
if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
_data_queue.clear();
break;
}
+ }
- auto block = _get_block_from_queue();
- auto status = write(state, *block);
- if (!status.ok()) [[unlikely]] {
- std::unique_lock l(_m);
- _writer_status.update(status);
- if (_is_finished()) {
- _dependency->set_ready();
- }
- break;
+ //2) get the block from data queue and write to downstream
+ auto block = _get_block_from_queue();
+ auto status = write(state, *block);
+ if (!status.ok()) [[unlikely]] {
+ std::unique_lock l(_m);
+ _writer_status.update(status);
+ if (_is_finished()) {
+ _dependency->set_ready();
}
-
- _return_free_block(std::move(block));
+ break;
}
+
+ _return_free_block(std::move(block));
}
bool need_finish = false;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]