This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new bd47d5a6816 [branch-2.1](auto-partition) Fix auto partition load failure in multi replica (#36586) bd47d5a6816 is described below commit bd47d5a68164e26a09247baa3d749b5c8865c715 Author: zclllyybb <zhaochan...@selectdb.com> AuthorDate: Thu Jun 20 17:51:18 2024 +0800 [branch-2.1](auto-partition) Fix auto partition load failure in multi replica (#36586) This PR: 1. picks #35630, which was previously reverted by #36098. 2. picks #36344 from master. These two PRs fix existing bugs in auto partition load. --------- Co-authored-by: Kaijie Chen <c...@apache.org> --- be/src/exec/tablet_info.cpp | 17 +-- be/src/runtime/load_channel.cpp | 28 ++++- be/src/runtime/load_channel.h | 11 +- be/src/runtime/load_channel_mgr.cpp | 8 -- be/src/runtime/load_stream.cpp | 2 +- be/src/runtime/load_stream.h | 4 + be/src/runtime/tablets_channel.cpp | 55 +++++++-- be/src/runtime/tablets_channel.h | 10 +- be/src/vec/sink/load_stream_map_pool.cpp | 11 +- be/src/vec/sink/load_stream_map_pool.h | 4 +- be/src/vec/sink/load_stream_stub.cpp | 13 +- be/src/vec/sink/load_stream_stub.h | 10 +- be/src/vec/sink/writer/vtablet_writer.cpp | 136 +++++++++++++++------ be/src/vec/sink/writer/vtablet_writer.h | 67 ++++++---- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 60 ++++++--- be/src/vec/sink/writer/vtablet_writer_v2.h | 2 + .../apache/doris/catalog/ListPartitionItem.java | 2 +- .../org/apache/doris/catalog/PartitionKey.java | 7 ++ .../apache/doris/catalog/RangePartitionItem.java | 6 +- .../apache/doris/datasource/InternalCatalog.java | 4 +- .../org/apache/doris/planner/OlapTableSink.java | 111 ++++++++++++++++- .../apache/doris/service/FrontendServiceImpl.java | 14 +-- gensrc/proto/internal_service.proto | 3 + gensrc/thrift/Descriptors.thrift | 1 + .../sql/two_instance_correctness.out | 4 + .../test_auto_range_partition.groovy | 3 +- .../auto_partition/sql/multi_thread_load.groovy | 2 +- .../sql/two_instance_correctness.groovy | 
45 +++++++ 28 files changed, 492 insertions(+), 148 deletions(-) diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 62ff0b2fcce..e32e9c9efcf 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -388,18 +388,21 @@ Status VOlapTablePartitionParam::init() { // for both auto/non-auto partition table. _is_in_partition = _part_type == TPartitionType::type::LIST_PARTITIONED; - // initial partitions + // initial partitions. if meet dummy partitions only for open BE nodes, not generate key of them for finding for (const auto& t_part : _t_param.partitions) { VOlapTablePartition* part = nullptr; RETURN_IF_ERROR(generate_partition_from(t_part, part)); _partitions.emplace_back(part); - if (_is_in_partition) { - for (auto& in_key : part->in_keys) { - _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + + if (!_t_param.partitions_is_fake) { + if (_is_in_partition) { + for (auto& in_key : part->in_keys) { + _partitions_map->emplace(std::tuple {in_key.first, in_key.second, false}, part); + } + } else { + _partitions_map->emplace( + std::tuple {part->end_key.first, part->end_key.second, false}, part); } - } else { - _partitions_map->emplace(std::tuple {part->end_key.first, part->end_key.second, false}, - part); } } diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp index 146575feac9..3d8c8e1dbf3 100644 --- a/be/src/runtime/load_channel.cpp +++ b/be/src/runtime/load_channel.cpp @@ -33,11 +33,11 @@ namespace doris { bvar::Adder<int64_t> g_loadchannel_cnt("loadchannel_cnt"); LoadChannel::LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, - const std::string& sender_ip, int64_t backend_id, bool enable_profile) + std::string sender_ip, int64_t backend_id, bool enable_profile) : _load_id(load_id), _timeout_s(timeout_s), _is_high_priority(is_high_priority), - _sender_ip(sender_ip), + _sender_ip(std::move(sender_ip)), _backend_id(backend_id), 
_enable_profile(enable_profile) { std::shared_ptr<QueryContext> query_context = @@ -96,6 +96,10 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) { if (it != _tablets_channels.end()) { channel = it->second; } else { + // just for VLOG + if (_txn_id == 0) [[unlikely]] { + _txn_id = params.txn_id(); + } // create a new tablets channel TabletsChannelKey key(params.id(), index_id); // TODO(plat1ko): CloudTabletsChannel @@ -161,6 +165,7 @@ Status LoadChannel::add_batch(const PTabletWriterAddBlockRequest& request, } // 3. handle eos + // if channel is incremental, maybe hang on close until all close request arrived. if (request.has_eos() && request.eos()) { st = _handle_eos(channel.get(), request, response); _report_profile(response); @@ -182,6 +187,24 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, auto index_id = request.index_id(); RETURN_IF_ERROR(channel->close(this, request, response, &finished)); + + // for init node, we close waiting(hang on) all close request and let them return together. + if (request.has_hang_wait() && request.hang_wait()) { + DCHECK(!channel->is_incremental_channel()); + VLOG_DEBUG << fmt::format("txn {}: reciever index {} close waiting by sender {}", _txn_id, + request.index_id(), request.sender_id()); + int count = 0; + while (!channel->is_finished()) { + bthread_usleep(1000); + count++; + } + // now maybe finished or cancelled. + VLOG_TRACE << "reciever close wait finished!" 
<< request.sender_id(); + if (count >= 1000 * _timeout_s) { // maybe config::streaming_load_rpc_max_alive_time_sec + return Status::InternalError("Tablets channel didn't wait all close"); + } + } + if (finished) { std::lock_guard<std::mutex> l(_lock); { @@ -191,6 +214,7 @@ Status LoadChannel::_handle_eos(BaseTabletsChannel* channel, std::make_pair(channel->total_received_rows(), channel->num_rows_filtered()))); _tablets_channels.erase(index_id); } + LOG(INFO) << "txn " << _txn_id << " closed tablets_channel " << index_id; _finished_channel_ids.emplace(index_id); } return Status::OK(); diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h index 4a437e51907..791e996574a 100644 --- a/be/src/runtime/load_channel.h +++ b/be/src/runtime/load_channel.h @@ -17,10 +17,8 @@ #pragma once -#include <algorithm> #include <atomic> -#include <functional> -#include <map> +#include <cstdint> #include <memory> #include <mutex> #include <ostream> @@ -28,15 +26,11 @@ #include <unordered_map> #include <unordered_set> #include <utility> -#include <vector> #include "common/status.h" -#include "olap/memtable_memory_limiter.h" -#include "runtime/exec_env.h" #include "runtime/thread_context.h" #include "util/runtime_profile.h" #include "util/spinlock.h" -#include "util/thrift_util.h" #include "util/uid_util.h" namespace doris { @@ -52,7 +46,7 @@ class BaseTabletsChannel; class LoadChannel { public: LoadChannel(const UniqueId& load_id, int64_t timeout_s, bool is_high_priority, - const std::string& sender_ip, int64_t backend_id, bool enable_profile); + std::string sender_ip, int64_t backend_id, bool enable_profile); ~LoadChannel(); // open a new load channel if not exist @@ -91,6 +85,7 @@ protected: private: UniqueId _load_id; + int64_t _txn_id = 0; SpinLock _profile_serialize_lock; std::unique_ptr<RuntimeProfile> _profile; diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp index 4b0cc32f9c9..d236645b1fe 100644 --- 
a/be/src/runtime/load_channel_mgr.cpp +++ b/be/src/runtime/load_channel_mgr.cpp @@ -24,25 +24,17 @@ // IWYU pragma: no_include <bits/chrono.h> #include <chrono> // IWYU pragma: keep #include <ctime> -#include <functional> -#include <map> #include <memory> #include <ostream> -#include <queue> #include <string> -#include <tuple> #include <vector> #include "common/config.h" #include "common/logging.h" #include "runtime/exec_env.h" #include "runtime/load_channel.h" -#include "runtime/memory/mem_tracker.h" #include "util/doris_metrics.h" -#include "util/mem_info.h" #include "util/metrics.h" -#include "util/perf_counters.h" -#include "util/pretty_printer.h" #include "util/thread.h" namespace doris { diff --git a/be/src/runtime/load_stream.cpp b/be/src/runtime/load_stream.cpp index 87898d95a46..8de15091ec5 100644 --- a/be/src/runtime/load_stream.cpp +++ b/be/src/runtime/load_stream.cpp @@ -364,7 +364,7 @@ LoadStream::~LoadStream() { Status LoadStream::init(const POpenLoadStreamRequest* request) { _txn_id = request->txn_id(); _total_streams = request->total_streams(); - DCHECK(_total_streams > 0) << "total streams should be greator than 0"; + _is_incremental = (_total_streams == 0); _schema = std::make_shared<OlapTableSchemaParam>(); RETURN_IF_ERROR(_schema->init(request->schema())); diff --git a/be/src/runtime/load_stream.h b/be/src/runtime/load_stream.h index c61a2d163de..b2635698379 100644 --- a/be/src/runtime/load_stream.h +++ b/be/src/runtime/load_stream.h @@ -117,6 +117,9 @@ public: void add_source(int64_t src_id) { std::lock_guard lock_guard(_lock); _open_streams[src_id]++; + if (_is_incremental) { + _total_streams++; + } } Status close(int64_t src_id, const std::vector<PTabletID>& tablets_to_commit, @@ -167,6 +170,7 @@ private: RuntimeProfile::Counter* _close_wait_timer = nullptr; LoadStreamMgr* _load_stream_mgr = nullptr; QueryThreadContext _query_thread_context; + bool _is_incremental = false; }; using LoadStreamPtr = std::unique_ptr<LoadStream>; diff --git 
a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp index 526c979968d..adaced0b76e 100644 --- a/be/src/runtime/tablets_channel.cpp +++ b/be/src/runtime/tablets_channel.cpp @@ -21,7 +21,8 @@ #include <fmt/format.h> #include <gen_cpp/internal_service.pb.h> #include <gen_cpp/types.pb.h> -#include <time.h> + +#include <ctime> #include "common/compiler_util.h" // IWYU pragma: keep #include "common/status.h" @@ -132,17 +133,40 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { if (_state == kOpened || _state == kFinished) { return Status::OK(); } - LOG(INFO) << "open tablets channel: " << _key << ", tablets num: " << request.tablets().size() - << ", timeout(s): " << request.load_channel_timeout_s(); + LOG(INFO) << fmt::format("open tablets channel of index {}, tablets num: {} timeout(s): {}", + _index_id, request.tablets().size(), request.load_channel_timeout_s()); _txn_id = request.txn_id(); _index_id = request.index_id(); _schema = std::make_shared<OlapTableSchemaParam>(); RETURN_IF_ERROR(_schema->init(request.schema())); _tuple_desc = _schema->tuple_desc(); - _num_remaining_senders = request.num_senders(); - _next_seqs.resize(_num_remaining_senders, 0); - _closed_senders.Reset(_num_remaining_senders); + int max_sender = request.num_senders(); + /* + * a tablets channel in reciever is related to a bulk of VNodeChannel of sender. each instance one or none. + * there are two possibilities: + * 1. there's partitions originally broadcasted by FE. so all sender(instance) know it at start. and open() will be + * called directly, not by incremental_open(). and after _state changes to kOpened. _open_by_incremental will never + * be true. in this case, _num_remaining_senders will keep same with senders number. when all sender sent close rpc, + * the tablets channel will close. and if for auto partition table, these channel's closing will hang on reciever and + * return together to avoid close-then-incremental-open problem. + * 2. 
this tablets channel is opened by incremental_open of sender's sink node. so only this sender will know this partition + * (this TabletsChannel) at that time. and we are not sure how many sender will know in the end. it depends on data + * distribution. in this situation open() is called by incremental_open() at first time. so _open_by_incremental is true. + * then _num_remaining_senders will not be set here. but inc every time when incremental_open() called. so it's dynamic + * and also need same number of senders' close to close. but will not hang. + */ + if (_open_by_incremental) { + DCHECK(_num_remaining_senders == 0) << _num_remaining_senders; + } else { + _num_remaining_senders = max_sender; + } + LOG(INFO) << fmt::format( + "txn {}: TabletsChannel of index {} init senders {} with incremental {}", _txn_id, + _index_id, _num_remaining_senders, _open_by_incremental ? "on" : "off"); + // just use max_sender no matter incremental or not cuz we dont know how many senders will open. + _next_seqs.resize(max_sender, 0); + _closed_senders.Reset(max_sender); RETURN_IF_ERROR(_open_all_writers(request)); @@ -152,10 +176,27 @@ Status BaseTabletsChannel::open(const PTabletWriterOpenRequest& request) { Status BaseTabletsChannel::incremental_open(const PTabletWriterOpenRequest& params) { SCOPED_TIMER(_incremental_open_timer); - if (_state == kInitialized) { // haven't opened + + // current node first opened by incremental open + if (_state == kInitialized) { + _open_by_incremental = true; RETURN_IF_ERROR(open(params)); } + std::lock_guard<std::mutex> l(_lock); + + // one sender may incremental_open many times. but only close one time. so dont count duplicately. 
+ if (_open_by_incremental) { + if (params.has_sender_id() && !_recieved_senders.contains(params.sender_id())) { + _recieved_senders.insert(params.sender_id()); + _num_remaining_senders++; + } else if (!params.has_sender_id()) { // for compatible + _num_remaining_senders++; + } + VLOG_DEBUG << fmt::format("txn {}: TabletsChannel {} inc senders to {}", _txn_id, _index_id, + _num_remaining_senders); + } + std::vector<SlotDescriptor*>* index_slots = nullptr; int32_t schema_hash = 0; for (const auto& index : _schema->indexes()) { diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h index 27db9387602..54438be7690 100644 --- a/be/src/runtime/tablets_channel.h +++ b/be/src/runtime/tablets_channel.h @@ -21,8 +21,6 @@ #include <atomic> #include <cstdint> -#include <functional> -#include <map> #include <mutex> #include <ostream> #include <shared_mutex> @@ -113,6 +111,11 @@ public: size_t num_rows_filtered() const { return _num_rows_filtered; } + // means this tablets in this BE is incremental opened partitions. 
+ bool is_incremental_channel() const { return _open_by_incremental; } + + bool is_finished() const { return _state == kFinished; } + protected: Status _get_current_seq(int64_t& cur_seq, const PTabletWriterAddBlockRequest& request); @@ -151,10 +154,11 @@ protected: int64_t _txn_id = -1; int64_t _index_id = -1; std::shared_ptr<OlapTableSchemaParam> _schema; - TupleDescriptor* _tuple_desc = nullptr; + bool _open_by_incremental = false; // next sequence we expect + std::set<int32_t> _recieved_senders; int _num_remaining_senders = 0; std::vector<int64_t> _next_seqs; Bitmap _closed_senders; diff --git a/be/src/vec/sink/load_stream_map_pool.cpp b/be/src/vec/sink/load_stream_map_pool.cpp index fdcfe190dbf..7a3072ade6e 100644 --- a/be/src/vec/sink/load_stream_map_pool.cpp +++ b/be/src/vec/sink/load_stream_map_pool.cpp @@ -35,7 +35,7 @@ LoadStreamMap::LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, DCHECK(num_use > 0) << "use num should be greater than 0"; } -std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) { +std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id, bool incremental) { std::lock_guard<std::mutex> lock(_mutex); std::shared_ptr<Streams> streams = _streams_for_node[dst_id]; if (streams != nullptr) { @@ -44,7 +44,7 @@ std::shared_ptr<Streams> LoadStreamMap::get_or_create(int64_t dst_id) { streams = std::make_shared<Streams>(); for (int i = 0; i < _num_streams; i++) { streams->emplace_back(new LoadStreamStub(_load_id, _src_id, _tablet_schema_for_index, - _enable_unique_mow_for_index)); + _enable_unique_mow_for_index, incremental)); } _streams_for_node[dst_id] = streams; return streams; @@ -101,10 +101,13 @@ bool LoadStreamMap::release() { return false; } -Status LoadStreamMap::close_load() { - return for_each_st([this](int64_t dst_id, const Streams& streams) -> Status { +Status LoadStreamMap::close_load(bool incremental) { + return for_each_st([this, incremental](int64_t dst_id, const Streams& streams) -> Status 
{ const auto& tablets = _tablets_to_commit[dst_id]; for (auto& stream : streams) { + if (stream->is_incremental() != incremental) { + continue; + } RETURN_IF_ERROR(stream->close_load(tablets)); } return Status::OK(); diff --git a/be/src/vec/sink/load_stream_map_pool.h b/be/src/vec/sink/load_stream_map_pool.h index aad12dba2aa..d0f72ab7e00 100644 --- a/be/src/vec/sink/load_stream_map_pool.h +++ b/be/src/vec/sink/load_stream_map_pool.h @@ -78,7 +78,7 @@ public: LoadStreamMap(UniqueId load_id, int64_t src_id, int num_streams, int num_use, LoadStreamMapPool* pool); - std::shared_ptr<Streams> get_or_create(int64_t dst_id); + std::shared_ptr<Streams> get_or_create(int64_t dst_id, bool incremental = false); std::shared_ptr<Streams> at(int64_t dst_id); @@ -95,7 +95,7 @@ public: // send CLOSE_LOAD to all streams, return ERROR if any. // only call this method after release() returns true. - Status close_load(); + Status close_load(bool incremental); private: const UniqueId _load_id; diff --git a/be/src/vec/sink/load_stream_stub.cpp b/be/src/vec/sink/load_stream_stub.cpp index 92670c1c930..caebb381db6 100644 --- a/be/src/vec/sink/load_stream_stub.cpp +++ b/be/src/vec/sink/load_stream_stub.cpp @@ -127,11 +127,12 @@ inline std::ostream& operator<<(std::ostream& ostr, const LoadStreamReplyHandler LoadStreamStub::LoadStreamStub(PUniqueId load_id, int64_t src_id, std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map) + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental) : _load_id(load_id), _src_id(src_id), _tablet_schema_for_index(schema_map), - _enable_unique_mow_for_index(mow_map) {}; + _enable_unique_mow_for_index(mow_map), + _is_incremental(incremental) {}; LoadStreamStub::~LoadStreamStub() { if (_is_init.load() && !_is_closed.load()) { @@ -168,7 +169,13 @@ Status LoadStreamStub::open(BrpcClientCache<PBackendService_Stub>* client_cache, request.set_src_id(_src_id); request.set_txn_id(txn_id); 
request.set_enable_profile(enable_profile); - request.set_total_streams(total_streams); + if (_is_incremental) { + request.set_total_streams(0); + } else if (total_streams > 0) { + request.set_total_streams(total_streams); + } else { + return Status::InternalError("total_streams should be greator than 0"); + } request.set_idle_timeout_ms(idle_timeout_ms); schema.to_protobuf(request.mutable_schema()); for (auto& tablet : tablets_for_schema) { diff --git a/be/src/vec/sink/load_stream_stub.h b/be/src/vec/sink/load_stream_stub.h index 1f0d2e459d3..1bf0fac4e38 100644 --- a/be/src/vec/sink/load_stream_stub.h +++ b/be/src/vec/sink/load_stream_stub.h @@ -111,12 +111,12 @@ public: // construct new stub LoadStreamStub(PUniqueId load_id, int64_t src_id, std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map); + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false); LoadStreamStub(UniqueId load_id, int64_t src_id, std::shared_ptr<IndexToTabletSchema> schema_map, - std::shared_ptr<IndexToEnableMoW> mow_map) - : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map) {}; + std::shared_ptr<IndexToEnableMoW> mow_map, bool incremental = false) + : LoadStreamStub(load_id.to_proto(), src_id, schema_map, mow_map, incremental) {}; // for mock this class in UT #ifdef BE_TEST @@ -195,6 +195,8 @@ public: int64_t dst_id() const { return _dst_id; } + bool is_incremental() const { return _is_incremental; } + friend std::ostream& operator<<(std::ostream& ostr, const LoadStreamStub& stub); std::string to_string(); @@ -255,6 +257,8 @@ protected: bthread::Mutex _failed_tablets_mutex; std::vector<int64_t> _success_tablets; std::unordered_map<int64_t, Status> _failed_tablets; + + bool _is_incremental = false; }; } // namespace doris diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index 64fb092e736..3aa9c799216 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ 
b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -35,11 +35,9 @@ #include <sys/param.h> #include <algorithm> -#include <exception> #include <initializer_list> #include <memory> #include <mutex> -#include <ranges> #include <sstream> #include <string> #include <unordered_map> @@ -50,23 +48,18 @@ #include "util/runtime_profile.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" -#include "vec/runtime/vdatetime_value.h" -#include "vec/sink/volap_table_sink.h" #include "vec/sink/vrow_distribution.h" #ifdef DEBUG #include <unordered_set> #endif -#include "bvar/bvar.h" #include "common/compiler_util.h" // IWYU pragma: keep #include "common/logging.h" #include "common/object_pool.h" #include "common/signal_handler.h" #include "common/status.h" #include "exec/tablet_info.h" -#include "runtime/client_cache.h" -#include "runtime/define_primitive_type.h" #include "runtime/descriptors.h" #include "runtime/exec_env.h" #include "runtime/runtime_state.h" @@ -86,13 +79,7 @@ #include "util/uid_util.h" #include "vec/columns/column.h" #include "vec/columns/column_const.h" -#include "vec/columns/column_decimal.h" -#include "vec/columns/column_nullable.h" -#include "vec/columns/column_vector.h" -#include "vec/columns/columns_number.h" -#include "vec/common/assert_cast.h" #include "vec/core/block.h" -#include "vec/core/types.h" #include "vec/data_types/data_type_nullable.h" #include "vec/exprs/vexpr.h" #include "vec/sink/vtablet_block_convertor.h" @@ -110,7 +97,8 @@ bvar::Adder<int64_t> g_sink_write_rows; bvar::PerSecond<bvar::Adder<int64_t>> g_sink_write_rows_per_second("sink_throughput_row", &g_sink_write_rows, 60); -Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) { +Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets, + bool incremental) { SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get()); for (const auto& tablet : tablets) { // First find the location BEs of this 
tablet @@ -128,8 +116,15 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart // NodeChannel is not added to the _parent->_pool. // Because the deconstruction of NodeChannel may take a long time to wait rpc finish. // but the ObjectPool will hold a spin lock to delete objects. - channel = std::make_shared<VNodeChannel>(_parent, this, replica_node_id); + channel = + std::make_shared<VNodeChannel>(_parent, this, replica_node_id, incremental); _node_channels.emplace(replica_node_id, channel); + // incremental opened new node. when close we have use two-stage close. + if (incremental) { + _has_inc_node = true; + } + LOG(INFO) << "init new node for instance " << _parent->_sender_id + << ", incremantal:" << incremental; } else { channel = it->second; } @@ -359,22 +354,23 @@ Status VNodeChannel::init(RuntimeState* state) { // add block closure // Has to using value to capture _task_exec_ctx because tablet writer may destroyed during callback. _send_block_callback = WriteBlockCallback<PTabletWriterAddBlockResult>::create_shared(); - _send_block_callback->addFailedHandler([&, task_exec_ctx = _task_exec_ctx](bool is_last_rpc) { - auto ctx_lock = task_exec_ctx.lock(); - if (ctx_lock == nullptr) { - return; - } - _add_block_failed_callback(is_last_rpc); - }); + _send_block_callback->addFailedHandler( + [&, task_exec_ctx = _task_exec_ctx](const WriteBlockCallbackContext& ctx) { + std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock(); + if (ctx_lock == nullptr) { + return; + } + _add_block_failed_callback(ctx); + }); _send_block_callback->addSuccessHandler( [&, task_exec_ctx = _task_exec_ctx](const PTabletWriterAddBlockResult& result, - bool is_last_rpc) { - auto ctx_lock = task_exec_ctx.lock(); + const WriteBlockCallbackContext& ctx) { + std::shared_ptr<TaskExecutionContext> ctx_lock = task_exec_ctx.lock(); if (ctx_lock == nullptr) { return; } - _add_block_success_callback(result, is_last_rpc); + _add_block_success_callback(result, 
ctx); }); _name = fmt::format("VNodeChannel[{}-{}]", _index_channel->_index_id, _node_id); @@ -398,6 +394,7 @@ void VNodeChannel::_open_internal(bool is_incremental) { request->set_allocated_id(&_parent->_load_id); request->set_index_id(_index_channel->_index_id); request->set_txn_id(_parent->_txn_id); + request->set_sender_id(_parent->_sender_id); request->set_allocated_schema(_parent->_schema->to_protobuf()); std::set<int64_t> deduper; @@ -432,6 +429,8 @@ void VNodeChannel::_open_internal(bool is_incremental) { if (config::tablet_writer_ignore_eovercrowded) { open_callback->cntl_->ignore_eovercrowded(); } + VLOG_DEBUG << fmt::format("txn {}: open NodeChannel to {}, incremental: {}, senders: {}", + _parent->_txn_id, _node_id, is_incremental, _parent->_num_senders); // the real transmission here. the corresponding BE's load mgr will open load channel for it. _stub->tablet_writer_open(open_closure->cntl_.get(), open_closure->request_.get(), open_closure->response_.get(), open_closure.get()); @@ -677,6 +676,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } // eos request must be the last request-> it's a signal makeing callback function to set _add_batch_finished true. + // end_mark makes is_last_rpc true when rpc finished and call callbacks. 
_send_block_callback->end_mark(); _send_finished = true; CHECK(_pending_batches_num == 0) << _pending_batches_num; @@ -726,7 +726,7 @@ void VNodeChannel::try_send_pending_block(RuntimeState* state) { } void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult& result, - bool is_last_rpc) { + const WriteBlockCallbackContext& ctx) { std::lock_guard<std::mutex> l(this->_closed_lock); if (this->_is_closed) { // if the node channel is closed, no need to call the following logic, @@ -744,7 +744,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(st.to_string()); - } else if (is_last_rpc) { + } else if (ctx._is_last_rpc) { for (const auto& tablet : result.tablet_vec()) { TTabletCommitInfo commit_info; commit_info.tabletId = tablet.tablet_id(); @@ -802,7 +802,7 @@ void VNodeChannel::_add_block_success_callback(const PTabletWriterAddBlockResult } } -void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { +void VNodeChannel::_add_block_failed_callback(const WriteBlockCallbackContext& ctx) { std::lock_guard<std::mutex> l(this->_closed_lock); if (this->_is_closed) { // if the node channel is closed, no need to call `mark_as_failed`, @@ -819,7 +819,7 @@ void VNodeChannel::_add_block_failed_callback(bool is_last_rpc) { Status st = _index_channel->check_intolerable_failure(); if (!st.ok()) { _cancel_with_msg(fmt::format("{}, err: {}", channel_info(), st.to_string())); - } else if (is_last_rpc) { + } else if (ctx._is_last_rpc) { // if this is last rpc, will must set _add_batches_finished. otherwise, node channel's close_wait // will be blocked. _add_batches_finished = true; @@ -892,12 +892,14 @@ Status VNodeChannel::close_wait(RuntimeState* state) { } } - // waiting for finished, it may take a long time, so we couldn't set a timeout + // Waiting for finished until _add_batches_finished changed by rpc's finished callback. 
+ // it may take a long time, so we couldn't set a timeout // For pipeline engine, the close is called in async writer's process block method, // so that it will not block pipeline thread. while (!_add_batches_finished && !_cancelled && !state->is_cancelled()) { bthread_usleep(1000); } + VLOG_CRITICAL << _parent->_sender_id << " close wait finished"; _close_time_ms = UnixMillis() - _close_time_ms; if (_cancelled || state->is_cancelled()) { @@ -925,17 +927,18 @@ void VNodeChannel::_close_check() { CHECK(_cur_mutable_block == nullptr) << name(); } -void VNodeChannel::mark_close() { +void VNodeChannel::mark_close(bool hang_wait) { auto st = none_of({_cancelled, _eos_is_produced}); if (!st.ok()) { return; } _cur_add_block_request->set_eos(true); + _cur_add_block_request->set_hang_wait(hang_wait); { std::lock_guard<std::mutex> l(_pending_batches_lock); if (!_cur_mutable_block) [[unlikely]] { - // add a dummy block + // never had a block arrived. add a dummy block _cur_mutable_block = vectorized::MutableBlock::create_unique(); } auto tmp_add_block_request = @@ -1168,7 +1171,7 @@ Status VTabletWriter::_init(RuntimeState* state, RuntimeProfile* profile) { return Status::InternalError("unknown destination tuple descriptor"); } - if (_vec_output_expr_ctxs.size() > 0 && + if (!_vec_output_expr_ctxs.empty() && _output_tuple_desc->slots().size() != _vec_output_expr_ctxs.size()) { LOG(WARNING) << "output tuple slot num should be equal to num of output exprs, " << "output_tuple_slot_num " << _output_tuple_desc->slots().size() @@ -1279,7 +1282,7 @@ Status VTabletWriter::_incremental_open_node_channel( // update and reinit for existing channels. 
std::shared_ptr<IndexChannel> channel = _index_id_to_channel[index->index_id]; DCHECK(channel != nullptr); - RETURN_IF_ERROR(channel->init(_state, tablets)); // add tablets into it + RETURN_IF_ERROR(channel->init(_state, tablets, true)); // add tablets into it } fmt::memory_buffer buf; @@ -1374,14 +1377,71 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status _try_close = true; // will stop periodic thread if (status.ok()) { + // BE id -> add_batch method counter + std::unordered_map<int64_t, AddBatchCounter> node_add_batch_counter_map; + // only if status is ok can we call this _profile->total_time_counter(). // if status is not ok, this sink may not be prepared, so that _profile is null SCOPED_TIMER(_profile->total_time_counter()); - { - for (const auto& index_channel : _channels) { + for (const auto& index_channel : _channels) { + // two-step mark close. first we send close_origin to recievers to close all originly exist TabletsChannel. + // when they all closed, we are sure all Writer of instances called _do_try_close. that means no new channel + // will be opened. the refcount of recievers will be monotonically decreasing. then we are safe to close all + // our channels. 
+ if (index_channel->has_incremental_node_channel()) { + if (!status.ok()) { + break; + } + VLOG_TRACE << _sender_id << " first stage close start " << _txn_id; + index_channel->for_init_node_channel( + [&index_channel, &status, this](const std::shared_ptr<VNodeChannel>& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host() + << "mark close1 for inits " << _txn_id; + ch->mark_close(true); + if (ch->is_cancelled()) { + status = cancel_channel_and_check_intolerable_failure( + status, ch->get_cancel_msg(), index_channel, ch); + } + }); + if (!status.ok()) { + break; + } + index_channel->for_init_node_channel( + [this, &index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + auto s = ch->close_wait(_state); + VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host() + << "close1 wait finished!"; + if (!s.ok()) { + status = cancel_channel_and_check_intolerable_failure( + status, s.to_string(), index_channel, ch); + } + }); if (!status.ok()) { break; } + VLOG_DEBUG << _sender_id << " first stage finished. 
closeing inc nodes " << _txn_id; + index_channel->for_inc_node_channel( + [&index_channel, &status, this](const std::shared_ptr<VNodeChannel>& ch) { + if (!status.ok() || ch->is_closed()) { + return; + } + // only first try close, all node channels will mark_close() + VLOG_DEBUG << index_channel->_parent->_sender_id << "'s " << ch->host() + << "mark close2 for inc " << _txn_id; + ch->mark_close(); + if (ch->is_cancelled()) { + status = cancel_channel_and_check_intolerable_failure( + status, ch->get_cancel_msg(), index_channel, ch); + } + }); + } else { // not has_incremental_node_channel + VLOG_TRACE << _sender_id << " has no incremental channels " << _txn_id; index_channel->for_each_node_channel( [&index_channel, &status](const std::shared_ptr<VNodeChannel>& ch) { if (!status.ok() || ch->is_closed()) { @@ -1394,8 +1454,8 @@ void VTabletWriter::_do_try_close(RuntimeState* state, const Status& exec_status status, ch->get_cancel_msg(), index_channel, ch); } }); - } // end for index channels - } + } + } // end for index channels } if (!status.ok()) { diff --git a/be/src/vec/sink/writer/vtablet_writer.h b/be/src/vec/sink/writer/vtablet_writer.h index bcc5228457a..603034cea6d 100644 --- a/be/src/vec/sink/writer/vtablet_writer.h +++ b/be/src/vec/sink/writer/vtablet_writer.h @@ -36,13 +36,11 @@ #include <cstddef> #include <cstdint> #include <functional> -#include <initializer_list> #include <map> #include <memory> #include <mutex> #include <ostream> #include <queue> -#include <set> #include <sstream> #include <string> #include <thread> @@ -55,23 +53,17 @@ #include "common/status.h" #include "exec/data_sink.h" #include "exec/tablet_info.h" -#include "gutil/ref_counted.h" -#include "runtime/decimalv2_value.h" #include "runtime/exec_env.h" #include "runtime/memory/mem_tracker.h" #include "runtime/thread_context.h" -#include "runtime/types.h" -#include "util/countdown_latch.h" #include "util/ref_count_closure.h" #include "util/runtime_profile.h" #include "util/spinlock.h" 
#include "util/stopwatch.hpp" #include "vec/columns/column.h" -#include "vec/common/allocator.h" #include "vec/core/block.h" #include "vec/data_types/data_type.h" #include "vec/exprs/vexpr_fwd.h" -#include "vec/runtime/vfile_format_transformer.h" #include "vec/sink/vrow_distribution.h" #include "vec/sink/vtablet_block_convertor.h" #include "vec/sink/vtablet_finder.h" @@ -114,6 +106,10 @@ struct AddBatchCounter { } }; +struct WriteBlockCallbackContext { + std::atomic<bool> _is_last_rpc {false}; +}; + // It's very error-prone to guarantee the handler capture vars' & this closure's destruct sequence. // So using create() to get the closure pointer is recommended. We can delete the closure ptr before the capture vars destruction. // Delete this point is safe, don't worry about RPC callback will run after WriteBlockCallback deleted. @@ -127,8 +123,13 @@ public: WriteBlockCallback() : cid(INVALID_BTHREAD_ID) {} ~WriteBlockCallback() override = default; - void addFailedHandler(const std::function<void(bool)>& fn) { failed_handler = fn; } - void addSuccessHandler(const std::function<void(const T&, bool)>& fn) { success_handler = fn; } + void addFailedHandler(const std::function<void(const WriteBlockCallbackContext&)>& fn) { + failed_handler = fn; + } + void addSuccessHandler( + const std::function<void(const T&, const WriteBlockCallbackContext&)>& fn) { + success_handler = fn; + } void join() override { // We rely on in_flight to assure one rpc is running, @@ -165,8 +166,8 @@ public: bool is_packet_in_flight() { return _packet_in_flight; } void end_mark() { - DCHECK(_is_last_rpc == false); - _is_last_rpc = true; + DCHECK(_ctx._is_last_rpc == false); + _ctx._is_last_rpc = true; } void call() override { @@ -175,9 +176,9 @@ public: LOG(WARNING) << "failed to send brpc batch, error=" << berror(::doris::DummyBrpcCallback<T>::cntl_->ErrorCode()) << ", error_text=" << ::doris::DummyBrpcCallback<T>::cntl_->ErrorText(); - failed_handler(_is_last_rpc); + failed_handler(_ctx); } else 
{ - success_handler(*(::doris::DummyBrpcCallback<T>::response_), _is_last_rpc); + success_handler(*(::doris::DummyBrpcCallback<T>::response_), _ctx); } clear_in_flight(); } @@ -185,9 +186,9 @@ public: private: brpc::CallId cid; std::atomic<bool> _packet_in_flight {false}; - std::atomic<bool> _is_last_rpc {false}; - std::function<void(bool)> failed_handler; - std::function<void(const T&, bool)> success_handler; + WriteBlockCallbackContext _ctx; + std::function<void(const WriteBlockCallbackContext&)> failed_handler; + std::function<void(const T&, const WriteBlockCallbackContext&)> success_handler; }; class IndexChannel; @@ -258,7 +259,8 @@ public: // two ways to stop channel: // 1. mark_close()->close_wait() PS. close_wait() will block waiting for the last AddBatch rpc response. // 2. just cancel() - void mark_close(); + // hang_wait = true will make reciever hang until all sender mark_closed. + void mark_close(bool hang_wait = false); bool is_closed() const { return _is_closed; } bool is_cancelled() const { return _cancelled; } @@ -320,8 +322,9 @@ protected: void _close_check(); void _cancel_with_msg(const std::string& msg); - void _add_block_success_callback(const PTabletWriterAddBlockResult& result, bool is_last_rpc); - void _add_block_failed_callback(bool is_last_rpc); + void _add_block_success_callback(const PTabletWriterAddBlockResult& result, + const WriteBlockCallbackContext& ctx); + void _add_block_failed_callback(const WriteBlockCallbackContext& ctx); VTabletWriter* _parent = nullptr; IndexChannel* _index_channel = nullptr; @@ -425,7 +428,8 @@ public: ~IndexChannel() = default; // allow to init multi times, for incremental open more tablets for one index(table) - Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets); + Status init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets, + bool incremental = false); void for_each_node_channel( const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { 
@@ -434,6 +438,26 @@ public: } } + void for_init_node_channel( + const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { + for (auto& it : _node_channels) { + if (!it.second->is_incremental()) { + func(it.second); + } + } + } + + void for_inc_node_channel( + const std::function<void(const std::shared_ptr<VNodeChannel>&)>& func) { + for (auto& it : _node_channels) { + if (it.second->is_incremental()) { + func(it.second); + } + } + } + + bool has_incremental_node_channel() const { return _has_inc_node; } + void mark_as_failed(const VNodeChannel* node_channel, const std::string& err, int64_t tablet_id = -1); Status check_intolerable_failure(); @@ -492,6 +516,7 @@ private: std::unordered_map<int64_t, std::shared_ptr<VNodeChannel>> _node_channels; // from tablet_id to backend channel std::unordered_map<int64_t, std::vector<std::shared_ptr<VNodeChannel>>> _channels_by_tablet; + bool _has_inc_node = false; // lock to protect _failed_channels and _failed_channels_msgs mutable doris::SpinLock _fail_lock; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index c1b43722c33..1b14a57d154 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -111,7 +111,7 @@ Status VTabletWriterV2::_incremental_open_streams( } } for (int64_t dst_id : new_backends) { - auto streams = _load_stream_map->get_or_create(dst_id); + auto streams = _load_stream_map->get_or_create(dst_id, true); RETURN_IF_ERROR(_open_streams_to_backend(dst_id, *streams)); } return Status::OK(); @@ -310,6 +310,11 @@ Status VTabletWriterV2::_build_tablet_node_mapping() { tablet.set_index_id(index.index_id); tablet.set_tablet_id(tablet_id); _tablets_for_node[node].emplace(tablet_id, tablet); + constexpr int64_t DUMMY_TABLET_ID = 0; + if (tablet_id == DUMMY_TABLET_ID) [[unlikely]] { + // ignore fake tablet for auto partition + continue; + } if (known_indexes.contains(index.index_id)) [[likely]] { 
continue; } @@ -548,32 +553,26 @@ Status VTabletWriterV2::close(Status exec_status) { LOG(INFO) << "sink " << _sender_id << " released streams, is_last=" << is_last_sink << ", load_id=" << print_id(_load_id); - // send CLOSE_LOAD on all streams if this is the last sink + // send CLOSE_LOAD on all non-incremental streams if this is the last sink if (is_last_sink) { - RETURN_IF_ERROR(_load_stream_map->close_load()); + RETURN_IF_ERROR(_load_stream_map->close_load(false)); } - // close_wait on all streams, even if this is not the last sink. + // close_wait on all non-incremental streams, even if this is not the last sink. // because some per-instance data structures are now shared among all sinks // due to sharing delta writers and load stream stubs. - { - SCOPED_TIMER(_close_load_timer); - RETURN_IF_ERROR(_load_stream_map->for_each_st([this](int64_t dst_id, - const Streams& streams) -> Status { - for (auto& stream : streams) { - int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - - _timeout_watch.elapsed_time() / 1000 / 1000; - if (remain_ms <= 0) { - LOG(WARNING) << "load timed out before close waiting, load_id=" - << print_id(_load_id); - return Status::TimedOut("load timed out before close waiting"); - } - RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); - } - return Status::OK(); - })); + RETURN_IF_ERROR(_close_wait(false)); + + // send CLOSE_LOAD on all incremental streams if this is the last sink. + // this must happen after all non-incremental streams are closed, + // so we can ensure all sinks are in close phase before closing incremental streams. + if (is_last_sink) { + RETURN_IF_ERROR(_load_stream_map->close_load(true)); } + // close_wait on all incremental streams, even if this is not the last sink. 
+ RETURN_IF_ERROR(_close_wait(true)); + // calculate and submit commit info if (is_last_sink) { DBUG_EXECUTE_IF("VTabletWriterV2.close.add_failed_tablet", { @@ -624,6 +623,27 @@ Status VTabletWriterV2::close(Status exec_status) { return status; } +Status VTabletWriterV2::_close_wait(bool incremental) { + SCOPED_TIMER(_close_load_timer); + return _load_stream_map->for_each_st( + [this, incremental](int64_t dst_id, const Streams& streams) -> Status { + for (auto& stream : streams) { + if (stream->is_incremental() != incremental) { + continue; + } + int64_t remain_ms = static_cast<int64_t>(_state->execution_timeout()) * 1000 - + _timeout_watch.elapsed_time() / 1000 / 1000; + if (remain_ms <= 0) { + LOG(WARNING) << "load timed out before close waiting, load_id=" + << print_id(_load_id); + return Status::TimedOut("load timed out before close waiting"); + } + RETURN_IF_ERROR(stream->close_wait(_state, remain_ms)); + } + return Status::OK(); + }); +} + void VTabletWriterV2::_calc_tablets_to_commit() { LOG(INFO) << "saving close load info, load_id=" << print_id(_load_id) << ", txn_id=" << _txn_id << ", sink_id=" << _sender_id; diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.h b/be/src/vec/sink/writer/vtablet_writer_v2.h index e3d31fb32b9..5a9890cdb49 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.h +++ b/be/src/vec/sink/writer/vtablet_writer_v2.h @@ -147,6 +147,8 @@ private: void _calc_tablets_to_commit(); + Status _close_wait(bool incremental); + Status _cancel(Status status); std::shared_ptr<MemTracker> _mem_tracker; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java index 1a4d188a0ca..dafdcdc49f5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/ListPartitionItem.java @@ -35,7 +35,7 @@ import java.util.Set; import java.util.stream.Collectors; public class 
ListPartitionItem extends PartitionItem { - public static ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); + public static final ListPartitionItem DUMMY_ITEM = new ListPartitionItem(Lists.newArrayList()); private final List<PartitionKey> partitionKeys; private boolean isDefaultPartition = false; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java index b227afdc142..ff5fa91ee6c 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/PartitionKey.java @@ -87,6 +87,13 @@ public class PartitionKey implements Comparable<PartitionKey>, Writable { return partitionKey; } + public static PartitionKey createMaxPartitionKey() { + PartitionKey partitionKey = new PartitionKey(); + partitionKey.keys.add(MaxLiteral.MAX_VALUE); + // type not set + return partitionKey; + } + public static PartitionKey createPartitionKey(List<PartitionValue> keys, List<Column> columns) throws AnalysisException { PartitionKey partitionKey = new PartitionKey(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java index 56214aaa0ea..bb7ddabbaa4 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RangePartitionItem.java @@ -30,10 +30,12 @@ import java.util.Optional; public class RangePartitionItem extends PartitionItem { private Range<PartitionKey> partitionKeyRange; - public static final Range<PartitionKey> DUMMY_ITEM; + public static final Range<PartitionKey> DUMMY_RANGE; + public static final RangePartitionItem DUMMY_ITEM; static { - DUMMY_ITEM = Range.closed(new PartitionKey(), new PartitionKey()); + DUMMY_RANGE = Range.closed(new PartitionKey(), new PartitionKey()); + DUMMY_ITEM = new 
RangePartitionItem(Range.closed(new PartitionKey(), PartitionKey.createMaxPartitionKey())); } public RangePartitionItem(Range<PartitionKey> range) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java index f52bc11829f..df6b02b8324 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/InternalCatalog.java @@ -1698,12 +1698,12 @@ public class InternalCatalog implements CatalogIf<Database> { isTempPartition, partitionInfo.getIsMutable(partitionId)); } else if (partitionInfo.getType() == PartitionType.LIST) { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_ITEM, partitionInfo.getItem(partitionId), dataProperty, + RangePartitionItem.DUMMY_RANGE, partitionInfo.getItem(partitionId), dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), isTempPartition, partitionInfo.getIsMutable(partitionId)); } else { info = new PartitionPersistInfo(db.getId(), olapTable.getId(), partition, - RangePartitionItem.DUMMY_ITEM, ListPartitionItem.DUMMY_ITEM, dataProperty, + RangePartitionItem.DUMMY_RANGE, ListPartitionItem.DUMMY_ITEM, dataProperty, partitionInfo.getReplicaAllocation(partitionId), partitionInfo.getIsInMemory(partitionId), isTempPartition, partitionInfo.getIsMutable(partitionId)); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java index ada7c6b770b..e3195eec135 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapTableSink.java @@ -339,18 +339,84 @@ public class OlapTableSink extends DataSink { return distColumns; } + private PartitionItem createDummyPartitionItem(PartitionType partType) 
throws UserException { + if (partType == PartitionType.LIST) { + return ListPartitionItem.DUMMY_ITEM; + } else if (partType == PartitionType.RANGE) { + return RangePartitionItem.DUMMY_ITEM; + } else { + throw new UserException("unsupported partition for OlapTable, partition=" + partType); + } + } + + private TOlapTablePartitionParam createDummyPartition(long dbId, OlapTable table, Analyzer analyzer, + TOlapTablePartitionParam partitionParam, PartitionInfo partitionInfo, PartitionType partType) + throws UserException { + partitionParam.setEnableAutomaticPartition(true); + // these partitions only use in locations. not find partition. + partitionParam.setPartitionsIsFake(true); + + // set columns + for (Column partCol : partitionInfo.getPartitionColumns()) { + partitionParam.addToPartitionColumns(partCol.getName()); + } + + int partColNum = partitionInfo.getPartitionColumns().size(); + + TOlapTablePartition fakePartition = new TOlapTablePartition(); + fakePartition.setId(0); + // set partition keys + setPartitionKeys(fakePartition, createDummyPartitionItem(partType), partColNum); + + for (Long indexId : table.getIndexIdToMeta().keySet()) { + fakePartition.addToIndexes(new TOlapTableIndexTablets(indexId, Arrays.asList(0L))); + fakePartition.setNumBuckets(1); + } + fakePartition.setIsMutable(true); + + DistributionInfo distInfo = table.getDefaultDistributionInfo(); + partitionParam.setDistributedColumns(getDistColumns(distInfo)); + partitionParam.addToPartitions(fakePartition); + + ArrayList<Expr> exprSource = partitionInfo.getPartitionExprs(); + if (exprSource != null && analyzer != null) { + Analyzer funcAnalyzer = new Analyzer(analyzer.getEnv(), analyzer.getContext()); + tupleDescriptor.setTable(table); + funcAnalyzer.registerTupleDescriptor(tupleDescriptor); + // we must clone the exprs. otherwise analyze will influence the origin exprs. 
+ ArrayList<Expr> exprs = new ArrayList<Expr>(); + for (Expr e : exprSource) { + exprs.add(e.clone()); + } + for (Expr e : exprs) { + e.reset(); + e.analyze(funcAnalyzer); + } + partitionParam.setPartitionFunctionExprs(Expr.treesToThrift(exprs)); + } + + return partitionParam; + } + public TOlapTablePartitionParam createPartition(long dbId, OlapTable table, Analyzer analyzer) throws UserException { TOlapTablePartitionParam partitionParam = new TOlapTablePartitionParam(); + PartitionInfo partitionInfo = table.getPartitionInfo(); + boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); + PartitionType partType = table.getPartitionInfo().getType(); partitionParam.setDbId(dbId); partitionParam.setTableId(table.getId()); partitionParam.setVersion(0); + partitionParam.setPartitionType(partType.toThrift()); + + // create shadow partition for empty auto partition table. only use in this load. + if (enableAutomaticPartition && partitionIds.isEmpty()) { + return createDummyPartition(dbId, table, analyzer, partitionParam, partitionInfo, partType); + } - PartitionType partType = table.getPartitionInfo().getType(); switch (partType) { case LIST: case RANGE: { - PartitionInfo partitionInfo = table.getPartitionInfo(); for (Column partCol : partitionInfo.getPartitionColumns()) { partitionParam.addToPartitionColumns(partCol.getName()); } @@ -395,7 +461,6 @@ public class OlapTableSink extends DataSink { } } } - boolean enableAutomaticPartition = partitionInfo.enableAutomaticPartition(); // for auto create partition by function expr, there is no any partition firstly, // But this is required in thrift struct. 
if (enableAutomaticPartition && partitionIds.isEmpty()) { @@ -464,7 +529,6 @@ public class OlapTableSink extends DataSink { throw new UserException("unsupported partition for OlapTable, partition=" + partType); } } - partitionParam.setPartitionType(partType.toThrift()); return partitionParam; } @@ -505,7 +569,46 @@ public class OlapTableSink extends DataSink { } } + public List<TOlapTableLocationParam> createDummyLocation(OlapTable table) throws UserException { + TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); + TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); + + final long fakeTabletId = 0; + SystemInfoService clusterInfo = Env.getCurrentSystemInfo(); + List<Long> aliveBe = clusterInfo.getAllBackendIds(true); + if (aliveBe.isEmpty()) { + throw new UserException(InternalErrorCode.REPLICA_FEW_ERR, "no available BE in cluster"); + } + for (int i = 0; i < table.getIndexNumber(); i++) { + // only one fake tablet here + if (singleReplicaLoad) { + Long[] nodes = aliveBe.toArray(new Long[0]); + List<Long> slaveBe = aliveBe; + + Random random = new SecureRandom(); + int masterNode = random.nextInt(nodes.length); + locationParam.addToTablets(new TTabletLocation(fakeTabletId, + Arrays.asList(nodes[masterNode]))); + + slaveBe.remove(masterNode); + slaveLocationParam.addToTablets(new TTabletLocation(fakeTabletId, + slaveBe)); + } else { + locationParam.addToTablets(new TTabletLocation(fakeTabletId, + Arrays.asList(aliveBe.get(0)))); // just one fake location is enough + + LOG.info("created dummy location tablet_id={}, be_id={}", fakeTabletId, aliveBe.get(0)); + } + } + + return Arrays.asList(locationParam, slaveLocationParam); + } + public List<TOlapTableLocationParam> createLocation(OlapTable table) throws UserException { + if (table.getPartitionInfo().enableAutomaticPartition() && partitionIds.isEmpty()) { + return createDummyLocation(table); + } + TOlapTableLocationParam locationParam = new TOlapTableLocationParam(); 
TOlapTableLocationParam slaveLocationParam = new TOlapTableLocationParam(); // BE id -> path hash diff --git a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java index b0abcc67141..af86660be21 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/org/apache/doris/service/FrontendServiceImpl.java @@ -3511,7 +3511,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!Env.getCurrentEnv().isMaster()) { errorStatus.setStatusCode(TStatusCode.NOT_MASTER); errorStatus.addToErrorMsgs(NOT_MASTER_ERR_MSG); - LOG.warn("failed to createPartition: {}", NOT_MASTER_ERR_MSG); + LOG.warn("failed to replace Partition: {}", NOT_MASTER_ERR_MSG); return result; } @@ -3546,10 +3546,8 @@ public class FrontendServiceImpl implements FrontendService.Iface { List<String> allReqPartNames; // all request partitions try { taskLock.lock(); - // we dont lock the table. other thread in this txn will be controled by - // taskLock. - // if we have already replaced. dont do it again, but acquire the recorded new - // partition directly. + // we dont lock the table. other thread in this txn will be controled by taskLock. + // if we have already replaced. dont do it again, but acquire the recorded new partition directly. // if not by this txn, just let it fail naturally is ok. List<Long> replacedPartIds = overwriteManager.tryReplacePartitionIds(taskGroupId, partitionIds); // here if replacedPartIds still have null. this will throw exception. @@ -3559,8 +3557,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { .filter(i -> partitionIds.get(i) == replacedPartIds.get(i)) // equal means not replaced .mapToObj(partitionIds::get) .collect(Collectors.toList()); - // from here we ONLY deal the pending partitions. not include the dealed(by - // others). + // from here we ONLY deal the pending partitions. 
not include the dealed(by others). if (!pendingPartitionIds.isEmpty()) { // below two must have same order inner. List<String> pendingPartitionNames = olapTable.uncheckedGetPartNamesById(pendingPartitionIds); @@ -3571,8 +3568,7 @@ public class FrontendServiceImpl implements FrontendService.Iface { overwriteManager.registerTaskInGroup(taskGroupId, taskId); InsertOverwriteUtil.addTempPartitions(olapTable, pendingPartitionNames, tempPartitionNames); InsertOverwriteUtil.replacePartition(olapTable, pendingPartitionNames, tempPartitionNames); - // now temp partitions are bumped up and use new names. we get their ids and - // record them. + // now temp partitions are bumped up and use new names. we get their ids and record them. List<Long> newPartitionIds = new ArrayList<Long>(); for (String newPartName : pendingPartitionNames) { newPartitionIds.add(olapTable.getPartition(newPartName).getId()); diff --git a/gensrc/proto/internal_service.proto b/gensrc/proto/internal_service.proto index 12b1b6b1eda..0a975b81991 100644 --- a/gensrc/proto/internal_service.proto +++ b/gensrc/proto/internal_service.proto @@ -98,6 +98,7 @@ message PTabletWriterOpenRequest { optional bool is_incremental = 15 [default = false]; optional int64 txn_expiration = 16; // Absolute time optional bool write_file_cache = 17; + optional int32 sender_id = 19; }; message PTabletWriterOpenResult { @@ -152,6 +153,8 @@ message PTabletWriterAddBlockRequest { optional bool write_single_replica = 12 [default = false]; map<int64, PSlaveTabletNodes> slave_tablet_nodes = 13; optional bool is_single_tablet_block = 14 [default = false]; + // for auto-partition first stage close, we should hang. 
+ optional bool hang_wait = 15 [default = false]; }; message PSlaveTabletNodes { diff --git a/gensrc/thrift/Descriptors.thrift b/gensrc/thrift/Descriptors.thrift index ef7a8451684..5da7b4df7de 100644 --- a/gensrc/thrift/Descriptors.thrift +++ b/gensrc/thrift/Descriptors.thrift @@ -211,6 +211,7 @@ struct TOlapTablePartitionParam { // insert overwrite partition(*) 11: optional bool enable_auto_detect_overwrite 12: optional i64 overwrite_group_id + 13: optional bool partitions_is_fake = false } struct TOlapTableIndex { diff --git a/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out new file mode 100644 index 00000000000..4ee136aef2b --- /dev/null +++ b/regression-test/data/partition_p1/auto_partition/sql/two_instance_correctness.out @@ -0,0 +1,4 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql -- +2 + diff --git a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy index 508f086f865..f52dc2945f0 100644 --- a/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy +++ b/regression-test/suites/partition_p0/auto_partition/test_auto_range_partition.groovy @@ -140,8 +140,7 @@ suite("test_auto_range_partition") { logger.info("${result2}") assertEquals(result2.size(), 2) - // partition expr extraction - + // insert into select have multi sender in load sql " drop table if exists isit " sql " drop table if exists isit_src " sql """ diff --git a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy index 4f9b7a365b4..8d43d90ff15 100644 --- a/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy +++ 
b/regression-test/suites/partition_p1/auto_partition/sql/multi_thread_load.groovy @@ -19,7 +19,7 @@ import groovy.io.FileType import java.nio.file.Files import java.nio.file.Paths -suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully +suite("multi_thread_load", "p1,nonConcurrent") { // stress case should use resource fully``` // get doris-db from s3 def dirPath = context.file.parent def fatherPath = context.file.parentFile.parentFile.getPath() diff --git a/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy new file mode 100644 index 00000000000..c9f2f04aab3 --- /dev/null +++ b/regression-test/suites/partition_p1/auto_partition/sql/two_instance_correctness.groovy @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("two_instance_correctness") { + + // finish time of instances have diff + sql "DROP TABLE IF EXISTS two_bkt;" + sql """ + create table two_bkt( + k0 date not null + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 2 + properties("replication_num" = "1"); + """ + + sql """ insert into two_bkt values ("2012-12-11"); """ + sql """ insert into two_bkt select "2020-12-12" from numbers("number" = "20000"); """ + + sql " DROP TABLE IF EXISTS two_bkt_dest; " + sql """ + create table two_bkt_dest( + k0 date not null + ) + AUTO PARTITION BY RANGE (date_trunc(k0, 'day')) () + DISTRIBUTED BY HASH(`k0`) BUCKETS 10 + properties("replication_num" = "1"); + """ + sql " insert into two_bkt_dest select * from two_bkt; " + + qt_sql " select count(distinct k0) from two_bkt_dest; " +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org