This is an automated email from the ASF dual-hosted git repository. yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 0f0567eb730 Revert "[fix](move-memtable) fix initial use count of streams for auto partition (#33165) (#33236)" 0f0567eb730 is described below commit 0f0567eb7303fe2006581c99a92d47ba2739f8ee Author: yiguolei <yiguo...@gmail.com> AuthorDate: Thu Apr 4 00:35:18 2024 +0800 Revert "[fix](move-memtable) fix initial use count of streams for auto partition (#33165) (#33236)" This reverts commit df197c6a1429a2b113e23193a4c8a32b0b4726fe. --- be/src/vec/sink/load_stream_stub.cpp | 33 +++- be/src/vec/sink/load_stream_stub.h | 14 +- be/src/vec/sink/load_stream_stub_pool.cpp | 110 ++++---------- be/src/vec/sink/load_stream_stub_pool.h | 50 +++--- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 192 ++++++++++++------------ be/src/vec/sink/writer/vtablet_writer_v2.h | 10 +- be/test/io/fs/stream_sink_file_writer_test.cpp | 4 +- be/test/vec/exec/load_stream_stub_pool_test.cpp | 31 ++-- 8 files changed, 188 insertions(+), 256 deletions(-) diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 7708e3b255f..6eb91e46853 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -125,13 +125,19 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler return ostr; } -LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, - std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map) - : _load_id(load_id), +LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use) + : _use_cnt(num_use), + _load_id(load_id), _src_id(src_id), - _tablet_schema_for_index(schema_map), - _enable_unique_mow_for_index(mow_map) {}; + _tablet_schema_for_index(std::make_shared<IndexToTabletSchema>()), + _enable_unique_mow_for_index(std::make_shared<IndexToEnableMoW>()) {}; + +LoadStreamStub::LoadStreamStub(LoadStreamStub& stub) + : _use_cnt(stub._use_cnt.load()), + _load_id(stub._load_id), + _src_id(stub._src_id), + _tablet_schema_for_index(stub._tablet_schema_for_index), + _enable_unique_mow_for_index(stub._enable_unique_mow_for_index) {}; LoadStreamStub::~LoadStreamStub() { if (_is_init.load() && !_is_closed.load()) { @@ -235,12 +241,23 @@ Status LoadStreamStub::add_segment(int64_t partition_id, int64_t index_id, int64 // CLOSE_LOAD Status LoadStreamStub::close_load(const std::vector<PTabletID>& tablets_to_commit) { + { + std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex); + _tablets_to_commit.insert(_tablets_to_commit.end(), tablets_to_commit.begin(), + tablets_to_commit.end()); + } + if (--_use_cnt > 0) { + return Status::OK(); + } PStreamHeader header; *header.mutable_load_id() = _load_id; header.set_src_id(_src_id); header.set_opcode(doris::PStreamHeader::CLOSE_LOAD); - for (const auto& tablet : tablets_to_commit) { - *header.add_tablets() = tablet; + { + std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex); + for (const auto& tablet : _tablets_to_commit) { + *header.add_tablets() = tablet; + } } return _encode_and_send(header); } diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index aa8b850760e..f20b0e6ea3d 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -109,14 +109,10 @@ class LoadStreamStub { public: // construct new stub - LoadStreamStub(PUniqueId load_id, int64_t src_id, - std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map); + LoadStreamStub(PUniqueId load_id, int64_t src_id, int num_use); - LoadStreamStub(UniqueId load_id, int64_t src_id, - std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map) - : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {}; + // copy constructor, shared_ptr members are shared + LoadStreamStub(LoadStreamStub& stub); // for mock this class in UT #ifdef BE_TEST @@ -217,6 +213,7 @@ protected: std::atomic<bool> _is_closed; std::atomic<bool> _is_cancelled; std::atomic<bool> _is_eos; + std::atomic<int> _use_cnt; PUniqueId _load_id; brpc::StreamId _stream_id; @@ -229,6 +226,9 @@ protected: bthread::Mutex _cancel_mutex; bthread::ConditionVariable _close_cv; + std::mutex _tablets_to_commit_mutex; + std::vector<PTabletID> _tablets_to_commit; + std::mutex _buffer_mutex; std::mutex _send_mutex; butil::IOBuf _buffer; diff --git a/be/src/vec/sink/load_stream_stub_pool.cpp b/be/src/vec/sink/load_stream_stub_pool.cpp index 3eae49aff77..d76402b57d5 100644 --- a/be/src/vec/sink/load_stream_stub_pool.cpp +++ b/be/src/vec/sink/load_stream_stub_pool.cpp @@ -23,104 +23,50 @@ namespace doris { class TExpr; -LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, - LoadStreamStubPool* pool) - : _load_id(load_id), - _src_id(src_id), - _num_streams(num_streams), - _use_cnt(num_use), - _pool(pool) { - DCHECK(num_streams > 0) << "stream num should be greater than 0"; - DCHECK(num_use > 0) << "use num should be greater than 0"; -} - -std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) { - std::lock_guard<std::mutex> lock(_mutex); - std::shared_ptr<Streams> streams = _streams_for_node[dst_id]; - if (streams != nullptr) { - return streams; - } - streams = std::make_shared<Streams>(); - auto schema_map = std::make_shared<IndexToTabletSchema>(); - auto mow_map = std::make_shared<IndexToEnableMoW>(); - for (int i = 0; i < _num_streams; i++) { - streams->emplace_back(new LoadStreamStub(_load_id, _src_id, schema_map, mow_map)); - } - _streams_for_node[dst_id] = streams; - return streams; -} - -std::shared_ptr<Streams> LoadStreamMap::at(int64_t dst_id) { - std::lock_guard<std::mutex> lock(_mutex); - return _streams_for_node.at(dst_id); -} - -bool LoadStreamMap::contains(int64_t dst_id) { - std::lock_guard<std::mutex> lock(_mutex); - return _streams_for_node.contains(dst_id); -} +LoadStreams::LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool) + : _load_id(load_id), _dst_id(dst_id), _use_cnt(num_use), _pool(pool) {} -void LoadStreamMap::for_each(std::function<void(int64_t, const Streams&)> fn) { - std::lock_guard<std::mutex> lock(_mutex); - for (auto& [dst_id, streams] : _streams_for_node) { - fn(dst_id, *streams); - } -} - -Status LoadStreamMap::for_each_st(std::function<Status(int64_t, const Streams&)> fn) { - std::lock_guard<std::mutex> lock(_mutex); - for (auto& [dst_id, streams] : _streams_for_node) { - RETURN_IF_ERROR(fn(dst_id, *streams)); - } - return Status::OK(); -} - -void LoadStreamMap::save_tablets_to_commit(int64_t dst_id, - const std::vector<PTabletID>& tablets_to_commit) { - std::lock_guard<std::mutex> lock(_tablets_to_commit_mutex); - auto& tablets = _tablets_to_commit[dst_id]; - tablets.insert(tablets.end(), tablets_to_commit.begin(), tablets_to_commit.end()); -} - -bool LoadStreamMap::release() { +void LoadStreams::release() { int num_use = --_use_cnt; + DBUG_EXECUTE_IF("LoadStreams.release.keeping_streams", { num_use = 1; }); if (num_use == 0) { - LOG(INFO) << "releasing streams, load_id=" << _load_id; - _pool->erase(_load_id); - return true; + LOG(INFO) << "releasing streams, load_id=" << _load_id << ", dst_id=" << _dst_id; + _pool->erase(_load_id, _dst_id); + } else { + LOG(INFO) << "keeping streams, load_id=" << _load_id << ", dst_id=" << _dst_id + << ", use_cnt=" << num_use; } - LOG(INFO) << "keeping streams, load_id=" << _load_id << ", use_cnt=" << num_use; - return false; -} - -Status LoadStreamMap::close_load() { - return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status { - const auto& tablets = _tablets_to_commit[dst_id]; - for (auto& stream : streams) { - RETURN_IF_ERROR(stream->close_load(tablets)); - } - return Status::OK(); - }); } LoadStreamStubPool::LoadStreamStubPool() = default; LoadStreamStubPool::~LoadStreamStubPool() = default; -std::shared_ptr<LoadStreamMap> LoadStreamStubPool::get_or_create(UniqueId load_id, int64_t src_id, - int num_streams, int num_use) { + +std::shared_ptr<LoadStreams> LoadStreamStubPool::get_or_create(PUniqueId load_id, int64_t src_id, + int64_t dst_id, int num_streams, + int num_sink) { + auto key = std::make_pair(UniqueId(load_id), dst_id); std::lock_guard<std::mutex> lock(_mutex); - std::shared_ptr<LoadStreamMap> streams = _pool[load_id]; - if (streams != nullptr) { + std::shared_ptr<LoadStreams> streams = _pool[key]; + if (streams) { return streams; } - streams = std::make_shared<LoadStreamMap>(load_id, src_id, num_streams, num_use, this); - _pool[load_id] = streams; + DCHECK(num_streams > 0) << "stream num should be greater than 0"; + DCHECK(num_sink > 0) << "sink num should be greater than 0"; + auto [it, _] = _template_stubs.emplace(load_id, new LoadStreamStub {load_id, src_id, num_sink}); + streams = std::make_shared<LoadStreams>(load_id, dst_id, num_sink, this); + for (int32_t i = 0; i < num_streams; i++) { + // copy construct, internal tablet schema map will be shared among all stubs + streams->streams().emplace_back(new LoadStreamStub {*it->second}); + } + _pool[key] = streams; return streams; } -void LoadStreamStubPool::erase(UniqueId load_id) { +void LoadStreamStubPool::erase(UniqueId load_id, int64_t dst_id) { std::lock_guard<std::mutex> lock(_mutex); - _pool.erase(load_id); + _pool.erase(std::make_pair(load_id, dst_id)); + _template_stubs.erase(load_id); } } // namespace doris diff --git a/be/src/vec/sink/load_stream_stub_pool.h b/be/src/vec/sink/load_stream_stub_pool.h index 65f3bb66cd2..662fc5bc1a1 100644 --- a/be/src/vec/sink/load_stream_stub_pool.h +++ b/be/src/vec/sink/load_stream_stub_pool.h @@ -72,41 +72,20 @@ class LoadStreamStubPool; using Streams = std::vector<std::shared_ptr<LoadStreamStub>>; -class LoadStreamMap { +class LoadStreams { public: - LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, - LoadStreamStubPool* pool); + LoadStreams(UniqueId load_id, int64_t dst_id, int num_use, LoadStreamStubPool* pool); - std::shared_ptr<Streams> get_or_create(int64_t dst_id); + void release(); - std::shared_ptr<Streams> at(int64_t dst_id); - - bool contains(int64_t dst_id); - - void for_each(std::function<void(int64_t, const Streams&)> fn); - - Status for_each_st(std::function<Status(int64_t, const Streams&)> fn); - - void save_tablets_to_commit(int64_t dst_id, const std::vector<PTabletID>& tablets_to_commit); - - // Return true if the last instance is just released. - bool release(); - - // send CLOSE_LOAD to all streams, return ERROR if any. - // only call this method after release() returns true. - Status close_load(); + Streams& streams() { return _streams; } private: - const UniqueId _load_id; - const int64_t _src_id; - const int _num_streams; + Streams _streams; + UniqueId _load_id; + int64_t _dst_id; std::atomic<int> _use_cnt; - std::mutex _mutex; - std::unordered_map<int64_t, std::shared_ptr<Streams>> _streams_for_node; LoadStreamStubPool* _pool = nullptr; - - std::mutex _tablets_to_commit_mutex; - std::unordered_map<int64_t, std::vector<PTabletID>> _tablets_to_commit; }; class LoadStreamStubPool { @@ -115,19 +94,26 @@ public: ~LoadStreamStubPool(); - std::shared_ptr<LoadStreamMap> get_or_create(UniqueId load_id, int64_t src_id, int num_streams, - int num_use); + std::shared_ptr<LoadStreams> get_or_create(PUniqueId load_id, int64_t src_id, int64_t dst_id, + int num_streams, int num_sink); - void erase(UniqueId load_id); + void erase(UniqueId load_id, int64_t dst_id); size_t size() { std::lock_guard<std::mutex> lock(_mutex); return _pool.size(); } + // for UT only + size_t templates_size() { + std::lock_guard<std::mutex> lock(_mutex); + return _template_stubs.size(); + } + private: std::mutex _mutex; - std::unordered_map<UniqueId, std::shared_ptr<LoadStreamMap>> _pool; + std::unordered_map<UniqueId, std::unique_ptr<LoadStreamStub>> _template_stubs; + std::unordered_map<std::pair<UniqueId, int64_t>, std::shared_ptr<LoadStreams>> _pool; }; } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 315df063ea0..68506ca161e 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -98,7 +98,7 @@ Status VTabletWriterV2::_incremental_open_streams( tablet.set_partition_id(partition->id); tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); - if (!_streams_for_node->contains(node)) { + if (!_streams_for_node.contains(node)) { new_backends.insert(node); } _tablets_for_node[node].emplace(tablet_id, tablet); @@ -111,9 +111,11 @@ Status VTabletWriterV2::_incremental_open_streams( } } } - for (int64_t dst_id : new_backends) { - auto streams = _streams_for_node->get_or_create(dst_id); - RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); + for (int64_t node_id : new_backends) { + auto load_streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, _backend_id, node_id, _stream_per_node, _num_local_sink); + RETURN_IF_ERROR(_open_streams_to_backend(node_id, *load_streams)); + _streams_for_node[node_id] = load_streams; } return Status::OK(); } @@ -240,8 +242,6 @@ Status VTabletWriterV2::_init(RuntimeState* state, RuntimeProfile* profile) { } else { _delta_writer_for_tablet = std::make_shared<DeltaWriterV2Map>(_load_id); } - _streams_for_node = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( - _load_id, _backend_id, _stream_per_node, _num_local_sink); return Status::OK(); } @@ -253,21 +253,23 @@ Status VTabletWriterV2::open(RuntimeState* state, RuntimeProfile* profile) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get()); RETURN_IF_ERROR(_build_tablet_node_mapping()); - RETURN_IF_ERROR(_open_streams()); + RETURN_IF_ERROR(_open_streams(_backend_id)); RETURN_IF_ERROR(_init_row_distribution()); return Status::OK(); } -Status VTabletWriterV2::_open_streams() { +Status VTabletWriterV2::_open_streams(int64_t src_id) { for (auto& [dst_id, _] : _tablets_for_node) { - auto streams = _streams_for_node->get_or_create(dst_id); + auto streams = ExecEnv::GetInstance()->load_stream_stub_pool()->get_or_create( + _load_id, src_id, dst_id, _stream_per_node, _num_local_sink); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); + _streams_for_node[dst_id] = streams; } return Status::OK(); } -Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& streams) { +Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, LoadStreams& streams) { const auto* node_info = _nodes_info->find_node(dst_id); DBUG_EXECUTE_IF("VTabletWriterV2._open_streams_to_backend.node_info_null", { node_info = nullptr; }); @@ -276,14 +278,14 @@ Status VTabletWriterV2::_open_streams_to_backend(int64_t dst_id, Streams& stream } auto idle_timeout_ms = _state->execution_timeout() * 1000; // get tablet schema from each backend only in the 1st stream - for (auto& stream : streams | std::ranges::views::take(1)) { + for (auto& stream : streams.streams() | std::ranges::views::take(1)) { const std::vector<PTabletID>& tablets_for_schema = _indexes_from_node[node_info->id]; RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, tablets_for_schema, _total_streams, idle_timeout_ms, _state->enable_profile())); } // for the rest streams, open without getting tablet schema - for (auto& stream : streams | std::ranges::views::drop(1)) { + for (auto& stream : streams.streams() | std::ranges::views::drop(1)) { RETURN_IF_ERROR(stream->open(stream, _state->exec_env()->brpc_internal_client_cache(), *node_info, _txn_id, *_schema, {}, _total_streams, idle_timeout_ms, _state->enable_profile())); @@ -358,7 +360,7 @@ Status VTabletWriterV2::_select_streams(int64_t tablet_id, int64_t partition_id, tablet.set_index_id(index_id); tablet.set_tablet_id(tablet_id); _tablets_for_node[node_id].emplace(tablet_id, tablet); - streams.emplace_back(_streams_for_node->at(node_id)->at(_stream_index)); + streams.emplace_back(_streams_for_node.at(node_id)->streams().at(_stream_index)); RETURN_IF_ERROR(streams[0]->wait_for_schema(partition_id, index_id, tablet_id)); } _stream_index = (_stream_index + 1) % _stream_per_node; @@ -467,13 +469,11 @@ Status VTabletWriterV2::_cancel(Status status) { _delta_writer_for_tablet->cancel(status); _delta_writer_for_tablet.reset(); } - if (_streams_for_node) { - _streams_for_node->for_each([status](int64_t dst_id, const Streams& streams) { - for (auto& stream : streams) { - stream->cancel(status); - } - }); - _streams_for_node->release(); + for (const auto& [_, streams] : _streams_for_node) { + for (const auto& stream : streams->streams()) { + stream->cancel(status); + } + streams->release(); } return Status::OK(); } @@ -527,83 +527,87 @@ Status VTabletWriterV2::close(Status exec_status) { COUNTER_SET(_row_distribution_timer, (int64_t)_row_distribution_watch.elapsed_time()); COUNTER_SET(_validate_data_timer, _block_convertor->validate_data_ns()); - // close DeltaWriters + // defer stream release to prevent memory leak + Defer defer([&] { + for (const auto& [_, streams] : _streams_for_node) { + streams->release(); + } + _streams_for_node.clear(); + }); + { SCOPED_TIMER(_close_writer_timer); // close all delta writers if this is the last user - auto st = _delta_writer_for_tablet->close(_profile); + RETURN_IF_ERROR(_delta_writer_for_tablet->close(_profile)); _delta_writer_for_tablet.reset(); - if (!st.ok()) { - RETURN_IF_ERROR(_cancel(st)); - } } - _calc_tablets_to_commit(); - const bool is_last_sink = _streams_for_node->release(); - LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink - << ", load_id=" << print_id(_load_id); + { + // send CLOSE_LOAD to all streams, return ERROR if any + for (const auto& [_, streams] : _streams_for_node) { + RETURN_IF_ERROR(_close_load(streams->streams())); + } + } - // send CLOSE_LOAD and close_wait on all streams - if (is_last_sink) { - RETURN_IF_ERROR(_streams_for_node->close_load()); + { SCOPED_TIMER(_close_load_timer); - RETURN_IF_ERROR(_streams_for_node->for_each_st( - [this](int64_t dst_id, const Streams& streams) -> Status { - for (auto& stream : streams) { - int64_t remain_ms = - static_cast<int64_t>(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); - } - return Status::OK(); - })); + for (const auto& [_, streams] : _streams_for_node) { + for (const auto& stream : streams->streams()) { + int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + } + } } - // calculate and submit commit info - if (is_last_sink) { - std::unordered_map<int64_t, int> failed_tablets; - std::unordered_map<int64_t, Status> failed_reason; - std::vector<TTabletCommitInfo> tablet_commit_infos; + std::unordered_map<int64_t, int> failed_tablets; - _streams_for_node->for_each([&](int64_t dst_id, const Streams& streams) { + std::vector<TTabletCommitInfo> tablet_commit_infos; + for (const auto& [node_id, streams] : _streams_for_node) { + for (const auto& stream : streams->streams()) { std::unordered_set<int64_t> known_tablets; - for (const auto& stream : streams) { - for (auto [tablet_id, reason] : stream->failed_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - failed_tablets[tablet_id]++; - failed_reason[tablet_id] = reason; + for (auto [tablet_id, _] : stream->failed_tablets()) { + if (known_tablets.contains(tablet_id)) { + continue; } - for (auto tablet_id : stream->success_tablets()) { - if (known_tablets.contains(tablet_id)) { - continue; - } - known_tablets.insert(tablet_id); - TTabletCommitInfo commit_info; - commit_info.tabletId = tablet_id; - commit_info.backendId = dst_id; - tablet_commit_infos.emplace_back(std::move(commit_info)); + known_tablets.insert(tablet_id); + failed_tablets[tablet_id]++; + } + for (auto tablet_id : stream->success_tablets()) { + if (known_tablets.contains(tablet_id)) { + continue; } + known_tablets.insert(tablet_id); + TTabletCommitInfo commit_info; + commit_info.tabletId = tablet_id; + commit_info.backendId = node_id; + tablet_commit_infos.emplace_back(std::move(commit_info)); } - }); - - for (auto [tablet_id, replicas] : failed_tablets) { - if (replicas > (_num_replicas - 1) / 2) { - return failed_reason.at(tablet_id); + } + } + for (auto [tablet_id, replicas] : failed_tablets) { + if (replicas <= (_num_replicas - 1) / 2) { + continue; + } + auto backends = _location->find_tablet(tablet_id)->node_ids; + for (auto& backend_id : backends) { + for (const auto& stream : _streams_for_node[backend_id]->streams()) { + const auto& failed_tablets = stream->failed_tablets(); + if (failed_tablets.contains(tablet_id)) { + return failed_tablets.at(tablet_id); + } } } - _state->tablet_commit_infos().insert( - _state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); + DCHECK(false) << "failed tablet " << tablet_id << " should have failed reason"; } + _state->tablet_commit_infos().insert(_state->tablet_commit_infos().end(), + std::make_move_iterator(tablet_commit_infos.begin()), + std::make_move_iterator(tablet_commit_infos.end())); // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node int64_t num_rows_load_total = _number_input_rows + _state->num_rows_load_filtered() + @@ -625,28 +629,18 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } -void VTabletWriterV2::_calc_tablets_to_commit() { - for (const auto& [dst_id, tablets] : _tablets_for_node) { - std::vector<PTabletID> tablets_to_commit; - std::vector<int64_t> partition_ids; - for (const auto& [tablet_id, tablet] : tablets) { - if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { - if (VLOG_DEBUG_IS_ON) { - partition_ids.push_back(tablet.partition_id()); - } - tablets_to_commit.push_back(tablet); - } +Status VTabletWriterV2::_close_load(const Streams& streams) { + auto node_id = streams[0]->dst_id(); + std::vector<PTabletID> tablets_to_commit; + for (auto [tablet_id, tablet] : _tablets_for_node[node_id]) { + if (_tablet_finder->partition_ids().contains(tablet.partition_id())) { + tablets_to_commit.push_back(tablet); } - if (VLOG_DEBUG_IS_ON) { - std::string msg("close load partitions: "); - msg.reserve(partition_ids.size() * 7); - for (auto v : partition_ids) { - msg.append(std::to_string(v) + ", "); - } - LOG(WARNING) << msg; - } - _streams_for_node->save_tablets_to_commit(dst_id, tablets_to_commit); } + for (const auto& stream : streams) { + RETURN_IF_ERROR(stream->close_load(tablets_to_commit)); + } + return Status::OK(); } } // namespace doris::vectorized diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index 7785733bf4a..460b3acc33f 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -69,7 +69,7 @@ namespace doris { class DeltaWriterV2; class LoadStreamStub; -class LoadStreamMap; +class LoadStreams; class ObjectPool; class RowDescriptor; class RuntimeState; @@ -121,9 +121,9 @@ private: Status _init(RuntimeState* state, RuntimeProfile* profile); - Status _open_streams(); + Status _open_streams(int64_t src_id); - Status _open_streams_to_backend(int64_t dst_id, Streams& streams); + Status _open_streams_to_backend(int64_t dst_id, LoadStreams& streams); Status _incremental_open_streams(const std::vector<TOlapTablePartition>& partitions); @@ -140,7 +140,7 @@ private: Status _select_streams(int64_t tablet_id, int64_t partition_id, int64_t index_id, Streams& streams); - void _calc_tablets_to_commit(); + Status _close_load(const Streams& streams); Status _cancel(Status status); @@ -217,7 +217,7 @@ private: std::unordered_map<int64_t, std::unordered_map<int64_t, PTabletID>> _tablets_for_node; std::unordered_map<int64_t, std::vector<PTabletID>> _indexes_from_node; - std::shared_ptr<LoadStreamMap> _streams_for_node; + std::unordered_map<int64_t, std::shared_ptr<LoadStreams>> _streams_for_node; size_t _stream_index = 0; std::shared_ptr<DeltaWriterV2Map> _delta_writer_for_tablet; diff --git a/be/test/io/fs/stream_sink_file_writer_test.cpp b/be/test/io/fs/stream_sink_file_writer_test.cpp index ad6e496c56f..7e5bdd350f5 100644 --- a/be/test/io/fs/stream_sink_file_writer_test.cpp +++ b/be/test/io/fs/stream_sink_file_writer_test.cpp @@ -51,9 +51,7 @@ static std::atomic<int64_t> g_num_request; class StreamSinkFileWriterTest : public testing::Test { class MockStreamStub : public LoadStreamStub { public: - MockStreamStub(PUniqueId load_id, int64_t src_id) - : LoadStreamStub(load_id, src_id, std::make_shared<IndexToTabletSchema>(), - std::make_shared<IndexToEnableMoW>()) {}; + MockStreamStub(PUniqueId load_id, int64_t src_id) : LoadStreamStub(load_id, src_id, 1) {}; virtual ~MockStreamStub() = default; diff --git a/be/test/vec/exec/load_stream_stub_pool_test.cpp b/be/test/vec/exec/load_stream_stub_pool_test.cpp index e576db3bdaa..24da3bb6999 100644 --- a/be/test/vec/exec/load_stream_stub_pool_test.cpp +++ b/be/test/vec/exec/load_stream_stub_pool_test.cpp @@ -32,29 +32,20 @@ TEST_F(LoadStreamStubPoolTest, test) { LoadStreamStubPool pool; int64_t src_id = 100; PUniqueId load_id; - load_id.set_lo(1); + load_id.set_hi(1); load_id.set_hi(2); - PUniqueId load_id2; - load_id2.set_lo(2); - load_id2.set_hi(1); - auto streams_for_node1 = pool.get_or_create(load_id, src_id, 5, 2); - auto streams_for_node2 = pool.get_or_create(load_id, src_id, 5, 2); - EXPECT_EQ(1, pool.size()); - auto streams_for_node3 = pool.get_or_create(load_id2, src_id, 8, 1); + auto streams1 = pool.get_or_create(load_id, src_id, 101, 5, 1); + auto streams2 = pool.get_or_create(load_id, src_id, 102, 5, 1); + auto streams3 = pool.get_or_create(load_id, src_id, 101, 5, 1); EXPECT_EQ(2, pool.size()); - EXPECT_EQ(streams_for_node1, streams_for_node2); - EXPECT_NE(streams_for_node1, streams_for_node3); - - EXPECT_EQ(5, streams_for_node1->get_or_create(101)->size()); - EXPECT_EQ(5, streams_for_node2->get_or_create(102)->size()); - EXPECT_EQ(8, streams_for_node3->get_or_create(101)->size()); - - EXPECT_TRUE(streams_for_node3->release()); - EXPECT_EQ(1, pool.size()); - EXPECT_FALSE(streams_for_node1->release()); - EXPECT_EQ(1, pool.size()); - EXPECT_TRUE(streams_for_node2->release()); + EXPECT_EQ(1, pool.templates_size()); + EXPECT_EQ(streams1, streams3); + EXPECT_NE(streams1, streams2); + streams1->release(); + streams2->release(); + streams3->release(); EXPECT_EQ(0, pool.size()); + EXPECT_EQ(0, pool.templates_size()); } } // namespace doris --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org