This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 4e165dc7ce1 Revert "[enhancement](compaction) optimizing memory usage for compaction (#36492)" (#37032) 4e165dc7ce1 is described below commit 4e165dc7ce15f17e0c72ae5ea0c1caf29bdfc157 Author: Yongqiang YANG <98214048+dataroar...@users.noreply.github.com> AuthorDate: Sun Jun 30 11:08:22 2024 +0800 Revert "[enhancement](compaction) optimizing memory usage for compaction (#36492)" (#37032) This reverts commit 99901814d8b90887f54b1768b98b4f0b78fab376. --- be/src/cloud/cloud_base_compaction.cpp | 10 -- be/src/cloud/cloud_cumulative_compaction.cpp | 13 +- be/src/common/config.cpp | 6 - be/src/common/config.h | 6 - be/src/olap/base_compaction.cpp | 10 -- be/src/olap/base_tablet.h | 5 - be/src/olap/compaction.cpp | 15 +-- be/src/olap/compaction.h | 2 - be/src/olap/cumulative_compaction.cpp | 15 +-- be/src/olap/iterators.h | 15 +-- be/src/olap/merger.cpp | 67 +--------- be/src/olap/merger.h | 6 +- be/src/olap/rowset/rowset_meta.h | 15 --- be/src/olap/rowset/segcompaction.cpp | 2 +- be/src/olap/tablet_reader.h | 2 - be/src/vec/olap/vertical_block_reader.cpp | 18 +-- be/src/vec/olap/vertical_block_reader.h | 3 +- be/src/vec/olap/vertical_merge_iterator.cpp | 29 ++--- be/src/vec/olap/vertical_merge_iterator.h | 25 +--- be/test/olap/base_compaction_test.cpp | 84 ------------- be/test/olap/rowid_conversion_test.cpp | 6 +- be/test/vec/olap/vertical_compaction_test.cpp | 14 +-- .../compaction_width_array_column.groovy | 137 --------------------- 23 files changed, 42 insertions(+), 463 deletions(-) diff --git a/be/src/cloud/cloud_base_compaction.cpp b/be/src/cloud/cloud_base_compaction.cpp index 4ceab8eb6e3..d4a86743a48 100644 --- a/be/src/cloud/cloud_base_compaction.cpp +++ b/be/src/cloud/cloud_base_compaction.cpp @@ -163,16 +163,6 @@ Status CloudBaseCompaction::pick_rowsets_to_compact() { return Status::Error<BE_NO_SUITABLE_VERSION>("no suitable versions for compaction"); } - int score = 0; - int rowset_cnt = 0; - while (rowset_cnt < _input_rowsets.size()) { - score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); - if (score > config::base_compaction_max_compaction_score) { - break; - } - } - _input_rowsets.resize(rowset_cnt); - // 1. cumulative rowset must reach base_compaction_min_rowset_num threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() diff --git a/be/src/cloud/cloud_cumulative_compaction.cpp b/be/src/cloud/cloud_cumulative_compaction.cpp index 2a26b1b294b..de318f979a5 100644 --- a/be/src/cloud/cloud_cumulative_compaction.cpp +++ b/be/src/cloud/cloud_cumulative_compaction.cpp @@ -354,20 +354,11 @@ Status CloudCumulativeCompaction::pick_rowsets_to_compact() { return st; } - int64_t max_score = config::cumulative_compaction_max_deltas; - auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); - bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; - if (cloud_tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || - memory_usage_high) { - max_score = std::max(config::cumulative_compaction_max_deltas / - config::cumulative_compaction_max_deltas_factor, - config::cumulative_compaction_min_deltas + 1); - } - size_t compaction_score = 0; auto compaction_policy = cloud_tablet()->tablet_meta()->compaction_policy(); _engine.cumu_compaction_policy(compaction_policy) - ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, max_score, + ->pick_input_rowsets(cloud_tablet(), candidate_rowsets, + config::cumulative_compaction_max_deltas, config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, &compaction_score); diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 3e948f4cca2..580793d36ab 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -385,7 +385,6 @@ DEFINE_mInt32(max_single_replica_compaction_threads, "-1"); DEFINE_Bool(enable_base_compaction_idle_sched, "true"); DEFINE_mInt64(base_compaction_min_rowset_num, "5"); -DEFINE_mInt64(base_compaction_max_compaction_score, "20"); DEFINE_mDouble(base_compaction_min_data_ratio, "0.3"); DEFINE_mInt64(base_compaction_dup_key_max_file_size_mbytes, "1024"); @@ -416,7 +415,6 @@ DEFINE_mInt64(compaction_min_size_mbytes, "64"); // cumulative compaction policy: min and max delta file's number DEFINE_mInt64(cumulative_compaction_min_deltas, "5"); DEFINE_mInt64(cumulative_compaction_max_deltas, "1000"); -DEFINE_mInt32(cumulative_compaction_max_deltas_factor, "10"); // This config can be set to limit thread number in multiget thread pool. DEFINE_mInt32(multi_get_max_threads, "10"); @@ -1315,10 +1313,6 @@ DEFINE_Bool(enable_file_logger, "true"); // The minimum row group size when exporting Parquet files. default 128MB DEFINE_Int64(min_row_group_size, "134217728"); -DEFINE_mInt64(compaction_memory_bytes_limit, "1073741824"); - -DEFINE_mInt64(compaction_batch_size, "-1"); - // clang-format off #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 6f0065e2fe3..9920b65fe52 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -438,7 +438,6 @@ DECLARE_mInt32(max_single_replica_compaction_threads); DECLARE_Bool(enable_base_compaction_idle_sched); DECLARE_mInt64(base_compaction_min_rowset_num); -DECLARE_mInt64(base_compaction_max_compaction_score); DECLARE_mDouble(base_compaction_min_data_ratio); DECLARE_mInt64(base_compaction_dup_key_max_file_size_mbytes); @@ -469,7 +468,6 @@ DECLARE_mInt64(compaction_min_size_mbytes); // cumulative compaction policy: min and max delta file's number DECLARE_mInt64(cumulative_compaction_min_deltas); DECLARE_mInt64(cumulative_compaction_max_deltas); -DECLARE_mInt32(cumulative_compaction_max_deltas_factor); // This config can be set to limit thread number in multiget thread pool. DECLARE_mInt32(multi_get_max_threads); @@ -1401,10 +1399,6 @@ DECLARE_Bool(enable_file_logger); // The minimum row group size when exporting Parquet files. DECLARE_Int64(min_row_group_size); -DECLARE_mInt64(compaction_memory_bytes_limit); - -DECLARE_mInt64(compaction_batch_size); - #ifdef BE_TEST // test s3 DECLARE_String(test_s3_resource); diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp index 8be29383c1e..436180c78ca 100644 --- a/be/src/olap/base_compaction.cpp +++ b/be/src/olap/base_compaction.cpp @@ -151,16 +151,6 @@ Status BaseCompaction::pick_rowsets_to_compact() { "situation, no need to do base compaction."); } - int score = 0; - int rowset_cnt = 0; - while (rowset_cnt < _input_rowsets.size()) { - score += _input_rowsets[rowset_cnt++]->rowset_meta()->get_compaction_score(); - if (score > config::base_compaction_max_compaction_score) { - break; - } - } - _input_rowsets.resize(rowset_cnt); - // 1. cumulative rowset must reach base_compaction_num_cumulative_deltas threshold if (_input_rowsets.size() > config::base_compaction_min_rowset_num) { VLOG_NOTICE << "satisfy the base compaction policy. tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/base_tablet.h b/be/src/olap/base_tablet.h index 4852a6cba9b..dc5f488e044 100644 --- a/be/src/olap/base_tablet.h +++ b/be/src/olap/base_tablet.h @@ -22,7 +22,6 @@ #include <string> #include "common/status.h" -#include "olap/iterators.h" #include "olap/partial_update_info.h" #include "olap/rowset/segment_v2/segment.h" #include "olap/tablet_fwd.h" @@ -300,10 +299,6 @@ public: std::atomic<int64_t> read_block_count = 0; std::atomic<int64_t> write_count = 0; std::atomic<int64_t> compaction_count = 0; - - std::mutex sample_info_lock; - std::vector<CompactionSampleInfo> sample_infos; - Status last_compaction_status = Status::OK(); }; } /* namespace doris */ diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp index b42c23f1874..37dcac5283e 100644 --- a/be/src/olap/compaction.cpp +++ b/be/src/olap/compaction.cpp @@ -149,15 +149,6 @@ void Compaction::init_profile(const std::string& label) { _merge_rowsets_latency_timer = ADD_TIMER(_profile, "merge_rowsets_latency"); } -int64_t Compaction::merge_way_num() { - int64_t way_num = 0; - for (auto&& rowset : _input_rowsets) { - way_num += rowset->rowset_meta()->get_merge_way_num(); - } - - return way_num; -} - Status Compaction::merge_input_rowsets() { std::vector<RowsetReaderSharedPtr> input_rs_readers; input_rs_readers.reserve(_input_rowsets.size()); @@ -179,23 +170,19 @@ Status Compaction::merge_input_rowsets() { _stats.rowid_conversion = &_rowid_conversion; } - int64_t way_num = merge_way_num(); - Status res; { SCOPED_TIMER(_merge_rowsets_latency_timer); if (_is_vertical) { res = Merger::vertical_merge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), - get_avg_segment_rows(), way_num, &_stats); + get_avg_segment_rows(), &_stats); } else { res = Merger::vmerge_rowsets(_tablet, compaction_type(), *_cur_tablet_schema, input_rs_readers, _output_rs_writer.get(), &_stats); } } - _tablet->last_compaction_status = res; - if (!res.ok()) { LOG(WARNING) << "fail to do " << compaction_name() << ". res=" << res << ", tablet=" << _tablet->tablet_id() diff --git a/be/src/olap/compaction.h b/be/src/olap/compaction.h index 8e0c1099a20..9ec1297c69c 100644 --- a/be/src/olap/compaction.h +++ b/be/src/olap/compaction.h @@ -81,8 +81,6 @@ protected: void _load_segment_to_cache(); - int64_t merge_way_num(); - // the root tracker for this compaction std::shared_ptr<MemTrackerLimiter> _mem_tracker; diff --git a/be/src/olap/cumulative_compaction.cpp b/be/src/olap/cumulative_compaction.cpp index 2c7e654787a..1e0f338da23 100644 --- a/be/src/olap/cumulative_compaction.cpp +++ b/be/src/olap/cumulative_compaction.cpp @@ -134,20 +134,11 @@ Status CumulativeCompaction::pick_rowsets_to_compact() { << ", tablet=" << _tablet->tablet_id(); } - int64_t max_score = config::cumulative_compaction_max_deltas; - auto process_memory_usage = doris::GlobalMemoryArbitrator::process_memory_usage(); - bool memory_usage_high = process_memory_usage > MemInfo::soft_mem_limit() * 0.8; - if (tablet()->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || memory_usage_high) { - max_score = std::max(config::cumulative_compaction_max_deltas / - config::cumulative_compaction_max_deltas_factor, - config::cumulative_compaction_min_deltas + 1); - } - size_t compaction_score = 0; tablet()->cumulative_compaction_policy()->pick_input_rowsets( - tablet(), candidate_rowsets, max_score, config::cumulative_compaction_min_deltas, - &_input_rowsets, &_last_delete_version, &compaction_score, - _allow_delete_in_cumu_compaction); + tablet(), candidate_rowsets, config::cumulative_compaction_max_deltas, + config::cumulative_compaction_min_deltas, &_input_rowsets, &_last_delete_version, + &compaction_score, _allow_delete_in_cumu_compaction); // Cumulative compaction will process with at least 1 rowset. // So when there is no rowset being chosen, we should return Status::Error<CUMULATIVE_NO_SUITABLE_VERSION>(): diff --git a/be/src/olap/iterators.h b/be/src/olap/iterators.h index cbf8f1eca65..330aa9e3475 100644 --- a/be/src/olap/iterators.h +++ b/be/src/olap/iterators.h @@ -17,7 +17,6 @@ #pragma once -#include <cstddef> #include <memory> #include "common/status.h" @@ -122,12 +121,6 @@ public: size_t topn_limit = 0; }; -struct CompactionSampleInfo { - int64_t bytes = 0; - int64_t rows = 0; - int64_t group_data_size; -}; - class RowwiseIterator; using RowwiseIteratorUPtr = std::unique_ptr<RowwiseIterator>; class RowwiseIterator { @@ -140,13 +133,7 @@ public: // Input options may contain scan range in which this scan. // Return Status::OK() if init successfully, // Return other error otherwise - virtual Status init(const StorageReadOptions& opts) { - return Status::NotSupported("to be implemented"); - } - - virtual Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) { - return Status::NotSupported("to be implemented"); - } + virtual Status init(const StorageReadOptions& opts) = 0; // If there is any valid data, this function will load data // into input batch with Status::OK() returned diff --git a/be/src/olap/merger.cpp b/be/src/olap/merger.cpp index 4c620d30252..cecbeb163dd 100644 --- a/be/src/olap/merger.cpp +++ b/be/src/olap/merger.cpp @@ -24,7 +24,6 @@ #include <algorithm> #include <iterator> #include <memory> -#include <mutex> #include <numeric> #include <ostream> #include <shared_mutex> @@ -34,9 +33,7 @@ #include "common/config.h" #include "common/logging.h" -#include "common/status.h" #include "olap/base_tablet.h" -#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowid_conversion.h" @@ -46,7 +43,6 @@ #include "olap/rowset/segment_v2/segment_writer.h" #include "olap/storage_engine.h" #include "olap/tablet.h" -#include "olap/tablet_fwd.h" #include "olap/tablet_reader.h" #include "olap/utils.h" #include "util/slice.h" @@ -245,8 +241,7 @@ Status Merger::vertical_compact_one_group( vectorized::RowSourcesBuffer* row_source_buf, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size, - CompactionSampleInfo* sample_info) { + std::vector<uint32_t> key_group_cluster_key_idxes) { // build tablet reader VLOG_NOTICE << "vertical compact one group, max_rows_per_segment=" << max_rows_per_segment; vectorized::VerticalBlockReader reader(row_source_buf); @@ -284,8 +279,7 @@ Status Merger::vertical_compact_one_group( reader_params.return_columns = column_group; reader_params.origin_return_columns = &reader_params.return_columns; - reader_params.batch_size = batch_size; - RETURN_IF_ERROR(reader.init(reader_params, sample_info)); + RETURN_IF_ERROR(reader.init(reader_params)); if (reader_params.record_rowids) { stats_output->rowid_conversion->set_dst_rowset_id(dst_rowset_writer->rowset_id()); @@ -391,55 +385,6 @@ Status Merger::vertical_compact_one_group(int64_t tablet_id, ReaderType reader_t return Status::OK(); } -int64_t estimate_batch_size(int group_index, BaseTabletSPtr tablet, int64_t way_cnt) { - std::unique_lock<std::mutex> lock(tablet->sample_info_lock); - CompactionSampleInfo info = tablet->sample_infos[group_index]; - if (way_cnt <= 0) { - LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " - << tablet->tablet_id() << " way cnt: " << way_cnt; - return 4096 - 32; - } - int64_t block_mem_limit = config::compaction_memory_bytes_limit / way_cnt; - if (tablet->last_compaction_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>()) { - block_mem_limit /= 4; - } - - int64_t group_data_size = 0; - if (info.group_data_size > 0 && info.bytes > 0 && info.rows > 0) { - float smoothing_factor = 0.5; - group_data_size = int64_t(info.group_data_size * (1 - smoothing_factor) + - info.bytes / info.rows * smoothing_factor); - tablet->sample_infos[group_index].group_data_size = group_data_size; - } else if (info.group_data_size > 0 && (info.bytes <= 0 || info.rows <= 0)) { - group_data_size = info.group_data_size; - } else if (info.group_data_size <= 0 && info.bytes > 0 && info.rows > 0) { - group_data_size = info.bytes / info.rows; - tablet->sample_infos[group_index].group_data_size = group_data_size; - } else { - LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " - << tablet->tablet_id() << " group data size: " << info.group_data_size - << " row num: " << info.rows << " consume bytes: " << info.bytes; - return 1024 - 32; - } - - if (group_data_size <= 0) { - LOG(WARNING) << "estimate batch size for vertical compaction, tablet id: " - << tablet->tablet_id() << " unexpected group data size: " << group_data_size; - return 4096 - 32; - } - - tablet->sample_infos[group_index].bytes = 0; - tablet->sample_infos[group_index].rows = 0; - - int64_t batch_size = block_mem_limit / group_data_size; - int64_t res = std::max(std::min(batch_size, int64_t(4096 - 32)), 32L); - LOG(INFO) << "estimate batch size for vertical compaction, tablet id: " << tablet->tablet_id() - << " group data size: " << info.group_data_size << " row num: " << info.rows - << " consume bytes: " << info.bytes << " way cnt: " << way_cnt - << " batch size: " << res; - return res; -} - // steps to do vertical merge: // 1. split columns into column groups // 2. compact groups one by one, generate a row_source_buf when compact key group @@ -449,7 +394,7 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t const TabletSchema& tablet_schema, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, - int64_t merge_way_num, Statistics* stats_output) { + Statistics* stats_output) { LOG(INFO) << "Start to do vertical compaction, tablet_id: " << tablet->tablet_id(); std::vector<std::vector<uint32_t>> column_groups; vertical_split_columns(tablet_schema, &column_groups); @@ -460,18 +405,14 @@ Status Merger::vertical_merge_rowsets(BaseTabletSPtr tablet, ReaderType reader_t vectorized::RowSourcesBuffer row_sources_buf( tablet->tablet_id(), dst_rowset_writer->context().tablet_path, reader_type); - tablet->sample_infos.resize(column_groups.size(), {0, 0, 0}); // compact group one by one for (auto i = 0; i < column_groups.size(); ++i) { VLOG_NOTICE << "row source size: " << row_sources_buf.total_size(); bool is_key = (i == 0); - int64_t batch_size = config::compaction_batch_size != -1 - ? config::compaction_batch_size - : estimate_batch_size(i, tablet, merge_way_num); RETURN_IF_ERROR(vertical_compact_one_group( tablet, reader_type, tablet_schema, is_key, column_groups[i], &row_sources_buf, src_rowset_readers, dst_rowset_writer, max_rows_per_segment, stats_output, - key_group_cluster_key_idxes, batch_size, &(tablet->sample_infos[i]))); + key_group_cluster_key_idxes)); if (is_key) { RETURN_IF_ERROR(row_sources_buf.flush()); } diff --git a/be/src/olap/merger.h b/be/src/olap/merger.h index 7513c90fbd1..5749f518136 100644 --- a/be/src/olap/merger.h +++ b/be/src/olap/merger.h @@ -21,7 +21,6 @@ #include "common/status.h" #include "io/io_common.h" -#include "olap/iterators.h" #include "olap/rowset/rowset_fwd.h" #include "olap/tablet_fwd.h" @@ -60,7 +59,7 @@ public: static Status vertical_merge_rowsets( BaseTabletSPtr tablet, ReaderType reader_type, const TabletSchema& tablet_schema, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, - RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, int64_t merge_way_num, + RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output); // for vertical compaction @@ -72,8 +71,7 @@ public: vectorized::RowSourcesBuffer* row_source_buf, const std::vector<RowsetReaderSharedPtr>& src_rowset_readers, RowsetWriter* dst_rowset_writer, int64_t max_rows_per_segment, Statistics* stats_output, - std::vector<uint32_t> key_group_cluster_key_idxes, int64_t batch_size, - CompactionSampleInfo* sample_info); + std::vector<uint32_t> key_group_cluster_key_idxes); // for segcompaction static Status vertical_compact_one_group(int64_t tablet_id, ReaderType reader_type, diff --git a/be/src/olap/rowset/rowset_meta.h b/be/src/olap/rowset/rowset_meta.h index aa20b5b1ef1..90b2ce48a0a 100644 --- a/be/src/olap/rowset/rowset_meta.h +++ b/be/src/olap/rowset/rowset_meta.h @@ -269,21 +269,6 @@ public: return score; } - uint32_t get_merge_way_num() const { - uint32_t way_num = 0; - if (!is_segments_overlapping()) { - if (num_segments() == 0) { - way_num = 0; - } else { - way_num = 1; - } - } else { - way_num = num_segments(); - CHECK(way_num > 0); - } - return way_num; - } - void get_segments_key_bounds(std::vector<KeyBoundsPB>* segments_key_bounds) const { for (const KeyBoundsPB& key_range : _rowset_meta_pb.segments_key_bounds()) { segments_key_bounds->push_back(key_range); diff --git a/be/src/olap/rowset/segcompaction.cpp b/be/src/olap/rowset/segcompaction.cpp index 95f2a945134..22a7049aa8f 100644 --- a/be/src/olap/rowset/segcompaction.cpp +++ b/be/src/olap/rowset/segcompaction.cpp @@ -101,7 +101,7 @@ Status SegcompactionWorker::_get_segcompaction_reader( reader_params.tablet = tablet; reader_params.return_columns = return_columns; reader_params.is_key_column_group = is_key; - return (*reader)->init(reader_params, nullptr); + return (*reader)->init(reader_params); } std::unique_ptr<segment_v2::SegmentWriter> SegcompactionWorker::_create_segcompaction_writer( diff --git a/be/src/olap/tablet_reader.h b/be/src/olap/tablet_reader.h index c257ba007f5..a3cd3bd4a49 100644 --- a/be/src/olap/tablet_reader.h +++ b/be/src/olap/tablet_reader.h @@ -183,8 +183,6 @@ public: void check_validation() const; std::string to_string() const; - - int64_t batch_size = -1; }; TabletReader() = default; diff --git a/be/src/vec/olap/vertical_block_reader.cpp b/be/src/vec/olap/vertical_block_reader.cpp index 872836c91cd..c4dda20f40f 100644 --- a/be/src/vec/olap/vertical_block_reader.cpp +++ b/be/src/vec/olap/vertical_block_reader.cpp @@ -25,8 +25,6 @@ #include <ostream> #include "cloud/config.h" -#include "olap/compaction.h" -#include "olap/iterators.h" #include "olap/olap_common.h" #include "olap/olap_define.h" #include "olap/rowset/rowset.h" @@ -110,8 +108,7 @@ Status VerticalBlockReader::_get_segment_iterators(const ReaderParams& read_para return Status::OK(); } -Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, - CompactionSampleInfo* sample_info) { +Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params) { std::vector<bool> iterator_init_flag; std::vector<RowsetId> rowset_ids; std::vector<RowwiseIteratorUPtr>* segment_iters_ptr = read_params.segment_iters_ptr; @@ -160,8 +157,7 @@ Status VerticalBlockReader::_init_collect_iter(const ReaderParams& read_params, // init collect iterator StorageReadOptions opts; opts.record_rowids = read_params.record_rowids; - opts.block_row_max = read_params.batch_size; - RETURN_IF_ERROR(_vcollect_iter->init(opts, sample_info)); + RETURN_IF_ERROR(_vcollect_iter->init(opts)); // In agg keys value columns compact, get first row for _init_agg_state if (!read_params.is_key_column_group && read_params.tablet->keys_type() == KeysType::AGG_KEYS) { @@ -208,17 +204,13 @@ void VerticalBlockReader::_init_agg_state(const ReaderParams& read_params) { } Status VerticalBlockReader::init(const ReaderParams& read_params) { - return init(read_params, nullptr); -} - -Status VerticalBlockReader::init(const ReaderParams& read_params, - CompactionSampleInfo* sample_info) { StorageReadOptions opts; - _reader_context.batch_size = read_params.batch_size; + _reader_context.batch_size = opts.block_row_max; RETURN_IF_ERROR(TabletReader::init(read_params)); _arena = std::make_unique<Arena>(); - auto status = _init_collect_iter(read_params, sample_info); + + auto status = _init_collect_iter(read_params); if (!status.ok()) [[unlikely]] { if (!config::is_cloud_mode()) { static_cast<Tablet*>(_tablet.get())->report_error(status); diff --git a/be/src/vec/olap/vertical_block_reader.h b/be/src/vec/olap/vertical_block_reader.h index e1e8cfa1239..81ef8d79100 100644 --- a/be/src/vec/olap/vertical_block_reader.h +++ b/be/src/vec/olap/vertical_block_reader.h @@ -56,7 +56,6 @@ public: // Initialize VerticalBlockReader with tablet, data version and fetch range. Status init(const ReaderParams& read_params) override; - Status init(const ReaderParams& read_params, CompactionSampleInfo* sample_info); Status next_block_with_aggregation(Block* block, bool* eof) override; @@ -80,7 +79,7 @@ private: // to minimize the comparison time in merge heap. Status _unique_key_next_block(Block* block, bool* eof); - Status _init_collect_iter(const ReaderParams& read_params, CompactionSampleInfo* sample_info); + Status _init_collect_iter(const ReaderParams& read_params); Status _get_segment_iterators(const ReaderParams& read_params, std::vector<RowwiseIteratorUPtr>* segment_iters, diff --git a/be/src/vec/olap/vertical_merge_iterator.cpp b/be/src/vec/olap/vertical_merge_iterator.cpp index 81cfc756d63..3323492ee90 100644 --- a/be/src/vec/olap/vertical_merge_iterator.cpp +++ b/be/src/vec/olap/vertical_merge_iterator.cpp @@ -21,7 +21,6 @@ #include <gen_cpp/olap_file.pb.h> #include <stdlib.h> -#include <cstddef> #include <ostream> #include "cloud/config.h" @@ -30,7 +29,6 @@ #include "common/logging.h" #include "io/cache/block_file_cache_factory.h" #include "olap/field.h" -#include "olap/iterators.h" #include "olap/olap_common.h" #include "vec/columns/column.h" #include "vec/common/string_ref.h" @@ -342,18 +340,13 @@ Status VerticalMergeIteratorContext::copy_rows(Block* block, bool advanced) { return Status::OK(); } -Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalMergeIteratorContext::init(const StorageReadOptions& opts) { if (LIKELY(_inited)) { return Status::OK(); } _block_row_max = opts.block_row_max; _record_rowids = opts.record_rowids; RETURN_IF_ERROR(_load_next_block()); - if (sample_info != nullptr) { - sample_info->bytes += bytes(); - sample_info->rows += rows(); - } if (valid()) { RETURN_IF_ERROR(advance()); } @@ -512,8 +505,7 @@ Status VerticalHeapMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); _record_rowids = opts.record_rowids; if (_origin_iters.empty()) { @@ -541,7 +533,7 @@ Status VerticalHeapMergeIterator::init(const StorageReadOptions& opts, for (size_t i = 0; i < num_iters; ++i) { if (_iterator_init_flags[i] || pre_iter_invalid) { auto& ctx = _ori_iter_ctx[i]; - RETURN_IF_ERROR(ctx->init(opts, sample_info)); + RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { pre_iter_invalid = true; continue; @@ -614,8 +606,7 @@ Status VerticalFifoMergeIterator::next_batch(Block* block) { return Status::EndOfFile("no more data in segment"); } -Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts) { DCHECK(_origin_iters.size() == _iterator_init_flags.size()); DCHECK(_keys_type == KeysType::DUP_KEYS); _record_rowids = opts.record_rowids; @@ -635,7 +626,7 @@ Status VerticalFifoMergeIterator::init(const StorageReadOptions& opts, std::unique_ptr<VerticalMergeIteratorContext> ctx( new VerticalMergeIteratorContext(std::move(iter), _rowset_ids[seg_order], _ori_return_cols, seg_order, _seq_col_idx)); - RETURN_IF_ERROR(ctx->init(opts, sample_info)); + RETURN_IF_ERROR(ctx->init(opts)); if (!ctx->valid()) { ++seg_order; continue; @@ -676,7 +667,7 @@ Status VerticalMaskMergeIterator::next_row(vectorized::IteratorRowRef* ref) { uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; // init ctx and this ctx must be valid - RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); + RETURN_IF_ERROR(ctx->init(_opts)); DCHECK(ctx->valid()); if (UNLIKELY(ctx->is_first_row())) { @@ -710,7 +701,7 @@ Status VerticalMaskMergeIterator::unique_key_next_row(vectorized::IteratorRowRef auto row_source = _row_sources_buf->current(); uint16_t order = row_source.get_source_num(); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); + RETURN_IF_ERROR(ctx->init(_opts)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -749,7 +740,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { uint16_t order = _row_sources_buf->current().get_source_num(); DCHECK(order < _origin_iter_ctx.size()); auto& ctx = _origin_iter_ctx[order]; - RETURN_IF_ERROR(ctx->init(_opts, _sample_info)); + RETURN_IF_ERROR(ctx->init(_opts)); DCHECK(ctx->valid()); if (!ctx->valid()) { LOG(INFO) << "VerticalMergeIteratorContext not valid"; @@ -772,8 +763,7 @@ Status VerticalMaskMergeIterator::next_batch(Block* block) { return st; } -Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, - CompactionSampleInfo* sample_info) { +Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts) { if (_origin_iters.empty()) { return Status::OK(); } @@ -788,7 +778,6 @@ Status VerticalMaskMergeIterator::init(const StorageReadOptions& opts, } _origin_iters.clear(); - _sample_info = sample_info; _block_row_max = opts.block_row_max; return Status::OK(); } diff --git a/be/src/vec/olap/vertical_merge_iterator.h b/be/src/vec/olap/vertical_merge_iterator.h index 3751aa92c78..f46a0446cf2 100644 --- a/be/src/vec/olap/vertical_merge_iterator.h +++ b/be/src/vec/olap/vertical_merge_iterator.h @@ -164,7 +164,7 @@ public: ~VerticalMergeIteratorContext() = default; Status block_reset(const std::shared_ptr<Block>& block); - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info = nullptr); + Status init(const StorageReadOptions& opts); bool compare(const VerticalMergeIteratorContext& rhs) const; Status copy_rows(Block* block, bool advanced = true); Status copy_rows(Block* block, size_t count); @@ -200,22 +200,6 @@ public: return _block_row_locations[_index_in_block]; } - size_t bytes() { - if (_block) { - return _block->bytes(); - } else { - return 0; - } - } - - size_t rows() { - if (_block) { - return _block->rows(); - } else { - return 0; - } - } - private: // Load next block into _block Status _load_next_block(); @@ -271,7 +255,7 @@ public: VerticalHeapMergeIterator(const VerticalHeapMergeIterator&) = delete; VerticalHeapMergeIterator& operator=(const VerticalHeapMergeIterator&) = delete; - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; + Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -337,7 +321,7 @@ public: VerticalFifoMergeIterator(const VerticalFifoMergeIterator&) = delete; VerticalFifoMergeIterator& operator=(const VerticalFifoMergeIterator&) = delete; - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; + Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; const Schema& schema() const override { return *_schema; } uint64_t merged_rows() const override { return _merged_rows; } @@ -383,7 +367,7 @@ public: VerticalMaskMergeIterator(const VerticalMaskMergeIterator&) = delete; VerticalMaskMergeIterator& operator=(const VerticalMaskMergeIterator&) = delete; - Status init(const StorageReadOptions& opts, CompactionSampleInfo* sample_info) override; + Status init(const StorageReadOptions& opts) override; Status next_batch(Block* block) override; @@ -412,7 +396,6 @@ private: size_t _filtered_rows = 0; RowSourcesBuffer* _row_sources_buf; StorageReadOptions _opts; - CompactionSampleInfo* _sample_info = nullptr; }; // segment merge iterator diff --git a/be/test/olap/base_compaction_test.cpp b/be/test/olap/base_compaction_test.cpp deleted file mode 100644 index 7d9abe54ed2..00000000000 --- a/be/test/olap/base_compaction_test.cpp +++ /dev/null @@ -1,84 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "olap/base_compaction.h" - -#include <gen_cpp/AgentService_types.h> -#include <gen_cpp/olap_file.pb.h> -#include <gtest/gtest-message.h> -#include <gtest/gtest-test-part.h> - -#include "gtest/gtest.h" -#include "gtest/gtest_pred_impl.h" -#include "olap/cumulative_compaction.h" -#include "olap/cumulative_compaction_policy.h" -#include "olap/olap_common.h" -#include "olap/rowset/rowset_factory.h" -#include "olap/rowset/rowset_meta.h" -#include "olap/storage_engine.h" -#include "olap/tablet.h" -#include "olap/tablet_meta.h" -#include "util/uid_util.h" - -namespace doris { - -class TestBaseCompaction : public testing::Test {}; - -static RowsetSharedPtr create_rowset(Version version, int num_segments, bool overlapping, - int data_size) { - auto rs_meta = std::make_shared<RowsetMeta>(); - rs_meta->set_rowset_type(BETA_ROWSET); // important - rs_meta->_rowset_meta_pb.set_start_version(version.first); - rs_meta->_rowset_meta_pb.set_end_version(version.second); - rs_meta->set_num_segments(num_segments); - rs_meta->set_segments_overlap(overlapping ? OVERLAPPING : NONOVERLAPPING); - rs_meta->set_total_disk_size(data_size); - RowsetSharedPtr rowset; - Status st = RowsetFactory::create_rowset(nullptr, "", std::move(rs_meta), &rowset); - if (!st.ok()) { - return nullptr; - } - return rowset; -} - -TEST_F(TestBaseCompaction, filter_input_rowset) { - StorageEngine engine({}); - TabletMetaSharedPtr tablet_meta; - tablet_meta.reset(new TabletMeta(1, 2, 15673, 15674, 4, 5, TTabletSchema(), 6, {{7, 8}}, - UniqueId(9, 10), TTabletType::TABLET_TYPE_DISK, - TCompressionType::LZ4F)); - TabletSharedPtr tablet(new Tablet(engine, tablet_meta, nullptr, CUMULATIVE_SIZE_BASED_POLICY)); - tablet->_cumulative_point = 25; - BaseCompaction compaction(engine, tablet); - //std::vector<RowsetSharedPtr> rowsets; - - RowsetSharedPtr init_rs = create_rowset({0, 1}, 1, false, 0); - tablet->_rs_version_map.emplace(init_rs->version(), init_rs); - for (int i = 2; i < 30; ++i) { - RowsetSharedPtr rs = create_rowset({i, i}, 1, false, 1024); - tablet->_rs_version_map.emplace(rs->version(), rs); - } - Status st = compaction.pick_rowsets_to_compact(); - EXPECT_TRUE(st.ok()); - EXPECT_EQ(compaction._input_rowsets.front()->start_version(), 0); - EXPECT_EQ(compaction._input_rowsets.front()->end_version(), 1); - - EXPECT_EQ(compaction._input_rowsets.back()->start_version(), 21); - EXPECT_EQ(compaction._input_rowsets.back()->end_version(), 21); -} - -} // namespace doris diff --git a/be/test/olap/rowid_conversion_test.cpp b/be/test/olap/rowid_conversion_test.cpp index 5ae80398afb..7c56710f2e8 100644 --- a/be/test/olap/rowid_conversion_test.cpp +++ b/be/test/olap/rowid_conversion_test.cpp @@ -348,9 +348,9 @@ protected: stats.rowid_conversion = &rowid_conversion; Status s; if (is_vertical_merger) { - s = Merger::vertical_merge_rowsets( - tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 10000000, num_segments, &stats); + s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, + *tablet_schema, input_rs_readers, + output_rs_writer.get(), 10000000, &stats); } else { s = Merger::vmerge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, output_rs_writer.get(), &stats); diff --git a/be/test/vec/olap/vertical_compaction_test.cpp b/be/test/vec/olap/vertical_compaction_test.cpp index 4c4409a7506..3afd748e14d 100644 --- a/be/test/vec/olap/vertical_compaction_test.cpp +++ b/be/test/vec/olap/vertical_compaction_test.cpp @@ -490,7 +490,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); + output_rs_writer.get(), 100, &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -598,7 +598,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); + output_rs_writer.get(), 100, &stats); ASSERT_TRUE(s.ok()) << s; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -706,7 +706,7 @@ TEST_F(VerticalCompactionTest, TestUniqueKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 10000, num_segments, &stats); + output_rs_writer.get(), 10000, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -815,8 +815,7 @@ TEST_F(VerticalCompactionTest, TestDupKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, - &stats); + input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -918,8 +917,7 @@ TEST_F(VerticalCompactionTest, TestDupWithoutKeyVerticalMergeWithDelete) { RowIdConversion rowid_conversion; stats.rowid_conversion = &rowid_conversion; st = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, - input_rs_readers, output_rs_writer.get(), 100, num_segments, - &stats); + input_rs_readers, output_rs_writer.get(), 100, &stats); ASSERT_TRUE(st.ok()) << st; RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); @@ -1012,7 +1010,7 @@ TEST_F(VerticalCompactionTest, TestAggKeyVerticalMerge) { stats.rowid_conversion = &rowid_conversion; auto s = Merger::vertical_merge_rowsets(tablet, ReaderType::READER_BASE_COMPACTION, *tablet_schema, input_rs_readers, - output_rs_writer.get(), 100, num_segments, &stats); + output_rs_writer.get(), 100, &stats); EXPECT_TRUE(s.ok()); RowsetSharedPtr out_rowset; EXPECT_EQ(Status::OK(), output_rs_writer->build(out_rowset)); diff --git a/regression-test/suites/compaction/compaction_width_array_column.groovy b/regression-test/suites/compaction/compaction_width_array_column.groovy deleted file mode 100644 index 4e3fed354c7..00000000000 --- a/regression-test/suites/compaction/compaction_width_array_column.groovy +++ /dev/null @@ -1,137 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -suite('compaction_width_array_column', "p2") { - String backend_id; - def backendId_to_backendIP = [:] - def backendId_to_backendHttpPort = [:] - getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); - - backend_id = backendId_to_backendIP.keySet()[0] - def (code, out, err) = show_be_config(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id)) - - logger.info("Show config: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def configList = parseJson(out.trim()) - assert configList instanceof List - - def s3BucketName = getS3BucketName() - def random = new Random(); - - def s3WithProperties = """WITH S3 ( - |"AWS_ACCESS_KEY" = "${getS3AK()}", - |"AWS_SECRET_KEY" = "${getS3SK()}", - |"AWS_ENDPOINT" = "${getS3Endpoint()}", - |"AWS_REGION" = "${getS3Region()}") - |PROPERTIES( - |"exec_mem_limit" = "8589934592", - |"load_parallelism" = "3")""".stripMargin() - - // set fe configuration - sql "ADMIN SET FRONTEND CONFIG ('max_bytes_per_broker_scanner' = '161061273600')" - - def tableName = "column_witdh_array" - - def table_create_task = { table_name -> - // drop table if exists - sql """drop table if exists ${table_name}""" - // create table - def create_table = new File("""${context.file.parent}/ddl/${table_name}.sql""").text - create_table = create_table.replaceAll("\\\$\\{table\\_name\\}", table_name) - sql create_table - } - - def table_load_task = { table_name -> - uniqueID = Math.abs(UUID.randomUUID().hashCode()).toString() - loadLabel = table_name + "_" + uniqueID - //loadLabel = table_name + '_load_5' - loadSql = new File("""${context.file.parent}/ddl/${table_name}_load.sql""").text.replaceAll("\\\$\\{s3BucketName\\}", s3BucketName) - loadSql = loadSql.replaceAll("\\\$\\{loadLabel\\}", loadLabel) - loadSql = loadSql.replaceAll("\\\$\\{table\\_name\\}", table_name) - nowloadSql = loadSql + s3WithProperties - try_sql nowloadSql - - while (true) { - def stateResult = sql "show load where Label = '${loadLabel}'" - logger.info("load result is ${stateResult}") - def loadState = stateResult[stateResult.size() - 1][2].toString() - if ("CANCELLED".equalsIgnoreCase(loadState)) { - throw new IllegalStateException("load ${loadLabel} failed.") - } else if ("FINISHED".equalsIgnoreCase(loadState)) { - break - } - sleep(5000) - } - } - - table_create_task(tableName) - table_load_task(tableName) - - def tablets = sql_return_maparray """ show tablets from ${tableName}; """ - - boolean isOverLap = true - int tryCnt = 0; - while (isOverLap && tryCnt < 3) { - isOverLap = false - - for (def tablet in tablets) { - String tablet_id = tablet.TabletId - backend_id = tablet.BackendId - (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def compactJson = parseJson(out.trim()) - assertEquals("success", compactJson.status.toLowerCase()) - } - - // wait for all compactions done - for (def tablet in tablets) { - boolean running = true - do { - Thread.sleep(1000) - String tablet_id = tablet.TabletId - backend_id = tablet.BackendId - (code, out, err) = be_get_compaction_status(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) - logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def compactionStatus = parseJson(out.trim()) - assertEquals("success", compactionStatus.status.toLowerCase()) - running = compactionStatus.run_status - } while (running) - } - - for (def tablet in tablets) { - String tablet_id = tablet.TabletId - (code, out, err) = curl("GET", tablet.CompactionStatus) - logger.info("Show tablets status: code=" + code + ", out=" + out + ", err=" + err) - assertEquals(code, 0) - def tabletJson = parseJson(out.trim()) - assert tabletJson.rowsets instanceof List - for (String rowset in (List<String>) tabletJson.rowsets) { - logger.info("rowset info" + rowset) - String overLappingStr = rowset.split(" ")[3] - if (overLappingStr == "OVERLAPPING") { - isOverLap = true; - } - logger.info("is over lap " + isOverLap + " " + overLappingStr) - } - } - tryCnt++; - } - - assertFalse(isOverLap); -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org