This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch compaction_opt in repository https://gitbox.apache.org/repos/asf/doris.git
commit 468b08a8542aed718f90b0cccab46977801a074a Author: yixiutt <102007456+yixi...@users.noreply.github.com> AuthorDate: Wed Nov 9 09:10:52 2022 +0800 (compaction) support ordered data compaction (#14054) --- be/src/olap/compaction.cpp | 168 +++++++++++++++++++---- be/src/olap/compaction.h | 10 +- be/src/olap/olap_server.cpp | 7 - be/src/olap/rowset/beta_rowset.cpp | 18 ++- be/src/olap/rowset/beta_rowset.h | 5 +- be/src/olap/rowset/beta_rowset_writer.cpp | 55 +++++++- be/src/olap/rowset/beta_rowset_writer.h | 6 + be/src/olap/rowset/rowset.h | 14 +- be/src/olap/rowset/rowset_meta.h | 9 ++ be/src/olap/rowset/rowset_writer.h | 3 + be/src/olap/rowset/segment_v2/segment_writer.cpp | 91 ++++++++---- be/src/olap/rowset/segment_v2/segment_writer.h | 11 ++ be/test/io/cache/remote_file_cache_test.cpp | 4 +- be/test/olap/rowid_conversion_test.cpp | 2 +- be/test/olap/rowset/segment_v2/segment_test.cpp | 37 ++--- be/test/testutil/mock_rowset.h | 3 +- build-support/clang-format.sh | 3 +- regression-test/conf/regression-conf.groovy | 2 +- 18 files changed, 351 insertions(+), 97 deletions(-) diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index c2355c56d7..0283e1e76c 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -19,6 +19,7 @@ #include "common/status.h" #include "gutil/strings/substitute.h" +#include "olap/rowset/beta_rowset.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_meta.h" #include "olap/tablet.h" @@ -34,6 +35,8 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label) : _tablet(tablet), _input_rowsets_size(0), _input_row_num(0), + _input_num_segments(0), + _input_index_size(0), _state(CompactionState::INITED) { _mem_tracker = std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::COMPACTION, label); } @@ -99,20 +102,76 @@ int64_t Compaction::get_avg_segment_rows() { // todo(yixiu): add a new conf of segment size in compaction return config::write_buffer_size / (_input_rowsets_size / (_input_row_num + 1) + 1); } +bool Compaction::is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs) { + size_t min_tidy_size = 10 * 1024 * 1024; + if (rhs->num_segments() == 0) { + return true; + } + if (rhs->is_segments_overlapping()) { + return false; + } + // check segment size + auto beta_rowset = reinterpret_cast<BetaRowset*>(rhs.get()); + std::vector<size_t> segments_size; + beta_rowset->get_segments_size(&segments_size); + for (auto segment_size : segments_size) { + // is segment is too small, need to do compaction + if (segment_size < min_tidy_size) { + return false; + } + } -Status Compaction::do_compaction_impl(int64_t permits) { - OlapStopWatch watch; + auto min_key = rhs->min_key(); + if (min_key < pre_max_key) { + return false; + } + pre_max_key = rhs->max_key(); + return true; +} + +Status Compaction::do_compact_ordered_rowsets() { + LOG(INFO) << "start to do ordered data compaction, tablet=" << _tablet->full_name() + << ", output_version=" << _output_version; + build_basic_info(); + RETURN_NOT_OK(construct_output_rowset_writer()); + // link data to new rowset + auto seg_id = 0; + std::vector<KeyBoundsPB> segment_key_bounds; + for (auto rowset : _input_rowsets) { + RETURN_NOT_OK(rowset->link_files_to(_tablet->tablet_path(), _output_rs_writer->rowset_id(), + seg_id)); + seg_id += rowset->num_segments(); - // 1. prepare input and output parameters - int64_t segments_num = 0; + std::vector<KeyBoundsPB> key_bounds; + rowset->get_segments_key_bounds(&key_bounds); + segment_key_bounds.insert(segment_key_bounds.end(), key_bounds.begin(), key_bounds.end()); + } + // build output rowset + RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>(); + rowset_meta->set_num_rows(_input_row_num); + rowset_meta->set_total_disk_size(_input_rowsets_size); + rowset_meta->set_data_disk_size(_input_rowsets_size); + rowset_meta->set_index_disk_size(_input_index_size); + rowset_meta->set_empty(_input_row_num == 0); + rowset_meta->set_num_segments(_input_num_segments); + rowset_meta->set_segments_overlap(NONOVERLAPPING); + rowset_meta->set_rowset_state(VISIBLE); + + rowset_meta->set_segments_key_bounds(segment_key_bounds); + _output_rowset = _output_rs_writer->manual_build(rowset_meta); + return Status::OK(); +} + +void Compaction::build_basic_info() { for (auto& rowset : _input_rowsets) { _input_rowsets_size += rowset->data_disk_size(); + _input_index_size += rowset->index_disk_size(); _input_row_num += rowset->num_rows(); - segments_num += rowset->num_segments(); + _input_num_segments += rowset->num_segments(); } TRACE_COUNTER_INCREMENT("input_rowsets_data_size", _input_rowsets_size); TRACE_COUNTER_INCREMENT("input_row_num", _input_row_num); - TRACE_COUNTER_INCREMENT("input_segments_num", segments_num); + TRACE_COUNTER_INCREMENT("input_segments_num", _input_num_segments); _output_version = Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version()); @@ -120,22 +179,81 @@ Status Compaction::do_compaction_impl(int64_t permits) { _oldest_write_timestamp = _input_rowsets.front()->oldest_write_timestamp(); _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp(); - auto use_vectorized_compaction = config::enable_vectorized_compaction; - string merge_type = use_vectorized_compaction ? "v" : ""; - bool vertical_compaction = should_vertical_compaction(); - - LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name() - << ", output_version=" << _output_version << ", permits: " << permits - << ", is_vertical_compaction=" << vertical_compaction; - // get cur schema if rowset schema exist, rowset schema must be newer than tablet schema std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size()); std::transform(_input_rowsets.begin(), _input_rowsets.end(), rowset_metas.begin(), [](const RowsetSharedPtr& rowset) { return rowset->rowset_meta(); }); - TabletSchemaSPtr cur_tablet_schema = + _cur_tablet_schema = _tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema(); +} + +bool Compaction::handle_ordered_data_compaction() { + // check delete version: if compaction type is base compaction and + // has a delete version, use original compaction + if (compaction_type() == ReaderType::READER_BASE_COMPACTION) { + for (auto rowset : _input_rowsets) { + if (_tablet->version_for_delete_predicate(rowset->version())) { + return false; + } + } + } - RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema, vertical_compaction)); + // check if rowsets are tidy so we can just modify meta and do link + // files to handle compaction + auto input_size = _input_rowsets.size(); + std::string pre_max_key; + for (auto i = 0; i < input_size; ++i) { + if (!is_rowset_tidy(pre_max_key, _input_rowsets[i])) { + if (i <= input_size / 2) { + return false; + } else { + _input_rowsets.resize(i); + break; + } + } + } + // most rowset of current compaction is nonoverlapping + // just handle nonoverlappint rowsets + auto st = do_compact_ordered_rowsets(); + if (!st.ok()) { + return false; + } + return true; +} + +Status Compaction::do_compaction_impl(int64_t permits) { + OlapStopWatch watch; + + auto use_vectorized_compaction = config::enable_vectorized_compaction; + string merge_type = use_vectorized_compaction ? "v" : ""; + + if (handle_ordered_data_compaction()) { + RETURN_NOT_OK(modify_rowsets()); + TRACE("modify rowsets finished"); + + int64_t now = UnixMillis(); + if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) { + _tablet->set_last_cumu_compaction_success_time(now); + } else { + _tablet->set_last_base_compaction_success_time(now); + } + auto cumu_policy = _tablet->cumulative_compaction_policy(); + LOG(INFO) << "succeed to do ordered data " << merge_type << compaction_name() + << ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version + << ", disk=" << _tablet->data_dir()->path() + << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num + << ", output_row_num=" << _output_rowset->num_rows() + << ". elapsed time=" << watch.get_elapse_second() + << "s. cumulative_compaction_policy=" + << (cumu_policy == nullptr ? "quick" : cumu_policy->name()); + return Status::OK(); + } + build_basic_info(); + + LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name() + << ", output_version=" << _output_version << ", permits: " << permits; + bool vertical_compaction = should_vertical_compaction(); RETURN_NOT_OK(construct_input_rowset_readers()); + RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction)); TRACE("prepare finished"); // 2. write merged rows to output rowset @@ -149,15 +267,15 @@ Status Compaction::do_compaction_impl(int64_t permits) { if (use_vectorized_compaction) { if (vertical_compaction) { - res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), cur_tablet_schema, + res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, _input_rs_readers, _output_rs_writer.get(), get_avg_segment_rows(), &stats); } else { - res = Merger::vmerge_rowsets(_tablet, compaction_type(), cur_tablet_schema, + res = Merger::vmerge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, _input_rs_readers, _output_rs_writer.get(), &stats); } } else { - res = Merger::merge_rowsets(_tablet, compaction_type(), cur_tablet_schema, + res = Merger::merge_rowsets(_tablet, compaction_type(), _cur_tablet_schema, _input_rs_readers, _output_rs_writer.get(), &stats); } @@ -215,7 +333,7 @@ Status Compaction::do_compaction_impl(int64_t permits) { LOG(INFO) << "succeed to do " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name() << ", output_version=" << _output_version << ", current_max_version=" << current_max_version - << ", disk=" << _tablet->data_dir()->path() << ", segments=" << segments_num + << ", disk=" << _tablet->data_dir()->path() << ", segments=" << _input_num_segments << ", input_row_num=" << _input_row_num << ", output_row_num=" << _output_rowset->num_rows() << ". elapsed time=" << watch.get_elapse_second() @@ -225,15 +343,15 @@ Status Compaction::do_compaction_impl(int64_t permits) { return Status::OK(); } -Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema, bool is_vertical) { +Status Compaction::construct_output_rowset_writer(bool is_vertical) { if (is_vertical) { return _tablet->create_vertical_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, - schema, _oldest_write_timestamp, + _cur_tablet_schema, _oldest_write_timestamp, _newest_write_timestamp, &_output_rs_writer); } - return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, schema, - _oldest_write_timestamp, _newest_write_timestamp, - &_output_rs_writer); + return _tablet->create_rowset_writer(_output_version, VISIBLE, NONOVERLAPPING, + _cur_tablet_schema, _oldest_write_timestamp, + _newest_write_timestamp, &_output_rs_writer); } Status Compaction::construct_input_rowset_readers() { diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 7f23f94b26..f2072347b2 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -64,7 +64,7 @@ protected: Status modify_rowsets(); void gc_output_rowset(); - Status construct_output_rowset_writer(TabletSchemaSPtr schema, bool is_vertical); + Status construct_output_rowset_writer(bool is_vertical = false); Status construct_input_rowset_readers(); Status check_version_continuity(const std::vector<RowsetSharedPtr>& rowsets); @@ -76,6 +76,11 @@ protected: bool should_vertical_compaction(); int64_t get_avg_segment_rows(); + bool handle_ordered_data_compaction(); + Status do_compact_ordered_rowsets(); + bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs); + void build_basic_info(); + protected: // the root tracker for this compaction std::shared_ptr<MemTrackerLimiter> _mem_tracker; @@ -86,6 +91,8 @@ protected: std::vector<RowsetReaderSharedPtr> _input_rs_readers; int64_t _input_rowsets_size; int64_t _input_row_num; + int64_t _input_num_segments; + int64_t _input_index_size; RowsetSharedPtr _output_rowset; std::unique_ptr<RowsetWriter> _output_rs_writer; @@ -98,6 +105,7 @@ protected: int64_t _oldest_write_timestamp; int64_t _newest_write_timestamp; RowIdConversion _rowid_conversion; + TabletSchemaSPtr _cur_tablet_schema; DISALLOW_COPY_AND_ASSIGN(Compaction); }; diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 084f44b805..6d27916db7 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -81,19 +81,12 @@ Status StorageEngine::start_bg_threads() { .set_min_threads(config::max_cumu_compaction_threads) .set_max_threads(config::max_cumu_compaction_threads) .build(&_cumu_compaction_thread_pool); -<<<<<<< HEAD - ThreadPoolBuilder("SmallCompactionTaskThreadPool") - .set_min_threads(config::quick_compaction_max_threads) - .set_max_threads(config::quick_compaction_max_threads) - .build(&_quick_compaction_thread_pool); if (config::enable_segcompaction && config::enable_storage_vectorization) { ThreadPoolBuilder("SegCompactionTaskThreadPool") .set_min_threads(config::seg_compaction_max_threads) .set_max_threads(config::seg_compaction_max_threads) .build(&_seg_compaction_thread_pool); } -======= ->>>>>>> 480acd41a... [enhancement](compaction) opt compaction task producer and quick compaction (#13495) // compaction tasks producer thread RETURN_IF_ERROR(Thread::create( diff --git a/be/src/olap/rowset/beta_rowset.cpp b/be/src/olap/rowset/beta_rowset.cpp index 8ed48fdf70..6fbe708090 100644 --- a/be/src/olap/rowset/beta_rowset.cpp +++ b/be/src/olap/rowset/beta_rowset.cpp @@ -109,6 +109,19 @@ Status BetaRowset::do_load(bool /*use_cache*/) { return Status::OK(); } +Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) { + auto fs = _rowset_meta->fs(); + if (!fs || _schema == nullptr) { + return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); + } + for (int seg_id = 0; seg_id < num_segments(); ++seg_id) { + auto seg_path = segment_file_path(seg_id); + size_t file_size; + RETURN_IF_ERROR(fs->file_size(seg_path, &file_size)); + segments_size->push_back(file_size); + } + return Status::OK(); +} Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* segments) { auto fs = _rowset_meta->fs(); if (!fs || _schema == nullptr) { @@ -197,14 +210,15 @@ void BetaRowset::do_close() { // do nothing. } -Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id) { +Status BetaRowset::link_files_to(const std::string& dir, RowsetId new_rowset_id, + size_t new_rowset_start_seg_id) { DCHECK(is_local()); auto fs = _rowset_meta->fs(); if (!fs) { return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED); } for (int i = 0; i < num_segments(); ++i) { - auto dst_path = segment_file_path(dir, new_rowset_id, i); + auto dst_path = segment_file_path(dir, new_rowset_id, i + new_rowset_start_seg_id); // TODO(lingbin): use Env API? or EnvUtil? bool dst_path_exist = false; if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) { diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h index 7cd792bf69..93d9b31676 100644 --- a/be/src/olap/rowset/beta_rowset.h +++ b/be/src/olap/rowset/beta_rowset.h @@ -70,7 +70,8 @@ public: Status remove() override; - Status link_files_to(const std::string& dir, RowsetId new_rowset_id) override; + Status link_files_to(const std::string& dir, RowsetId new_rowset_id, + size_t new_rowset_start_seg_id = 0) override; Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) override; @@ -89,6 +90,8 @@ public: Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment); + Status get_segments_size(std::vector<size_t>* segments_size); + protected: BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path, RowsetMetaSharedPtr rowset_meta); diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp index 485b53421e..154d76224d 100644 --- a/be/src/olap/rowset/beta_rowset_writer.cpp +++ b/be/src/olap/rowset/beta_rowset_writer.cpp @@ -710,6 +710,27 @@ Status BetaRowsetWriter::_wait_flying_segcompaction() { return Status::OK(); } +RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& spec_rowset_meta) { + if (_rowset_meta->oldest_write_timestamp() == -1) { + _rowset_meta->set_oldest_write_timestamp(UnixSeconds()); + } + + if (_rowset_meta->newest_write_timestamp() == -1) { + _rowset_meta->set_newest_write_timestamp(UnixSeconds()); + } + + _build_rowset_meta_with_spec_field(_rowset_meta, spec_rowset_meta); + RowsetSharedPtr rowset; + auto status = RowsetFactory::create_rowset(_context.tablet_schema, _context.rowset_dir, + _rowset_meta, &rowset); + if (!status.ok()) { + LOG(WARNING) << "rowset init failed when build new rowset, res=" << status; + return nullptr; + } + _already_built = true; + return rowset; +} + RowsetSharedPtr BetaRowsetWriter::build() { // TODO(lingbin): move to more better place, or in a CreateBlockBatch? for (auto& file_writer : _file_writers) { @@ -764,6 +785,37 @@ RowsetSharedPtr BetaRowsetWriter::build() { return rowset; } +bool BetaRowsetWriter::_is_segment_overlapping() { + std::string last; + for (auto segment_encode_key : _segments_encoded_key_bounds) { + auto cur_min = segment_encode_key.min_key(); + auto cur_max = segment_encode_key.max_key(); + if (cur_min < last) { + return true; + } + last = cur_max; + } + return false; +} + +void BetaRowsetWriter::_build_rowset_meta_with_spec_field( + RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr& spec_rowset_meta) { + rowset_meta->set_num_rows(spec_rowset_meta->num_rows()); + rowset_meta->set_total_disk_size(spec_rowset_meta->total_disk_size()); + rowset_meta->set_data_disk_size(spec_rowset_meta->total_disk_size()); + rowset_meta->set_index_disk_size(spec_rowset_meta->index_disk_size()); + // TODO write zonemap to meta + rowset_meta->set_empty(spec_rowset_meta->num_rows() == 0); + rowset_meta->set_creation_time(time(nullptr)); + rowset_meta->set_num_segments(spec_rowset_meta->num_segments()); + rowset_meta->set_segments_overlap(spec_rowset_meta->segments_overlap()); + rowset_meta->set_rowset_state(spec_rowset_meta->rowset_state()); + + std::vector<KeyBoundsPB> segments_key_bounds; + spec_rowset_meta->get_segments_key_bounds(&segments_key_bounds); + rowset_meta->set_segments_key_bounds(segments_key_bounds); +} + void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_meta) { int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment; int64_t num_rows_written = 0; @@ -782,7 +834,7 @@ void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met } } rowset_meta->set_num_segments(num_seg); - if (num_seg <= 1) { + if (!_is_segment_overlapping()) { rowset_meta->set_segments_overlap(NONOVERLAPPING); } _segment_num_rows = segment_num_rows; @@ -930,7 +982,6 @@ Status BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); CHECK_EQ(_segid_statistics_map.find(segid) == _segid_statistics_map.end(), true); _segid_statistics_map.emplace(segid, segstat); - } VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " row_num:" << row_num << " data_size:" << segment_size << " index_size:" << index_size; diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h index 558d426c17..5064ac47b4 100644 --- a/be/src/olap/rowset/beta_rowset_writer.h +++ b/be/src/olap/rowset/beta_rowset_writer.h @@ -64,6 +64,8 @@ public: // for this segment RowsetSharedPtr build_tmp() override; + RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) override; + Version version() override { return _context.version; } int64_t num_rows() const override { return _raw_num_rows_written; } @@ -120,6 +122,10 @@ private: Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments); + void _build_rowset_meta_with_spec_field(RowsetMetaSharedPtr rowset_meta, + const RowsetMetaSharedPtr& spec_rowset_meta); + bool _is_segment_overlapping(); + protected: RowsetWriterContext _context; std::shared_ptr<RowsetMeta> _rowset_meta; diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h index 32fda0e102..312fe62b1a 100644 --- a/be/src/olap/rowset/rowset.h +++ b/be/src/olap/rowset/rowset.h @@ -161,6 +161,7 @@ public: RowsetMetaPB get_rowset_pb() const { return rowset_meta()->get_rowset_pb(); } int64_t oldest_write_timestamp() const { return rowset_meta()->oldest_write_timestamp(); } int64_t newest_write_timestamp() const { return rowset_meta()->newest_write_timestamp(); } + bool is_segments_overlapping() const { return rowset_meta()->is_segments_overlapping(); } KeysType keys_type() { return _schema->keys_type(); } // remove all files in this rowset @@ -198,7 +199,8 @@ public: } // hard link all files in this rowset to `dir` to form a new rowset with id `new_rowset_id`. - virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id) = 0; + virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id, + size_t new_rowset_start_seg_id = 0) = 0; // copy all files to `dir` virtual Status copy_files_to(const std::string& dir, const RowsetId& new_rowset_id) = 0; @@ -265,6 +267,16 @@ public: _rowset_meta->get_segments_key_bounds(segments_key_bounds); return Status::OK(); } + std::string min_key() { + KeyBoundsPB key_bounds; + _rowset_meta->get_first_segment_key_bound(&key_bounds); + return key_bounds.min_key(); + } + std::string max_key() { + KeyBoundsPB key_bounds; + _rowset_meta->get_last_segment_key_bound(&key_bounds); + return key_bounds.max_key(); + } bool check_rowset_segment(); diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index 7154116e7f..a4a7059d39 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -320,6 +320,15 @@ public: segments_key_bounds->push_back(key_range); } } + virtual void get_first_segment_key_bound(KeyBoundsPB* key_bounds) { + DCHECK(_rowset_meta_pb.segments_key_bounds_size() > 0); + *key_bounds = _rowset_meta_pb.segments_key_bounds(0); + } + virtual void get_last_segment_key_bound(KeyBoundsPB* key_bounds) { + DCHECK(_rowset_meta_pb.segments_key_bounds_size() > 0); + *key_bounds = + _rowset_meta_pb.segments_key_bounds(_rowset_meta_pb.segments_key_bounds_size() - 1); + } void set_segments_key_bounds(const std::vector<KeyBoundsPB>& segments_key_bounds) { for (const KeyBoundsPB& key_bounds : segments_key_bounds) { diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h index f5d095a48c..531dd103c8 100644 --- a/be/src/olap/rowset/rowset_writer.h +++ b/be/src/olap/rowset/rowset_writer.h @@ -82,6 +82,9 @@ public: // real build will be called in DeltaWriter close_wait. virtual RowsetSharedPtr build_tmp() = 0; + // For ordered rowset compaction, manual build rowset + virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) = 0; + virtual Version version() = 0; virtual int64_t num_rows() const = 0; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 971706abbf..284738e97d 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -51,11 +51,9 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, uint32_t segment_id, _mem_tracker(std::make_unique<MemTracker>("SegmentWriter:Segment-" + std::to_string(segment_id))) { CHECK_NOTNULL(file_writer); - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { - _num_key_columns = _tablet_schema->num_key_columns(); - } else { - _num_key_columns = _tablet_schema->num_short_key_columns(); - } + _num_key_columns = _tablet_schema->num_key_columns(); + _num_short_key_columns = _tablet_schema->num_short_key_columns(); + DCHECK(_num_key_columns >= _num_short_key_columns); for (size_t cid = 0; cid < _num_key_columns; ++cid) { const auto& column = _tablet_schema->column(cid); _key_coders.push_back(get_key_coder(column.type())); @@ -206,10 +204,15 @@ Status SegmentWriter::append_block(const vectorized::Block* block, size_t row_po // create primary indexes for (size_t pos = 0; pos < num_rows; pos++) { RETURN_IF_ERROR( - _primary_key_index_builder->add_item(_encode_keys(key_columns, pos))); + _primary_key_index_builder->add_item(_full_encode_keys(key_columns, pos))); } } else { - // create short key indexes + // create short key indexes' + // for min_max key + for (size_t pos = 0; pos < num_rows; pos++) { + set_min_max_key(_full_encode_keys(key_columns, pos)); + } + key_columns.resize(_num_short_key_columns); for (const auto pos : short_key_pos) { RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, pos))); } @@ -233,17 +236,15 @@ int64_t SegmentWriter::max_row_to_add(size_t row_avg_size_in_bytes) { return std::min(size_rows, count_rows); } -std::string SegmentWriter::_encode_keys( +std::string SegmentWriter::_full_encode_keys( const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos, bool null_first) { - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write && - _tablet_schema->has_sequence_col()) { + assert(_key_index_size.size() == _num_key_columns); + if (_tablet_schema->has_sequence_col() && _opts.enable_unique_key_merge_on_write) { assert(key_columns.size() == _num_key_columns + 1 && - _key_coders.size() == _num_key_columns + 1 && - _key_index_size.size() == _num_key_columns); + _key_coders.size() == _num_key_columns + 1); } else { - assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns && - _key_index_size.size() == _num_key_columns); + assert(key_columns.size() == _num_key_columns && _key_coders.size() == _num_key_columns); } std::string encoded_keys; @@ -259,11 +260,31 @@ std::string SegmentWriter::_encode_keys( continue; } encoded_keys.push_back(KEY_NORMAL_MARKER); - if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { - _key_coders[cid]->full_encode_ascending(field, &encoded_keys); - } else { - _key_coders[cid]->encode_ascending(field, _key_index_size[cid], &encoded_keys); + _key_coders[cid]->full_encode_ascending(field, &encoded_keys); + ++cid; + } + return encoded_keys; +} + +std::string SegmentWriter::_encode_keys( + const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos, + bool null_first) { + assert(key_columns.size() == _num_short_key_columns); + + std::string encoded_keys; + size_t cid = 0; + for (const auto& column : key_columns) { + auto field = column->get_data_at(pos); + if (UNLIKELY(!field)) { + if (null_first) { + encoded_keys.push_back(KEY_NULL_FIRST_MARKER); + } else { + encoded_keys.push_back(KEY_NULL_LAST_MARKER); + } + continue; } + encoded_keys.push_back(KEY_NORMAL_MARKER); + _key_coders[cid]->encode_ascending(field, _key_index_size[cid], &encoded_keys); ++cid; } return encoded_keys; @@ -275,24 +296,25 @@ Status SegmentWriter::append_row(const RowType& row) { auto cell = row.cell(cid); RETURN_IF_ERROR(_column_writers[cid]->append(cell)); } + std::string full_encoded_key; + encode_key<RowType, true, true>(&full_encoded_key, row, _num_key_columns); + if (_tablet_schema->has_sequence_col()) { + full_encoded_key.push_back(KEY_NORMAL_MARKER); + auto cid = _tablet_schema->sequence_col_idx(); + auto cell = row.cell(cid); + row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &full_encoded_key); + } if (_tablet_schema->keys_type() == UNIQUE_KEYS && _opts.enable_unique_key_merge_on_write) { - std::string encoded_key; - encode_key<RowType, true, true>(&encoded_key, row, _num_key_columns); - if (_tablet_schema->has_sequence_col()) { - encoded_key.push_back(KEY_NORMAL_MARKER); - auto cid = _tablet_schema->sequence_col_idx(); - auto cell = row.cell(cid); - row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), &encoded_key); - } - RETURN_IF_ERROR(_primary_key_index_builder->add_item(encoded_key)); + RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key)); } else { // At the beginning of one block, so add a short key index entry if ((_num_rows_written % _opts.num_rows_per_block) == 0) { std::string encoded_key; - encode_key(&encoded_key, row, _num_key_columns); + encode_key(&encoded_key, row, _num_short_key_columns); RETURN_IF_ERROR(_short_key_index_builder->add_item(encoded_key)); } + set_min_max_key(full_encoded_key); } ++_num_rows_written; return Status::OK(); @@ -465,13 +487,22 @@ Status SegmentWriter::_write_raw_data(const std::vector<Slice>& slices) { } Slice SegmentWriter::min_encoded_key() { - return (_primary_key_index_builder == nullptr) ? Slice() + return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), _min_key.size()) : _primary_key_index_builder->min_key(); } Slice SegmentWriter::max_encoded_key() { - return (_primary_key_index_builder == nullptr) ? Slice() + return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), _max_key.size()) : _primary_key_index_builder->max_key(); } +void SegmentWriter::set_min_max_key(const Slice& key) { + if (UNLIKELY(_is_first_row)) { + _min_key.append(key.get_data(), key.get_size()); + _is_first_row = false; + } + _max_key.clear(); + _max_key.append(key.get_data(), key.get_size()); +} + } // namespace segment_v2 } // namespace doris diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h b/be/src/olap/rowset/segment_v2/segment_writer.h index 8e7041d21b..689224ba8f 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.h +++ b/be/src/olap/rowset/segment_v2/segment_writer.h @@ -26,6 +26,7 @@ #include "gen_cpp/segment_v2.pb.h" #include "gutil/macros.h" #include "olap/tablet_schema.h" +#include "util/faststring.h" #include "vec/core/block.h" #include "vec/olap/olap_data_convertor.h" @@ -112,6 +113,11 @@ private: Status _write_raw_data(const std::vector<Slice>& slices); std::string _encode_keys(const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos, bool null_first = true); + // for unique-key merge on write and segment min_max key + std::string _full_encode_keys( + const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, size_t pos, + bool null_first = true); + void set_min_max_key(const Slice& key); void _reset_column_writers(); @@ -127,6 +133,7 @@ private: SegmentFooterPB _footer; size_t _num_key_columns; + size_t _num_short_key_columns; std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder; std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder; std::vector<std::unique_ptr<ColumnWriter>> _column_writers; @@ -146,6 +153,10 @@ private: // In vertical compaction row count is recorded when key columns group finish // and _num_rows_written will be updated in value column group uint32_t _row_count = 0; + + bool _is_first_row = true; + faststring _min_key; + faststring _max_key; }; } // namespace segment_v2 diff --git a/be/test/io/cache/remote_file_cache_test.cpp b/be/test/io/cache/remote_file_cache_test.cpp index 5de0a5f955..813c8807f5 100644 --- a/be/test/io/cache/remote_file_cache_test.cpp +++ b/be/test/io/cache/remote_file_cache_test.cpp @@ -139,8 +139,8 @@ protected: EXPECT_TRUE(st.ok()); EXPECT_TRUE(file_writer->close().ok()); - EXPECT_EQ("", writer.min_encoded_key().to_string()); - EXPECT_EQ("", writer.max_encoded_key().to_string()); + EXPECT_NE("", writer.min_encoded_key().to_string()); + EXPECT_NE("", writer.max_encoded_key().to_string()); st = segment_v2::Segment::open(fs, path, "", 0, {}, query_schema, res); EXPECT_TRUE(st.ok()); diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 27b43fec3c..01ac4a667c 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -75,7 +75,7 @@ protected: TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>(); TabletSchemaPB tablet_schema_pb; tablet_schema_pb.set_keys_type(keys_type); - tablet_schema_pb.set_num_short_key_columns(2); + tablet_schema_pb.set_num_short_key_columns(1); tablet_schema_pb.set_num_rows_per_row_block(1024); tablet_schema_pb.set_compress_kind(COMPRESS_NONE); tablet_schema_pb.set_next_column_unique_id(4); diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp b/be/test/olap/rowset/segment_v2/segment_test.cpp index 068d5b3ee3..5f9ff06d2b 100644 --- a/be/test/olap/rowset/segment_v2/segment_test.cpp +++ b/be/test/olap/rowset/segment_v2/segment_test.cpp @@ -146,29 +146,22 @@ protected: EXPECT_TRUE(st.ok()); EXPECT_TRUE(file_writer->close().ok()); // Check min/max key generation - if (build_schema->keys_type() == UNIQUE_KEYS && opts.enable_unique_key_merge_on_write) { - // Create min row - for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) { - RowCursorCell cell = row.cell(cid); - generator(0, cid, 0 / opts.num_rows_per_block, cell); - } - std::string min_encoded_key; - encode_key<RowCursor, true, true>(&min_encoded_key, row, - build_schema->num_key_columns()); - EXPECT_EQ(min_encoded_key, writer.min_encoded_key().to_string()); - // Create max row - for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) { - RowCursorCell cell = row.cell(cid); - generator(nrows - 1, cid, (nrows - 1) / opts.num_rows_per_block, cell); - } - std::string max_encoded_key; - encode_key<RowCursor, true, true>(&max_encoded_key, row, - build_schema->num_key_columns()); - EXPECT_EQ(max_encoded_key, writer.max_encoded_key().to_string()); - } else { - EXPECT_EQ("", writer.min_encoded_key().to_string()); - EXPECT_EQ("", writer.max_encoded_key().to_string()); + // Create min row + for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) { + RowCursorCell cell = row.cell(cid); + generator(0, cid, 0 / opts.num_rows_per_block, cell); + } + std::string min_encoded_key; + encode_key<RowCursor, true, true>(&min_encoded_key, row, build_schema->num_key_columns()); + EXPECT_EQ(min_encoded_key, writer.min_encoded_key().to_string()); + // Create max row + for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) { + RowCursorCell cell = row.cell(cid); + generator(nrows - 1, cid, (nrows - 1) / opts.num_rows_per_block, cell); } + std::string max_encoded_key; + encode_key<RowCursor, true, true>(&max_encoded_key, row, build_schema->num_key_columns()); + EXPECT_EQ(max_encoded_key, writer.max_encoded_key().to_string()); st = Segment::open(fs, path, "", 0, {}, query_schema, res); EXPECT_TRUE(st.ok()); diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h index 17865ce3d8..234a64eeab 100644 --- a/be/test/testutil/mock_rowset.h +++ b/be/test/testutil/mock_rowset.h @@ -37,7 +37,8 @@ class MockRowset : public Rowset { return Status::NotSupported("MockRowset not support this method."); } - virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id) override { + virtual Status link_files_to(const std::string& dir, RowsetId new_rowset_id, + size_t start_seg_id) override { return Status::NotSupported("MockRowset not support this method."); } diff --git a/build-support/clang-format.sh b/build-support/clang-format.sh index 9dfeb25167..dd04d4ba23 100755 --- a/build-support/clang-format.sh +++ b/build-support/clang-format.sh @@ -31,6 +31,7 @@ DORIS_HOME=$( ) export DORIS_HOME -CLANG_FORMAT="${CLANG_FORMAT_BINARY:=$(which clang-format)}" +#CLANG_FORMAT="${CLANG_FORMAT_BINARY:=$(which clang-format)}" +CLANG_FORMAT="/mnt/disk1/yixiu/incubator-doris/clang-format" python "${DORIS_HOME}/build-support/run_clang_format.py" "--clang-format-executable" "${CLANG_FORMAT}" "-r" "--style" "file" "--inplace" "true" "--extensions" "c,h,C,H,cpp,hpp,cc,hh,c++,h++,cxx,hxx" "--exclude" "none" "be/src be/test" diff --git a/regression-test/conf/regression-conf.groovy b/regression-test/conf/regression-conf.groovy index 49d20902c1..67288da324 100644 --- a/regression-test/conf/regression-conf.groovy +++ b/regression-test/conf/regression-conf.groovy @@ -81,4 +81,4 @@ pg_14_port=5442 // See `docker/thirdparties/start-thirdparties-docker.sh` enableHiveTest=false hms_port=9183 -hdfs_port=8120 +hdfs_port=8120 \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org