This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch compaction_opt
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 468b08a8542aed718f90b0cccab46977801a074a
Author: yixiutt <102007456+yixi...@users.noreply.github.com>
AuthorDate: Wed Nov 9 09:10:52 2022 +0800

    (compaction) support ordered data compaction (#14054)
---
 be/src/olap/compaction.cpp                       | 168 +++++++++++++++++++----
 be/src/olap/compaction.h                         |  10 +-
 be/src/olap/olap_server.cpp                      |   7 -
 be/src/olap/rowset/beta_rowset.cpp               |  18 ++-
 be/src/olap/rowset/beta_rowset.h                 |   5 +-
 be/src/olap/rowset/beta_rowset_writer.cpp        |  55 +++++++-
 be/src/olap/rowset/beta_rowset_writer.h          |   6 +
 be/src/olap/rowset/rowset.h                      |  14 +-
 be/src/olap/rowset/rowset_meta.h                 |   9 ++
 be/src/olap/rowset/rowset_writer.h               |   3 +
 be/src/olap/rowset/segment_v2/segment_writer.cpp |  91 ++++++++----
 be/src/olap/rowset/segment_v2/segment_writer.h   |  11 ++
 be/test/io/cache/remote_file_cache_test.cpp      |   4 +-
 be/test/olap/rowid_conversion_test.cpp           |   2 +-
 be/test/olap/rowset/segment_v2/segment_test.cpp  |  37 ++---
 be/test/testutil/mock_rowset.h                   |   3 +-
 build-support/clang-format.sh                    |   3 +-
 regression-test/conf/regression-conf.groovy      |   2 +-
 18 files changed, 351 insertions(+), 97 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index c2355c56d7..0283e1e76c 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -19,6 +19,7 @@
 
 #include "common/status.h"
 #include "gutil/strings/substitute.h"
+#include "olap/rowset/beta_rowset.h"
 #include "olap/rowset/rowset.h"
 #include "olap/rowset/rowset_meta.h"
 #include "olap/tablet.h"
@@ -34,6 +35,8 @@ Compaction::Compaction(TabletSharedPtr tablet, const 
std::string& label)
         : _tablet(tablet),
           _input_rowsets_size(0),
           _input_row_num(0),
+          _input_num_segments(0),
+          _input_index_size(0),
           _state(CompactionState::INITED) {
     _mem_tracker = 
std::make_shared<MemTrackerLimiter>(MemTrackerLimiter::Type::COMPACTION, label);
 }
@@ -99,20 +102,76 @@ int64_t Compaction::get_avg_segment_rows() {
     // todo(yixiu): add a new conf of segment size in compaction
     return config::write_buffer_size / (_input_rowsets_size / (_input_row_num 
+ 1) + 1);
 }
+bool Compaction::is_rowset_tidy(std::string& pre_max_key, const 
RowsetSharedPtr& rhs) {
+    // segments smaller than this (10 MiB) are worth rewriting by a real compaction
+    const size_t min_tidy_size = 10 * 1024 * 1024;
+    if (rhs->num_segments() == 0) {
+        return true;
+    }
+    if (rhs->is_segments_overlapping()) {
+        return false;
+    }
+    // check segment size; a rowset whose segment sizes cannot be read is not tidy
+    auto* beta_rowset = static_cast<BetaRowset*>(rhs.get());
+    std::vector<size_t> segments_size;
+    if (!beta_rowset->get_segments_size(&segments_size).ok()) {
+        return false;
+    }
+    for (auto segment_size : segments_size) {
+        if (segment_size < min_tidy_size) return false;  // too small, needs compaction
+    }
 
-Status Compaction::do_compaction_impl(int64_t permits) {
-    OlapStopWatch watch;
+    // keys must strictly increase across rowsets; an equal key still needs merging
+    std::vector<KeyBoundsPB> key_bounds;  // may be empty if no key bounds were recorded
+    rhs->get_segments_key_bounds(&key_bounds);
+    if (key_bounds.empty() || key_bounds.front().min_key() <= pre_max_key) return false;
+    pre_max_key = key_bounds.back().max_key();
+    return true;
+}
+
+Status Compaction::do_compact_ordered_rowsets() {
+    LOG(INFO) << "start to do ordered data compaction, tablet=" << 
_tablet->full_name()
+              << ", output_version=" << _output_version;
+    build_basic_info();
+    RETURN_NOT_OK(construct_output_rowset_writer());
+    // link data to new rowset
+    size_t seg_id = 0;  // output segment id of the current rowset's first segment
+    std::vector<KeyBoundsPB> segment_key_bounds;
+    for (const auto& rowset : _input_rowsets) {
+        RETURN_NOT_OK(rowset->link_files_to(_tablet->tablet_path(), 
_output_rs_writer->rowset_id(),
+                                            seg_id));
+        seg_id += rowset->num_segments();
 
-    // 1. prepare input and output parameters
-    int64_t segments_num = 0;
+        std::vector<KeyBoundsPB> key_bounds;
+        rowset->get_segments_key_bounds(&key_bounds);
+        segment_key_bounds.insert(segment_key_bounds.end(), 
key_bounds.begin(), key_bounds.end());
+    }
+    // describe the output rowset; manual_build() copies these fields into the writer's meta
+    RowsetMetaSharedPtr rowset_meta = std::make_shared<RowsetMeta>();
+    rowset_meta->set_num_rows(_input_row_num);
+    rowset_meta->set_total_disk_size(_input_rowsets_size);
+    rowset_meta->set_data_disk_size(_input_rowsets_size);
+    rowset_meta->set_index_disk_size(_input_index_size);
+    rowset_meta->set_empty(_input_row_num == 0);
+    rowset_meta->set_num_segments(_input_num_segments);
+    rowset_meta->set_segments_overlap(NONOVERLAPPING);
+    rowset_meta->set_rowset_state(VISIBLE);
+    rowset_meta->set_segments_key_bounds(segment_key_bounds);
+    _output_rowset = _output_rs_writer->manual_build(rowset_meta);
+    if (_output_rowset == nullptr) return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    return Status::OK();
+}
+
+void Compaction::build_basic_info() {
     for (auto& rowset : _input_rowsets) {
         _input_rowsets_size += rowset->data_disk_size();
+        _input_index_size += rowset->index_disk_size();
         _input_row_num += rowset->num_rows();
-        segments_num += rowset->num_segments();
+        _input_num_segments += rowset->num_segments();
     }
     TRACE_COUNTER_INCREMENT("input_rowsets_data_size", _input_rowsets_size);
     TRACE_COUNTER_INCREMENT("input_row_num", _input_row_num);
-    TRACE_COUNTER_INCREMENT("input_segments_num", segments_num);
+    TRACE_COUNTER_INCREMENT("input_segments_num", _input_num_segments);
 
     _output_version =
             Version(_input_rowsets.front()->start_version(), 
_input_rowsets.back()->end_version());
@@ -120,22 +179,81 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     _oldest_write_timestamp = _input_rowsets.front()->oldest_write_timestamp();
     _newest_write_timestamp = _input_rowsets.back()->newest_write_timestamp();
 
-    auto use_vectorized_compaction = config::enable_vectorized_compaction;
-    string merge_type = use_vectorized_compaction ? "v" : "";
-    bool vertical_compaction = should_vertical_compaction();
-
-    LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << 
_tablet->full_name()
-              << ", output_version=" << _output_version << ", permits: " << 
permits
-              << ", is_vertical_compaction=" << vertical_compaction;
-    // get cur schema if rowset schema exist, rowset schema must be newer than 
tablet schema
     std::vector<RowsetMetaSharedPtr> rowset_metas(_input_rowsets.size());
     std::transform(_input_rowsets.begin(), _input_rowsets.end(), 
rowset_metas.begin(),
                    [](const RowsetSharedPtr& rowset) { return 
rowset->rowset_meta(); });
-    TabletSchemaSPtr cur_tablet_schema =
+    _cur_tablet_schema =
             
_tablet->rowset_meta_with_max_schema_version(rowset_metas)->tablet_schema();
+}
+
+bool Compaction::handle_ordered_data_compaction() {
+    // merge-on-write delete bitmaps must be remapped via rowid conversion; linking skips it
+    if (_tablet->enable_unique_key_merge_on_write()) return false;
+    // check delete version: if compaction type is base compaction and
+    // has a delete version, use original compaction
+    if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
+        for (auto& rowset : _input_rowsets) {
+            if (_tablet->version_for_delete_predicate(rowset->version())) return false;
+        }
+    }
 
-    RETURN_NOT_OK(construct_output_rowset_writer(cur_tablet_schema, 
vertical_compaction));
+    // check if rowsets are tidy so we can just modify meta and do link
+    // files to handle compaction
+    auto input_size = _input_rowsets.size();
+    std::string pre_max_key;
+    for (size_t i = 0; i < input_size; ++i) {
+        if (!is_rowset_tidy(pre_max_key, _input_rowsets[i])) {
+            if (i <= input_size / 2) {
+                return false;
+            } else {
+                _input_rowsets.resize(i);
+                break;
+            }
+        }
+    }
+    auto st = do_compact_ordered_rowsets();
+    if (!st.ok()) {
+        LOG(WARNING) << "ordered compaction failed, tablet=" << _tablet->full_name() << ", " << st;
+        // build_basic_info() already ran; reset so the fallback does not double count
+        _input_rowsets_size = _input_row_num = _input_num_segments = _input_index_size = 0;
+    }
+    return st.ok();
+}
+
+Status Compaction::do_compaction_impl(int64_t permits) {
+    OlapStopWatch watch;
+
+    auto use_vectorized_compaction = config::enable_vectorized_compaction;
+    string merge_type = use_vectorized_compaction ? "v" : "";
+
+    if (handle_ordered_data_compaction()) {
+        RETURN_NOT_OK(modify_rowsets());
+        TRACE("modify rowsets finished");
+
+        int64_t now = UnixMillis();
+        if (compaction_type() == ReaderType::READER_CUMULATIVE_COMPACTION) {
+            _tablet->set_last_cumu_compaction_success_time(now);
+        } else {
+            _tablet->set_last_base_compaction_success_time(now);
+        }
+        auto cumu_policy = _tablet->cumulative_compaction_policy();
+        LOG(INFO) << "succeed to do ordered data " << merge_type << 
compaction_name()
+                  << ". tablet=" << _tablet->full_name() << ", 
output_version=" << _output_version
+                  << ", disk=" << _tablet->data_dir()->path()
+                  << ", segments=" << _input_num_segments << ", 
input_row_num=" << _input_row_num
+                  << ", output_row_num=" << _output_rowset->num_rows()
+                  << ". elapsed time=" << watch.get_elapse_second()
+                  << "s. cumulative_compaction_policy="
+                  << (cumu_policy == nullptr ? "quick" : cumu_policy->name());
+        return Status::OK();
+    }
+    build_basic_info();
+
+    LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << 
_tablet->full_name()
+              << ", output_version=" << _output_version << ", permits: " << 
permits;
+    bool vertical_compaction = should_vertical_compaction();
     RETURN_NOT_OK(construct_input_rowset_readers());
+    RETURN_NOT_OK(construct_output_rowset_writer(vertical_compaction));
     TRACE("prepare finished");
 
     // 2. write merged rows to output rowset
@@ -149,15 +267,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
 
     if (use_vectorized_compaction) {
         if (vertical_compaction) {
-            res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), 
cur_tablet_schema,
+            res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
                                                  _input_rs_readers, 
_output_rs_writer.get(),
                                                  get_avg_segment_rows(), 
&stats);
         } else {
-            res = Merger::vmerge_rowsets(_tablet, compaction_type(), 
cur_tablet_schema,
+            res = Merger::vmerge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
                                          _input_rs_readers, 
_output_rs_writer.get(), &stats);
         }
     } else {
-        res = Merger::merge_rowsets(_tablet, compaction_type(), 
cur_tablet_schema,
+        res = Merger::merge_rowsets(_tablet, compaction_type(), 
_cur_tablet_schema,
                                     _input_rs_readers, 
_output_rs_writer.get(), &stats);
     }
 
@@ -215,7 +333,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     LOG(INFO) << "succeed to do " << merge_type << compaction_name()
               << ". tablet=" << _tablet->full_name() << ", output_version=" << 
_output_version
               << ", current_max_version=" << current_max_version
-              << ", disk=" << _tablet->data_dir()->path() << ", segments=" << 
segments_num
+              << ", disk=" << _tablet->data_dir()->path() << ", segments=" << 
_input_num_segments
               << ", input_row_num=" << _input_row_num
               << ", output_row_num=" << _output_rowset->num_rows()
               << ". elapsed time=" << watch.get_elapse_second()
@@ -225,15 +343,15 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     return Status::OK();
 }
 
-Status Compaction::construct_output_rowset_writer(TabletSchemaSPtr schema, 
bool is_vertical) {
+Status Compaction::construct_output_rowset_writer(bool is_vertical) {
     if (is_vertical) {
         return _tablet->create_vertical_rowset_writer(_output_version, 
VISIBLE, NONOVERLAPPING,
-                                                      schema, 
_oldest_write_timestamp,
+                                                      _cur_tablet_schema, 
_oldest_write_timestamp,
                                                       _newest_write_timestamp, 
&_output_rs_writer);
     }
-    return _tablet->create_rowset_writer(_output_version, VISIBLE, 
NONOVERLAPPING, schema,
-                                         _oldest_write_timestamp, 
_newest_write_timestamp,
-                                         &_output_rs_writer);
+    return _tablet->create_rowset_writer(_output_version, VISIBLE, 
NONOVERLAPPING,
+                                         _cur_tablet_schema, 
_oldest_write_timestamp,
+                                         _newest_write_timestamp, 
&_output_rs_writer);
 }
 
 Status Compaction::construct_input_rowset_readers() {
diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h
index 7f23f94b26..f2072347b2 100644
--- a/be/src/olap/compaction.h
+++ b/be/src/olap/compaction.h
@@ -64,7 +64,7 @@ protected:
     Status modify_rowsets();
     void gc_output_rowset();
 
-    Status construct_output_rowset_writer(TabletSchemaSPtr schema, bool 
is_vertical);
+    Status construct_output_rowset_writer(bool is_vertical = false);
     Status construct_input_rowset_readers();
 
     Status check_version_continuity(const std::vector<RowsetSharedPtr>& 
rowsets);
@@ -76,6 +76,11 @@ protected:
     bool should_vertical_compaction();
     int64_t get_avg_segment_rows();
 
+    bool handle_ordered_data_compaction();
+    Status do_compact_ordered_rowsets();
+    bool is_rowset_tidy(std::string& pre_max_key, const RowsetSharedPtr& rhs);
+    void build_basic_info();
+
 protected:
     // the root tracker for this compaction
     std::shared_ptr<MemTrackerLimiter> _mem_tracker;
@@ -86,6 +91,8 @@ protected:
     std::vector<RowsetReaderSharedPtr> _input_rs_readers;
     int64_t _input_rowsets_size;
     int64_t _input_row_num;
+    int64_t _input_num_segments;
+    int64_t _input_index_size;
 
     RowsetSharedPtr _output_rowset;
     std::unique_ptr<RowsetWriter> _output_rs_writer;
@@ -98,6 +105,7 @@ protected:
     int64_t _oldest_write_timestamp;
     int64_t _newest_write_timestamp;
     RowIdConversion _rowid_conversion;
+    TabletSchemaSPtr _cur_tablet_schema;
 
     DISALLOW_COPY_AND_ASSIGN(Compaction);
 };
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 084f44b805..6d27916db7 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -81,19 +81,12 @@ Status StorageEngine::start_bg_threads() {
             .set_min_threads(config::max_cumu_compaction_threads)
             .set_max_threads(config::max_cumu_compaction_threads)
             .build(&_cumu_compaction_thread_pool);
-<<<<<<< HEAD
-    ThreadPoolBuilder("SmallCompactionTaskThreadPool")
-            .set_min_threads(config::quick_compaction_max_threads)
-            .set_max_threads(config::quick_compaction_max_threads)
-            .build(&_quick_compaction_thread_pool);
     if (config::enable_segcompaction && config::enable_storage_vectorization) {
         ThreadPoolBuilder("SegCompactionTaskThreadPool")
                 .set_min_threads(config::seg_compaction_max_threads)
                 .set_max_threads(config::seg_compaction_max_threads)
                 .build(&_seg_compaction_thread_pool);
     }
-=======
->>>>>>> 480acd41a... [enhancement](compaction) opt compaction task producer 
and quick compaction (#13495)
 
     // compaction tasks producer thread
     RETURN_IF_ERROR(Thread::create(
diff --git a/be/src/olap/rowset/beta_rowset.cpp 
b/be/src/olap/rowset/beta_rowset.cpp
index 8ed48fdf70..6fbe708090 100644
--- a/be/src/olap/rowset/beta_rowset.cpp
+++ b/be/src/olap/rowset/beta_rowset.cpp
@@ -109,6 +109,19 @@ Status BetaRowset::do_load(bool /*use_cache*/) {
     return Status::OK();
 }
 
+Status BetaRowset::get_segments_size(std::vector<size_t>* segments_size) {
+    auto fs = _rowset_meta->fs();
+    if (!fs || _schema == nullptr) {
+        return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
+    }
+    for (int seg_id = 0; seg_id < num_segments(); ++seg_id) {
+        auto seg_path = segment_file_path(seg_id);
+        size_t file_size;
+        RETURN_IF_ERROR(fs->file_size(seg_path, &file_size));
+        segments_size->push_back(file_size);
+    }
+    return Status::OK();
+}
 Status BetaRowset::load_segments(std::vector<segment_v2::SegmentSharedPtr>* 
segments) {
     auto fs = _rowset_meta->fs();
     if (!fs || _schema == nullptr) {
@@ -197,14 +210,15 @@ void BetaRowset::do_close() {
     // do nothing.
 }
 
-Status BetaRowset::link_files_to(const std::string& dir, RowsetId 
new_rowset_id) {
+Status BetaRowset::link_files_to(const std::string& dir, RowsetId 
new_rowset_id,
+                                 size_t new_rowset_start_seg_id) {
     DCHECK(is_local());
     auto fs = _rowset_meta->fs();
     if (!fs) {
         return Status::OLAPInternalError(OLAP_ERR_INIT_FAILED);
     }
     for (int i = 0; i < num_segments(); ++i) {
-        auto dst_path = segment_file_path(dir, new_rowset_id, i);
+        auto dst_path = segment_file_path(dir, new_rowset_id, i + 
new_rowset_start_seg_id);
         // TODO(lingbin): use Env API? or EnvUtil?
         bool dst_path_exist = false;
         if (!fs->exists(dst_path, &dst_path_exist).ok() || dst_path_exist) {
diff --git a/be/src/olap/rowset/beta_rowset.h b/be/src/olap/rowset/beta_rowset.h
index 7cd792bf69..93d9b31676 100644
--- a/be/src/olap/rowset/beta_rowset.h
+++ b/be/src/olap/rowset/beta_rowset.h
@@ -70,7 +70,8 @@ public:
 
     Status remove() override;
 
-    Status link_files_to(const std::string& dir, RowsetId new_rowset_id) 
override;
+    Status link_files_to(const std::string& dir, RowsetId new_rowset_id,
+                         size_t new_rowset_start_seg_id = 0) override;
 
     Status copy_files_to(const std::string& dir, const RowsetId& 
new_rowset_id) override;
 
@@ -89,6 +90,8 @@ public:
 
     Status load_segment(int64_t seg_id, segment_v2::SegmentSharedPtr* segment);
 
+    Status get_segments_size(std::vector<size_t>* segments_size);
+
 protected:
     BetaRowset(TabletSchemaSPtr schema, const std::string& tablet_path,
                RowsetMetaSharedPtr rowset_meta);
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp 
b/be/src/olap/rowset/beta_rowset_writer.cpp
index 485b53421e..154d76224d 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -710,6 +710,27 @@ Status BetaRowsetWriter::_wait_flying_segcompaction() {
     return Status::OK();
 }
 
+RowsetSharedPtr BetaRowsetWriter::manual_build(const RowsetMetaSharedPtr& 
spec_rowset_meta) {
+    if (_rowset_meta->oldest_write_timestamp() == -1) {
+        _rowset_meta->set_oldest_write_timestamp(UnixSeconds());
+    }
+
+    if (_rowset_meta->newest_write_timestamp() == -1) {
+        _rowset_meta->set_newest_write_timestamp(UnixSeconds());
+    }
+
+    _build_rowset_meta_with_spec_field(_rowset_meta, spec_rowset_meta);
+    RowsetSharedPtr rowset;
+    auto status = RowsetFactory::create_rowset(_context.tablet_schema, 
_context.rowset_dir,
+                                               _rowset_meta, &rowset);
+    if (!status.ok()) {
+        LOG(WARNING) << "rowset init failed when build new rowset, res=" << 
status;
+        return nullptr;
+    }
+    _already_built = true;
+    return rowset;
+}
+
 RowsetSharedPtr BetaRowsetWriter::build() {
     // TODO(lingbin): move to more better place, or in a CreateBlockBatch?
     for (auto& file_writer : _file_writers) {
@@ -764,6 +785,37 @@ RowsetSharedPtr BetaRowsetWriter::build() {
     return rowset;
 }
 
+bool BetaRowsetWriter::_is_segment_overlapping() {
+    // segments overlap unless each min key is strictly greater than the previous
+    // segment's max key: a key shared by two segments still needs merging
+    const std::string* last = nullptr;
+    for (const auto& key_bounds : _segments_encoded_key_bounds) {
+        if (last != nullptr && key_bounds.min_key() <= *last) {
+            return true;
+        }
+        last = &key_bounds.max_key();
+    }
+    return false;
+}
+
+void BetaRowsetWriter::_build_rowset_meta_with_spec_field(
+        RowsetMetaSharedPtr rowset_meta, const RowsetMetaSharedPtr& 
spec_rowset_meta) {
+    rowset_meta->set_num_rows(spec_rowset_meta->num_rows());
+    rowset_meta->set_total_disk_size(spec_rowset_meta->total_disk_size());
+    rowset_meta->set_data_disk_size(spec_rowset_meta->data_disk_size());
+    rowset_meta->set_index_disk_size(spec_rowset_meta->index_disk_size());
+    // TODO write zonemap to meta
+    rowset_meta->set_empty(spec_rowset_meta->num_rows() == 0);
+    rowset_meta->set_creation_time(time(nullptr));
+    rowset_meta->set_num_segments(spec_rowset_meta->num_segments());
+    rowset_meta->set_segments_overlap(spec_rowset_meta->segments_overlap());
+    rowset_meta->set_rowset_state(spec_rowset_meta->rowset_state());
+
+    std::vector<KeyBoundsPB> segments_key_bounds;
+    spec_rowset_meta->get_segments_key_bounds(&segments_key_bounds);
+    rowset_meta->set_segments_key_bounds(segments_key_bounds);
+}
+
 void BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> 
rowset_meta) {
     int64_t num_seg = _is_segcompacted() ? _num_segcompacted : _num_segment;
     int64_t num_rows_written = 0;
@@ -782,7 +834,7 @@ void 
BetaRowsetWriter::_build_rowset_meta(std::shared_ptr<RowsetMeta> rowset_met
         }
     }
     rowset_meta->set_num_segments(num_seg);
-    if (num_seg <= 1) {
+    if (!_is_segment_overlapping()) {
         rowset_meta->set_segments_overlap(NONOVERLAPPING);
     }
     _segment_num_rows = segment_num_rows;
@@ -930,7 +982,6 @@ Status 
BetaRowsetWriter::_flush_segment_writer(std::unique_ptr<segment_v2::Segme
         std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex);
         CHECK_EQ(_segid_statistics_map.find(segid) == 
_segid_statistics_map.end(), true);
         _segid_statistics_map.emplace(segid, segstat);
-
     }
     VLOG_DEBUG << "_segid_statistics_map add new record. segid:" << segid << " 
row_num:" << row_num
                << " data_size:" << segment_size << " index_size:" << 
index_size;
diff --git a/be/src/olap/rowset/beta_rowset_writer.h 
b/be/src/olap/rowset/beta_rowset_writer.h
index 558d426c17..5064ac47b4 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -64,6 +64,8 @@ public:
     // for this segment
     RowsetSharedPtr build_tmp() override;
 
+    RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& rowset_meta) 
override;
+
     Version version() override { return _context.version; }
 
     int64_t num_rows() const override { return _raw_num_rows_written; }
@@ -120,6 +122,10 @@ private:
 
     Status _do_compact_segments(SegCompactionCandidatesSharedPtr segments);
 
+    void _build_rowset_meta_with_spec_field(RowsetMetaSharedPtr rowset_meta,
+                                            const RowsetMetaSharedPtr& 
spec_rowset_meta);
+    bool _is_segment_overlapping();
+
 protected:
     RowsetWriterContext _context;
     std::shared_ptr<RowsetMeta> _rowset_meta;
diff --git a/be/src/olap/rowset/rowset.h b/be/src/olap/rowset/rowset.h
index 32fda0e102..312fe62b1a 100644
--- a/be/src/olap/rowset/rowset.h
+++ b/be/src/olap/rowset/rowset.h
@@ -161,6 +161,7 @@ public:
     RowsetMetaPB get_rowset_pb() const { return 
rowset_meta()->get_rowset_pb(); }
     int64_t oldest_write_timestamp() const { return 
rowset_meta()->oldest_write_timestamp(); }
     int64_t newest_write_timestamp() const { return 
rowset_meta()->newest_write_timestamp(); }
+    bool is_segments_overlapping() const { return 
rowset_meta()->is_segments_overlapping(); }
     KeysType keys_type() { return _schema->keys_type(); }
 
     // remove all files in this rowset
@@ -198,7 +199,8 @@ public:
     }
 
     // hard link all files in this rowset to `dir` to form a new rowset with 
id `new_rowset_id`.
-    virtual Status link_files_to(const std::string& dir, RowsetId 
new_rowset_id) = 0;
+    virtual Status link_files_to(const std::string& dir, RowsetId 
new_rowset_id,
+                                 size_t new_rowset_start_seg_id = 0) = 0;
 
     // copy all files to `dir`
     virtual Status copy_files_to(const std::string& dir, const RowsetId& 
new_rowset_id) = 0;
@@ -265,6 +267,16 @@ public:
         _rowset_meta->get_segments_key_bounds(segments_key_bounds);
         return Status::OK();
     }
+    std::string min_key() {
+        KeyBoundsPB key_bounds;
+        _rowset_meta->get_first_segment_key_bound(&key_bounds);
+        return key_bounds.min_key();
+    }
+    std::string max_key() {
+        KeyBoundsPB key_bounds;
+        _rowset_meta->get_last_segment_key_bound(&key_bounds);
+        return key_bounds.max_key();
+    }
 
     bool check_rowset_segment();
 
diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h
index 7154116e7f..a4a7059d39 100644
--- a/be/src/olap/rowset/rowset_meta.h
+++ b/be/src/olap/rowset/rowset_meta.h
@@ -320,6 +320,15 @@ public:
             segments_key_bounds->push_back(key_range);
         }
     }
+    virtual void get_first_segment_key_bound(KeyBoundsPB* key_bounds) {
+        DCHECK(_rowset_meta_pb.segments_key_bounds_size() > 0);
+        *key_bounds = _rowset_meta_pb.segments_key_bounds(0);
+    }
+    virtual void get_last_segment_key_bound(KeyBoundsPB* key_bounds) {
+        DCHECK(_rowset_meta_pb.segments_key_bounds_size() > 0);
+        *key_bounds =
+                
_rowset_meta_pb.segments_key_bounds(_rowset_meta_pb.segments_key_bounds_size() 
- 1);
+    }
 
     void set_segments_key_bounds(const std::vector<KeyBoundsPB>& 
segments_key_bounds) {
         for (const KeyBoundsPB& key_bounds : segments_key_bounds) {
diff --git a/be/src/olap/rowset/rowset_writer.h 
b/be/src/olap/rowset/rowset_writer.h
index f5d095a48c..531dd103c8 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -82,6 +82,9 @@ public:
     // real build will be called in DeltaWriter close_wait.
     virtual RowsetSharedPtr build_tmp() = 0;
 
+    // For ordered rowset compaction, manual build rowset
+    virtual RowsetSharedPtr manual_build(const RowsetMetaSharedPtr& 
rowset_meta) = 0;
+
     virtual Version version() = 0;
 
     virtual int64_t num_rows() const = 0;
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 971706abbf..284738e97d 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -51,11 +51,9 @@ SegmentWriter::SegmentWriter(io::FileWriter* file_writer, 
uint32_t segment_id,
           _mem_tracker(std::make_unique<MemTracker>("SegmentWriter:Segment-" +
                                                     
std::to_string(segment_id))) {
     CHECK_NOTNULL(file_writer);
-    if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
-        _num_key_columns = _tablet_schema->num_key_columns();
-    } else {
-        _num_key_columns = _tablet_schema->num_short_key_columns();
-    }
+    _num_key_columns = _tablet_schema->num_key_columns();
+    _num_short_key_columns = _tablet_schema->num_short_key_columns();
+    DCHECK(_num_key_columns >= _num_short_key_columns);
     for (size_t cid = 0; cid < _num_key_columns; ++cid) {
         const auto& column = _tablet_schema->column(cid);
         _key_coders.push_back(get_key_coder(column.type()));
@@ -206,10 +204,15 @@ Status SegmentWriter::append_block(const 
vectorized::Block* block, size_t row_po
             // create primary indexes
             for (size_t pos = 0; pos < num_rows; pos++) {
                 RETURN_IF_ERROR(
-                        
_primary_key_index_builder->add_item(_encode_keys(key_columns, pos)));
+                        
_primary_key_index_builder->add_item(_full_encode_keys(key_columns, pos)));
             }
         } else {
-            // create short key indexes
+            // min/max key: track the full encoded key of every row, then
+            // create short key indexes from the short key columns only
+            for (size_t pos = 0; pos < num_rows; pos++) {
+                set_min_max_key(_full_encode_keys(key_columns, pos));
+            }
+            key_columns.resize(_num_short_key_columns);
             for (const auto pos : short_key_pos) {
                 
RETURN_IF_ERROR(_short_key_index_builder->add_item(_encode_keys(key_columns, 
pos)));
             }
@@ -233,17 +236,15 @@ int64_t SegmentWriter::max_row_to_add(size_t 
row_avg_size_in_bytes) {
     return std::min(size_rows, count_rows);
 }
 
-std::string SegmentWriter::_encode_keys(
+std::string SegmentWriter::_full_encode_keys(
         const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, 
size_t pos,
         bool null_first) {
-    if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write &&
-        _tablet_schema->has_sequence_col()) {
+    assert(_key_index_size.size() == _num_key_columns);
+    if (_tablet_schema->has_sequence_col() && 
_opts.enable_unique_key_merge_on_write) {
         assert(key_columns.size() == _num_key_columns + 1 &&
-               _key_coders.size() == _num_key_columns + 1 &&
-               _key_index_size.size() == _num_key_columns);
+               _key_coders.size() == _num_key_columns + 1);
     } else {
-        assert(key_columns.size() == _num_key_columns && _key_coders.size() == 
_num_key_columns &&
-               _key_index_size.size() == _num_key_columns);
+        assert(key_columns.size() == _num_key_columns && _key_coders.size() == 
_num_key_columns);
     }
 
     std::string encoded_keys;
@@ -259,11 +260,31 @@ std::string SegmentWriter::_encode_keys(
             continue;
         }
         encoded_keys.push_back(KEY_NORMAL_MARKER);
-        if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
-            _key_coders[cid]->full_encode_ascending(field, &encoded_keys);
-        } else {
-            _key_coders[cid]->encode_ascending(field, _key_index_size[cid], 
&encoded_keys);
+        _key_coders[cid]->full_encode_ascending(field, &encoded_keys);
+        ++cid;
+    }
+    return encoded_keys;
+}
+
+std::string SegmentWriter::_encode_keys(
+        const std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns, 
size_t pos,
+        bool null_first) {
+    assert(key_columns.size() == _num_short_key_columns);
+
+    std::string encoded_keys;
+    size_t cid = 0;
+    for (const auto& column : key_columns) {
+        auto field = column->get_data_at(pos);
+        if (UNLIKELY(!field)) {
+            if (null_first) {
+                encoded_keys.push_back(KEY_NULL_FIRST_MARKER);
+            } else {
+                encoded_keys.push_back(KEY_NULL_LAST_MARKER);
+            }
+            continue;
         }
+        encoded_keys.push_back(KEY_NORMAL_MARKER);
+        _key_coders[cid]->encode_ascending(field, _key_index_size[cid], 
&encoded_keys);
         ++cid;
     }
     return encoded_keys;
@@ -275,24 +296,25 @@ Status SegmentWriter::append_row(const RowType& row) {
         auto cell = row.cell(cid);
         RETURN_IF_ERROR(_column_writers[cid]->append(cell));
     }
+    std::string full_encoded_key;
+    encode_key<RowType, true, true>(&full_encoded_key, row, _num_key_columns);
+    if (_tablet_schema->has_sequence_col()) {
+        full_encoded_key.push_back(KEY_NORMAL_MARKER);
+        auto cid = _tablet_schema->sequence_col_idx();
+        auto cell = row.cell(cid);
+        row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), 
&full_encoded_key);
+    }
 
     if (_tablet_schema->keys_type() == UNIQUE_KEYS && 
_opts.enable_unique_key_merge_on_write) {
-        std::string encoded_key;
-        encode_key<RowType, true, true>(&encoded_key, row, _num_key_columns);
-        if (_tablet_schema->has_sequence_col()) {
-            encoded_key.push_back(KEY_NORMAL_MARKER);
-            auto cid = _tablet_schema->sequence_col_idx();
-            auto cell = row.cell(cid);
-            row.schema()->column(cid)->full_encode_ascending(cell.cell_ptr(), 
&encoded_key);
-        }
-        RETURN_IF_ERROR(_primary_key_index_builder->add_item(encoded_key));
+        
RETURN_IF_ERROR(_primary_key_index_builder->add_item(full_encoded_key));
     } else {
         // At the beginning of one block, so add a short key index entry
         if ((_num_rows_written % _opts.num_rows_per_block) == 0) {
             std::string encoded_key;
-            encode_key(&encoded_key, row, _num_key_columns);
+            encode_key(&encoded_key, row, _num_short_key_columns);
             RETURN_IF_ERROR(_short_key_index_builder->add_item(encoded_key));
         }
+        set_min_max_key(full_encoded_key);
     }
     ++_num_rows_written;
     return Status::OK();
@@ -465,13 +487,22 @@ Status SegmentWriter::_write_raw_data(const 
std::vector<Slice>& slices) {
 }
 
 Slice SegmentWriter::min_encoded_key() {
-    return (_primary_key_index_builder == nullptr) ? Slice()
+    return (_primary_key_index_builder == nullptr) ? Slice(_min_key.data(), 
_min_key.size())
                                                    : 
_primary_key_index_builder->min_key();
 }
 Slice SegmentWriter::max_encoded_key() {
-    return (_primary_key_index_builder == nullptr) ? Slice()
+    return (_primary_key_index_builder == nullptr) ? Slice(_max_key.data(), 
_max_key.size())
                                                    : 
_primary_key_index_builder->max_key();
 }
 
+void SegmentWriter::set_min_max_key(const Slice& key) {
+    if (UNLIKELY(_is_first_row)) {
+        _min_key.append(key.get_data(), key.get_size());
+        _is_first_row = false;
+    }
+    _max_key.clear();
+    _max_key.append(key.get_data(), key.get_size());
+}
+
 } // namespace segment_v2
 } // namespace doris
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.h 
b/be/src/olap/rowset/segment_v2/segment_writer.h
index 8e7041d21b..689224ba8f 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.h
+++ b/be/src/olap/rowset/segment_v2/segment_writer.h
@@ -26,6 +26,7 @@
 #include "gen_cpp/segment_v2.pb.h"
 #include "gutil/macros.h"
 #include "olap/tablet_schema.h"
+#include "util/faststring.h"
 #include "vec/core/block.h"
 #include "vec/olap/olap_data_convertor.h"
 
@@ -112,6 +113,11 @@ private:
     Status _write_raw_data(const std::vector<Slice>& slices);
     std::string _encode_keys(const 
std::vector<vectorized::IOlapColumnDataAccessor*>& key_columns,
                              size_t pos, bool null_first = true);
+    // for unique-key merge on write and segment min_max key
+    std::string _full_encode_keys(
+            const std::vector<vectorized::IOlapColumnDataAccessor*>& 
key_columns, size_t pos,
+            bool null_first = true);
+    void set_min_max_key(const Slice& key);
 
     void _reset_column_writers();
 
@@ -127,6 +133,7 @@ private:
 
     SegmentFooterPB _footer;
     size_t _num_key_columns;
+    size_t _num_short_key_columns;
     std::unique_ptr<ShortKeyIndexBuilder> _short_key_index_builder;
     std::unique_ptr<PrimaryKeyIndexBuilder> _primary_key_index_builder;
     std::vector<std::unique_ptr<ColumnWriter>> _column_writers;
@@ -146,6 +153,10 @@ private:
     // In vertical compaction row count is recorded when key columns group 
finish
     //  and _num_rows_written will be updated in value column group
     uint32_t _row_count = 0;
+
+    bool _is_first_row = true;
+    faststring _min_key;
+    faststring _max_key;
 };
 
 } // namespace segment_v2
diff --git a/be/test/io/cache/remote_file_cache_test.cpp 
b/be/test/io/cache/remote_file_cache_test.cpp
index 5de0a5f955..813c8807f5 100644
--- a/be/test/io/cache/remote_file_cache_test.cpp
+++ b/be/test/io/cache/remote_file_cache_test.cpp
@@ -139,8 +139,8 @@ protected:
         EXPECT_TRUE(st.ok());
         EXPECT_TRUE(file_writer->close().ok());
 
-        EXPECT_EQ("", writer.min_encoded_key().to_string());
-        EXPECT_EQ("", writer.max_encoded_key().to_string());
+        EXPECT_NE("", writer.min_encoded_key().to_string());
+        EXPECT_NE("", writer.max_encoded_key().to_string());
 
         st = segment_v2::Segment::open(fs, path, "", 0, {}, query_schema, res);
         EXPECT_TRUE(st.ok());
diff --git a/be/test/olap/rowid_conversion_test.cpp 
b/be/test/olap/rowid_conversion_test.cpp
index 27b43fec3c..01ac4a667c 100644
--- a/be/test/olap/rowid_conversion_test.cpp
+++ b/be/test/olap/rowid_conversion_test.cpp
@@ -75,7 +75,7 @@ protected:
         TabletSchemaSPtr tablet_schema = std::make_shared<TabletSchema>();
         TabletSchemaPB tablet_schema_pb;
         tablet_schema_pb.set_keys_type(keys_type);
-        tablet_schema_pb.set_num_short_key_columns(2);
+        tablet_schema_pb.set_num_short_key_columns(1);
         tablet_schema_pb.set_num_rows_per_row_block(1024);
         tablet_schema_pb.set_compress_kind(COMPRESS_NONE);
         tablet_schema_pb.set_next_column_unique_id(4);
diff --git a/be/test/olap/rowset/segment_v2/segment_test.cpp 
b/be/test/olap/rowset/segment_v2/segment_test.cpp
index 068d5b3ee3..5f9ff06d2b 100644
--- a/be/test/olap/rowset/segment_v2/segment_test.cpp
+++ b/be/test/olap/rowset/segment_v2/segment_test.cpp
@@ -146,29 +146,22 @@ protected:
         EXPECT_TRUE(st.ok());
         EXPECT_TRUE(file_writer->close().ok());
         // Check min/max key generation
-        if (build_schema->keys_type() == UNIQUE_KEYS && 
opts.enable_unique_key_merge_on_write) {
-            // Create min row
-            for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
-                RowCursorCell cell = row.cell(cid);
-                generator(0, cid, 0 / opts.num_rows_per_block, cell);
-            }
-            std::string min_encoded_key;
-            encode_key<RowCursor, true, true>(&min_encoded_key, row,
-                                              build_schema->num_key_columns());
-            EXPECT_EQ(min_encoded_key, writer.min_encoded_key().to_string());
-            // Create max row
-            for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
-                RowCursorCell cell = row.cell(cid);
-                generator(nrows - 1, cid, (nrows - 1) / 
opts.num_rows_per_block, cell);
-            }
-            std::string max_encoded_key;
-            encode_key<RowCursor, true, true>(&max_encoded_key, row,
-                                              build_schema->num_key_columns());
-            EXPECT_EQ(max_encoded_key, writer.max_encoded_key().to_string());
-        } else {
-            EXPECT_EQ("", writer.min_encoded_key().to_string());
-            EXPECT_EQ("", writer.max_encoded_key().to_string());
+        // Create min row
+        for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
+            RowCursorCell cell = row.cell(cid);
+            generator(0, cid, 0 / opts.num_rows_per_block, cell);
+        }
+        std::string min_encoded_key;
+        encode_key<RowCursor, true, true>(&min_encoded_key, row, 
build_schema->num_key_columns());
+        EXPECT_EQ(min_encoded_key, writer.min_encoded_key().to_string());
+        // Create max row
+        for (int cid = 0; cid < build_schema->num_key_columns(); ++cid) {
+            RowCursorCell cell = row.cell(cid);
+            generator(nrows - 1, cid, (nrows - 1) / opts.num_rows_per_block, 
cell);
         }
+        std::string max_encoded_key;
+        encode_key<RowCursor, true, true>(&max_encoded_key, row, 
build_schema->num_key_columns());
+        EXPECT_EQ(max_encoded_key, writer.max_encoded_key().to_string());
 
         st = Segment::open(fs, path, "", 0, {}, query_schema, res);
         EXPECT_TRUE(st.ok());
diff --git a/be/test/testutil/mock_rowset.h b/be/test/testutil/mock_rowset.h
index 17865ce3d8..234a64eeab 100644
--- a/be/test/testutil/mock_rowset.h
+++ b/be/test/testutil/mock_rowset.h
@@ -37,7 +37,8 @@ class MockRowset : public Rowset {
         return Status::NotSupported("MockRowset not support this method.");
     }
 
-    virtual Status link_files_to(const std::string& dir, RowsetId 
new_rowset_id) override {
+    virtual Status link_files_to(const std::string& dir, RowsetId 
new_rowset_id,
+                                 size_t start_seg_id) override {
         return Status::NotSupported("MockRowset not support this method.");
     }
 
diff --git a/build-support/clang-format.sh b/build-support/clang-format.sh
index 9dfeb25167..dd04d4ba23 100755
--- a/build-support/clang-format.sh
+++ b/build-support/clang-format.sh
@@ -31,6 +31,7 @@ DORIS_HOME=$(
 )
 export DORIS_HOME
 
-CLANG_FORMAT="${CLANG_FORMAT_BINARY:=$(which clang-format)}"
+#CLANG_FORMAT="${CLANG_FORMAT_BINARY:=$(which clang-format)}"
+CLANG_FORMAT="/mnt/disk1/yixiu/incubator-doris/clang-format"
 
 python "${DORIS_HOME}/build-support/run_clang_format.py" 
"--clang-format-executable" "${CLANG_FORMAT}" "-r" "--style" "file" "--inplace" 
"true" "--extensions" "c,h,C,H,cpp,hpp,cc,hh,c++,h++,cxx,hxx" "--exclude" 
"none" "be/src be/test"
diff --git a/regression-test/conf/regression-conf.groovy 
b/regression-test/conf/regression-conf.groovy
index 49d20902c1..67288da324 100644
--- a/regression-test/conf/regression-conf.groovy
+++ b/regression-test/conf/regression-conf.groovy
@@ -81,4 +81,4 @@ pg_14_port=5442
 // See `docker/thirdparties/start-thirdparties-docker.sh`
 enableHiveTest=false
 hms_port=9183
-hdfs_port=8120
+hdfs_port=8120
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to