This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 03a1d4ed70c branch-4.1: [fix](load) replace tablet writer close
polling with event wakeup #64221 (#64747)
03a1d4ed70c is described below
commit 03a1d4ed70cf373a69e25db4431742c3f37cb8f6
Author: github-actions[bot]
<41898282+github-actions[bot]@users.noreply.github.com>
AuthorDate: Wed Jun 24 14:51:06 2026 +0800
branch-4.1: [fix](load) replace tablet writer close polling with event
wakeup #64221 (#64747)
Cherry-picked from #64221
Co-authored-by: hui lai <[email protected]>
---
be/src/exec/sink/load_stream_map_pool.cpp | 17 ++++++++++---
be/src/exec/sink/load_stream_map_pool.h | 5 ++++
be/src/exec/sink/load_stream_stub.cpp | 32 ++++++++++++++++++++++--
be/src/exec/sink/load_stream_stub.h | 36 ++++++++++++++++++++++-----
be/src/exec/sink/writer/vtablet_writer.cpp | 26 +++++++++++++++++--
be/src/exec/sink/writer/vtablet_writer.h | 13 ++++++++++
be/src/exec/sink/writer/vtablet_writer_v2.cpp | 11 ++++++--
be/test/exec/sink/vtablet_writer_v2_test.cpp | 21 ++++++++++++++++
8 files changed, 145 insertions(+), 16 deletions(-)
diff --git a/be/src/exec/sink/load_stream_map_pool.cpp
b/be/src/exec/sink/load_stream_map_pool.cpp
index eb9a3c669b6..a135aee6a0f 100644
--- a/be/src/exec/sink/load_stream_map_pool.cpp
+++ b/be/src/exec/sink/load_stream_map_pool.cpp
@@ -32,7 +32,8 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t
src_id, int num_streams,
_num_incremental_streams(0),
_pool(pool),
_tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()),
- _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {
+ _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()),
+ _close_wait_notifier(std::make_shared<CloseWaitNotifier>()) {
DCHECK(num_streams > 0) << "stream num should be greater than 0";
DCHECK(num_use > 0) << "use num should be greater than 0";
}
@@ -46,9 +47,9 @@ std::shared_ptr<LoadStreamStubs>
LoadStreamMap::get_or_create(int64_t dst_id, bo
if (incremental) {
_num_incremental_streams.fetch_add(1);
}
- streams = std::make_shared<LoadStreamStubs>(_num_streams, _load_id,
_src_id,
- _tablet_schema_for_index,
- _enable_unique_mow_for_index,
incremental);
+ streams = std::make_shared<LoadStreamStubs>(
+ _num_streams, _load_id, _src_id, _tablet_schema_for_index,
_enable_unique_mow_for_index,
+ incremental, _close_wait_notifier);
_streams_for_node[dst_id] = streams;
return streams;
}
@@ -130,6 +131,14 @@ void LoadStreamMap::close_load(bool incremental) {
}
}
+int64_t LoadStreamMap::close_wait_version() const { // Returns the current version held by this map's _close_wait_notifier.
+ return _close_wait_notifier->close_wait_version();
+}
+
+void LoadStreamMap::wait_for_close_event(int64_t observed_version, int64_t
timeout_ms) { // Forwards both arguments unchanged to _close_wait_notifier.
+ _close_wait_notifier->wait_for_close_event(observed_version, timeout_ms);
+}
+
LoadStreamMapPool::LoadStreamMapPool() = default;
LoadStreamMapPool::~LoadStreamMapPool() = default;
diff --git a/be/src/exec/sink/load_stream_map_pool.h
b/be/src/exec/sink/load_stream_map_pool.h
index cb210b3663c..7cd447d615f 100644
--- a/be/src/exec/sink/load_stream_map_pool.h
+++ b/be/src/exec/sink/load_stream_map_pool.h
@@ -97,6 +97,10 @@ public:
// only call this method after release() returns true.
void close_load(bool incremental);
+ int64_t close_wait_version() const;
+
+ void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
std::unordered_map<int64_t, std::shared_ptr<LoadStreamStubs>>
get_streams_for_node() {
decltype(_streams_for_node) snapshot;
{
@@ -117,6 +121,7 @@ private:
LoadStreamMapPool* _pool = nullptr;
std::shared_ptr<IndexToTabletSchema> _tablet_schema_for_index;
std::shared_ptr<IndexToEnableMoW> _enable_unique_mow_for_index;
+ std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
std::mutex _tablets_to_commit_mutex;
std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>>
_tablets_to_commit;
diff --git a/be/src/exec/sink/load_stream_stub.cpp
b/be/src/exec/sink/load_stream_stub.cpp
index bc54ebd16fc..3e462badd4b 100644
--- a/be/src/exec/sink/load_stream_stub.cpp
+++ b/be/src/exec/sink/load_stream_stub.cpp
@@ -31,6 +31,24 @@
namespace doris {
#include "common/compile_check_begin.h"
+int64_t CloseWaitNotifier::close_wait_version() const { // Acquire-load of the counter that notify_close_wait() increments.
+ return _close_wait_version.load(std::memory_order_acquire);
+}
+
+void CloseWaitNotifier::wait_for_close_event(int64_t observed_version, int64_t
timeout_ms) { // Waits on _close_wait_cv unless the version no longer equals observed_version.
+ std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
+ if (observed_version != close_wait_version()) { // version changed since the caller sampled it: skip the wait
+ return;
+ }
+ static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000)); // result deliberately discarded; NOTE(review): "* 1000" presumably converts ms to the microseconds wait_for expects - confirm
+}
+
+void CloseWaitNotifier::notify_close_wait() { // Bumps the version, then wakes every waiter on _close_wait_cv.
+ _close_wait_version.fetch_add(1, std::memory_order_acq_rel); // incremented before the mutex is taken; wait_for_close_event() compares against it under that mutex
+ std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
+ _close_wait_cv.notify_all();
+}
+
int LoadStreamReplyHandler::on_received_messages(brpc::StreamId id,
butil::IOBuf* const messages[],
size_t size) {
auto stub = _stub.lock();
@@ -119,6 +137,7 @@ void LoadStreamReplyHandler::on_closed(brpc::StreamId id) {
return;
}
stub->_is_closed.store(true);
+ stub->notify_close_wait();
}
inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler& handler) {
@@ -129,12 +148,16 @@ inline std::ostream& operator<<(std::ostream& ostr, const
LoadStreamReplyHandler
LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool
incremental)
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool
incremental,
+ std::shared_ptr<CloseWaitNotifier>
close_wait_notifier)
: _load_id(load_id),
_src_id(src_id),
_tablet_schema_for_index(schema_map),
_enable_unique_mow_for_index(mow_map),
- _is_incremental(incremental) {};
+ _is_incremental(incremental),
+ _close_wait_notifier(std::move(close_wait_notifier)) {
+ DCHECK(_close_wait_notifier != nullptr);
+};
LoadStreamStub::~LoadStreamStub() {
if (_is_open.load() && !_is_closed.load()) {
@@ -365,6 +388,10 @@ Status LoadStreamStub::close_finish_check(RuntimeState*
state, bool* is_closed)
return Status::OK();
}
+void LoadStreamStub::notify_close_wait() { // Forwards to the CloseWaitNotifier this stub holds in _close_wait_notifier.
+ _close_wait_notifier->notify_close_wait();
+}
+
void LoadStreamStub::cancel(Status reason) {
LOG(WARNING) << *this << " is cancelled because of " << reason;
if (_is_open.load()) {
@@ -376,6 +403,7 @@ void LoadStreamStub::cancel(Status reason) {
_is_cancelled.store(true);
}
_is_closed.store(true);
+ notify_close_wait();
}
Status LoadStreamStub::_encode_and_send(PStreamHeader& header, std::span<const
Slice> data) {
diff --git a/be/src/exec/sink/load_stream_stub.h
b/be/src/exec/sink/load_stream_stub.h
index 4dfe2e252d9..638d82882a7 100644
--- a/be/src/exec/sink/load_stream_stub.h
+++ b/be/src/exec/sink/load_stream_stub.h
@@ -82,6 +82,20 @@ using IndexToEnableMoW =
std::allocator<phmap::Pair<const
int64_t, bool>>, 4,
std::mutex>;
+class CloseWaitNotifier { // Version counter plus mutex/condition variable used to wake close-wait loops.
+public:
+ int64_t close_wait_version() const; // current value of _close_wait_version
+
+ void wait_for_close_event(int64_t observed_version, int64_t timeout_ms); // returns at once if the version differs from observed_version, otherwise waits with a timeout
+
+ void notify_close_wait(); // increments the version and notifies all waiters
+
+private:
+ std::atomic<int64_t> _close_wait_version {0}; // starts at 0; incremented by notify_close_wait()
+ bthread::Mutex _close_wait_mutex; // held while checking the version before a wait and while notifying
+ bthread::ConditionVariable _close_wait_cv; // waited on in wait_for_close_event()
+};
+
class LoadStreamReplyHandler : public brpc::StreamInputHandler {
public:
LoadStreamReplyHandler(PUniqueId load_id, int64_t dst_id,
std::weak_ptr<LoadStreamStub> stub)
@@ -109,12 +123,17 @@ public:
// construct new stub
LoadStreamStub(PUniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental
= false);
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental
= false,
+ std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+ std::make_shared<CloseWaitNotifier>());
LoadStreamStub(UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental
= false)
- : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map,
incremental) {};
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental
= false,
+ std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+ std::make_shared<CloseWaitNotifier>())
+ : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map,
incremental,
+ std::move(close_wait_notifier)) {};
// for mock this class in UT
#ifdef BE_TEST
@@ -244,6 +263,8 @@ public:
tablet_load_infos);
private:
+ void notify_close_wait();
+
Status _encode_and_send(PStreamHeader& header, std::span<const Slice> data
= {});
Status _send_with_buffer(butil::IOBuf& buf, bool sync = false);
Status _send_with_retry(butil::IOBuf& buf);
@@ -282,6 +303,7 @@ protected:
std::unordered_map<int64_t, Status> _failed_tablets;
bool _is_incremental = false;
+ std::shared_ptr<CloseWaitNotifier> _close_wait_notifier;
bthread::Mutex _write_mutex;
size_t _bytes_written = 0;
@@ -294,12 +316,14 @@ class LoadStreamStubs {
public:
LoadStreamStubs(size_t num_streams, UniqueId load_id, int64_t src_id,
std::shared_ptr<IndexToTabletSchema> schema_map,
- std::shared_ptr<IndexToEnableMoW> mow_map, bool
incremental = false)
+ std::shared_ptr<IndexToEnableMoW> mow_map, bool
incremental = false,
+ std::shared_ptr<CloseWaitNotifier> close_wait_notifier =
+ std::make_shared<CloseWaitNotifier>())
: _is_incremental(incremental) {
_streams.reserve(num_streams);
for (size_t i = 0; i < num_streams; i++) {
- _streams.emplace_back(
- new LoadStreamStub(load_id, src_id, schema_map, mow_map,
incremental));
+ _streams.emplace_back(new LoadStreamStub(load_id, src_id,
schema_map, mow_map,
+ incremental,
close_wait_notifier));
}
}
diff --git a/be/src/exec/sink/writer/vtablet_writer.cpp
b/be/src/exec/sink/writer/vtablet_writer.cpp
index 1af9e77272c..803082fa7ee 100644
--- a/be/src/exec/sink/writer/vtablet_writer.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer.cpp
@@ -103,6 +103,8 @@ bvar::PerSecond<bvar::Adder<int64_t>>
g_sink_write_rows_per_second("sink_through
bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms(
"load_back_pressure_version_time_ms");
+static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000; // timeout (ms) passed to wait_for_close_event() by the IndexChannel::close_wait loops
+
Status IndexChannel::init(RuntimeState* state, const
std::vector<TTabletWithPartition>& tablets,
bool incremental) {
SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
@@ -310,6 +312,20 @@ static Status
cancel_channel_and_check_intolerable_failure(Status status,
return status;
}
+void IndexChannel::wait_for_close_event(int64_t observed_version, int64_t
timeout_ms) { // Waits on _close_wait_cv unless the version no longer equals observed_version.
+ std::unique_lock<bthread::Mutex> lock(_close_wait_mutex);
+ if (observed_version != close_wait_version()) { // version changed since the caller sampled it: skip the wait
+ return;
+ }
+ static_cast<void>(_close_wait_cv.wait_for(lock, timeout_ms * 1000)); // result deliberately discarded; NOTE(review): "* 1000" presumably converts ms to the microseconds wait_for expects - confirm
+}
+
+void IndexChannel::notify_close_wait() { // Bumps the version, then wakes every waiter on _close_wait_cv.
+ _close_wait_version.fetch_add(1, std::memory_order_acq_rel); // incremented before the mutex is taken; wait_for_close_event() compares against it under that mutex
+ std::lock_guard<bthread::Mutex> lock(_close_wait_mutex);
+ _close_wait_cv.notify_all();
+}
+
Status IndexChannel::close_wait(
RuntimeState* state, WriterStats* writer_stats,
std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
@@ -331,6 +347,7 @@ Status IndexChannel::close_wait(
}
}
while (true) {
+ int64_t close_wait_version = this->close_wait_version();
RETURN_IF_ERROR(check_each_node_channel_close(
&unfinished_node_channel_ids, node_add_batch_counter_map,
writer_stats, status));
bool quorum_success = _quorum_success(unfinished_node_channel_ids,
need_finish_tablets);
@@ -341,7 +358,7 @@ Status IndexChannel::close_wait(
<< ", load_id: " << print_id(_parent->_load_id);
break;
}
- bthread_usleep(1000 * 10);
+ wait_for_close_event(close_wait_version, CLOSE_WAIT_EVENT_FALLBACK_MS);
}
// 2. wait for all node channel to complete as much as possible
@@ -349,6 +366,7 @@ Status IndexChannel::close_wait(
int64_t arrival_quorum_success_time = UnixMillis();
int64_t max_wait_time_ms =
_calc_max_wait_time_ms(unfinished_node_channel_ids);
while (true) {
+ int64_t close_wait_version = this->close_wait_version();
RETURN_IF_ERROR(check_each_node_channel_close(&unfinished_node_channel_ids,
node_add_batch_counter_map, writer_stats,
status));
@@ -372,7 +390,8 @@ Status IndexChannel::close_wait(
<< unfinished_node_channel_host_str.str();
break;
}
- bthread_usleep(1000 * 10);
+ wait_for_close_event(close_wait_version,
std::min(CLOSE_WAIT_EVENT_FALLBACK_MS,
+ max_wait_time_ms
- elapsed_ms));
}
}
return status;
@@ -873,6 +892,7 @@ void VNodeChannel::_cancel_with_msg(const std::string& msg)
{
}
}
_cancelled = true;
+ _index_channel->notify_close_wait();
}
void VNodeChannel::_refresh_back_pressure_version_wait_time(
@@ -1135,6 +1155,7 @@ void VNodeChannel::_add_block_success_callback(const
PTabletWriterAddBlockResult
}
}
_add_batches_finished = true;
+ _index_channel->notify_close_wait();
}
} else {
_cancel_with_msg(fmt::format("{}, add batch req success but status
isn't ok, err: {}",
@@ -1185,6 +1206,7 @@ void VNodeChannel::_add_block_failed_callback(const
WriteBlockCallbackContext& c
// if this is last rpc, will must set _add_batches_finished.
otherwise, node channel's close_wait
// will be blocked.
_add_batches_finished = true;
+ _index_channel->notify_close_wait();
}
}
diff --git a/be/src/exec/sink/writer/vtablet_writer.h
b/be/src/exec/sink/writer/vtablet_writer.h
index d3e6e8da0f1..69b78fa280c 100644
--- a/be/src/exec/sink/writer/vtablet_writer.h
+++ b/be/src/exec/sink/writer/vtablet_writer.h
@@ -31,6 +31,7 @@
#include <google/protobuf/stubs/callback.h>
// IWYU pragma: no_include <bits/chrono.h>
+#include <bthread/condition_variable.h>
#include <bthread/mutex.h>
#include <atomic>
@@ -524,6 +525,14 @@ public:
std::unordered_set<int64_t> unfinished_node_channel_ids,
bool need_wait_after_quorum_success);
+ int64_t close_wait_version() const { // Acquire-load of the _close_wait_version counter.
+ return _close_wait_version.load(std::memory_order_acquire);
+ }
+
+ void wait_for_close_event(int64_t observed_version, int64_t timeout_ms);
+
+ void notify_close_wait();
+
Status check_each_node_channel_close(
std::unordered_set<int64_t>* unfinished_node_channel_ids,
std::unordered_map<int64_t, AddBatchCounter>*
node_add_batch_counter_map,
@@ -614,6 +623,10 @@ private:
std::map<int64_t, std::vector<std::pair<int64_t, int64_t>>>
_tablets_filtered_rows;
int64_t _start_time = 0;
+
+ std::atomic<int64_t> _close_wait_version {0};
+ bthread::Mutex _close_wait_mutex;
+ bthread::ConditionVariable _close_wait_cv;
};
} // namespace doris
diff --git a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
index 37c9b94ddf2..610500aeffa 100644
--- a/be/src/exec/sink/writer/vtablet_writer_v2.cpp
+++ b/be/src/exec/sink/writer/vtablet_writer_v2.cpp
@@ -24,6 +24,7 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
+#include <algorithm>
#include <cstdint>
#include <mutex>
#include <ranges>
@@ -59,6 +60,8 @@ namespace doris {
extern bvar::Adder<int64_t> g_sink_load_back_pressure_version_time_ms;
+static constexpr int64_t CLOSE_WAIT_EVENT_FALLBACK_MS = 1000; // timeout (ms) passed to wait_for_close_event() by the VTabletWriterV2::_close_wait loops
+
VTabletWriterV2::VTabletWriterV2(const TDataSink& t_sink, const
VExprContextSPtrs& output_exprs,
std::shared_ptr<Dependency> dep,
std::shared_ptr<Dependency> fin_dep)
@@ -821,6 +824,7 @@ Status VTabletWriterV2::_close_wait(
}
}
while (true) {
+ int64_t close_wait_version = _load_stream_map->close_wait_version();
RETURN_IF_ERROR(_check_timeout());
RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status,
streams_for_node));
bool quorum_success = _quorum_success(unfinished_streams,
need_finish_tablets);
@@ -830,7 +834,7 @@ Status VTabletWriterV2::_close_wait(
<< ", txn_id: " << _txn_id << ", load_id: " <<
print_id(_load_id);
break;
}
- bthread_usleep(1000 * 10);
+ _load_stream_map->wait_for_close_event(close_wait_version,
CLOSE_WAIT_EVENT_FALLBACK_MS);
}
// 2. then wait for remaining streams as much as possible
@@ -838,6 +842,7 @@ Status VTabletWriterV2::_close_wait(
int64_t arrival_quorum_success_time = UnixMillis();
int64_t max_wait_time_ms = _calc_max_wait_time_ms(streams_for_node,
unfinished_streams);
while (true) {
+ int64_t close_wait_version =
_load_stream_map->close_wait_version();
RETURN_IF_ERROR(_check_timeout());
RETURN_IF_ERROR(_check_streams_finish(unfinished_streams, status,
streams_for_node));
if (unfinished_streams.empty()) {
@@ -856,7 +861,9 @@ Status VTabletWriterV2::_close_wait(
<< ", unfinished streams: " <<
unfinished_streams_str.str();
break;
}
- bthread_usleep(1000 * 10);
+ _load_stream_map->wait_for_close_event(
+ close_wait_version,
+ std::min(CLOSE_WAIT_EVENT_FALLBACK_MS, max_wait_time_ms -
elapsed_ms));
}
}
diff --git a/be/test/exec/sink/vtablet_writer_v2_test.cpp
b/be/test/exec/sink/vtablet_writer_v2_test.cpp
index 2ddde5bc4b3..10814c4b9db 100644
--- a/be/test/exec/sink/vtablet_writer_v2_test.cpp
+++ b/be/test/exec/sink/vtablet_writer_v2_test.cpp
@@ -294,6 +294,27 @@ TEST_F(TestVTabletWriterV2,
shared_delta_writer_should_not_access_destroyed_crea
current_writer->_cancel(Status::Cancelled("test cleanup"));
}
+TEST_F(TestVTabletWriterV2,
close_wait_notifier_should_be_scoped_to_load_stream_map) { // Cancelling a stream of one LoadStreamMap must bump only that map's close-wait version.
+ UniqueId load_id1;
+ UniqueId load_id2;
+ load_id2.lo = 1; // intended to make load_id2 differ from load_id1 (assumes a default UniqueId does not have lo == 1)
+ std::shared_ptr<LoadStreamMap> load_stream_map1 =
+ std::make_shared<LoadStreamMap>(load_id1, src_id, 1, 1, nullptr);
+ std::shared_ptr<LoadStreamMap> load_stream_map2 =
+ std::make_shared<LoadStreamMap>(load_id2, src_id, 1, 1, nullptr);
+ auto streams1 = load_stream_map1->get_or_create(1001);
+ auto streams2 = load_stream_map2->get_or_create(1002);
+ streams1->mark_open();
+ streams2->mark_open();
+
+ int64_t version1 = load_stream_map1->close_wait_version(); // sample both versions before the cancel
+ int64_t version2 = load_stream_map2->close_wait_version();
+ streams1->select_one_stream()->cancel(Status::Cancelled("test")); // LoadStreamStub::cancel() ends with notify_close_wait()
+
+ ASSERT_GT(load_stream_map1->close_wait_version(), version1); // the cancelled stream's map advanced
+ ASSERT_EQ(load_stream_map2->close_wait_version(), version2); // the other map was not notified
+}
+
TEST_F(TestVTabletWriterV2, one_replica) {
UniqueId load_id;
std::vector<TTabletCommitInfo> tablet_commit_infos;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]