zhannngchen commented on code in PR #12866: URL: https://github.com/apache/doris/pull/12866#discussion_r981901633
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -165,52 +171,91 @@ Status BetaRowsetWriter::delete_original_segments(uint32_t begin, uint32_t end) // Even if an error is encountered, these files that have not been cleaned up // will be cleaned up by the GC background. So here we only print the error // message when we encounter an error. - WARN_IF_ERROR(fs->delete_file(seg_path), - strings::Substitute("Failed to delete file=$0", seg_path)); + RETURN_NOT_OK_LOG(fs->delete_file(seg_path), + strings::Substitute("Failed to delete file=$0", seg_path)); } return Status::OK(); } -void BetaRowsetWriter::rename_compacted_segments(int64_t begin, int64_t end) { +Status BetaRowsetWriter::_rename_compacted_segments(int64_t begin, int64_t end) { int ret; auto src_seg_path = BetaRowset::local_segment_path_segcompacted(_context.tablet_path, _context.rowset_id, begin, end); auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, _num_segcompacted++); ret = rename(src_seg_path.c_str(), dst_seg_path.c_str()); - DCHECK_EQ(ret, 0); + if (ret) { + return Status::OLAPInternalError(OLAP_ERR_ROWSET_RENAME_FILE_FAILED); + } + return Status::OK(); } -// todo: will rename only do the job? maybe need deep modification -void BetaRowsetWriter::rename_compacted_segment_plain(uint64_t seg_id) { +// TODO(zhangzhengyu): maybe need content modification instead of simply renaming +Status BetaRowsetWriter::_rename_compacted_segment_plain(uint64_t seg_id) { + if (seg_id == _num_segcompacted) { + return Status::OK(); + } + int ret; auto src_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, seg_id); auto dst_seg_path = BetaRowset::local_segment_path(_context.tablet_path, _context.rowset_id, _num_segcompacted++); LOG(INFO) << "segcompaction skip this segment. 
rename " << src_seg_path << " to " << dst_seg_path; - if (src_seg_path.compare(dst_seg_path) != 0) { - CHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); - CHECK_EQ(_segid_statistics_map.find(_num_segcompacted) == _segid_statistics_map.end(), - true); + { + std::lock_guard<std::mutex> lock(_segid_statistics_map_mutex); + DCHECK_EQ(_segid_statistics_map.find(seg_id + 1) == _segid_statistics_map.end(), false); Review Comment: rebase to master, and use `seg_id` instead of `seg_id + 1` ########## be/src/olap/rowset/beta_rowset_writer.h: ########## @@ -149,9 +157,15 @@ class BetaRowsetWriter : public RowsetWriter { std::vector<KeyBoundsPB> _segments_encoded_key_bounds; // record rows number of every segment std::vector<uint32_t> _segment_num_rows; + + // ensure only one inflight segcompaction task for each rowset std::atomic<bool> _is_doing_segcompaction; + // enforce compare-and-swap on _is_doing_segcompaction + std::mutex _is_doing_segcompaction_lock; Review Comment: `_segcompacting_cond_lock` is enough to protect `_is_doing_segcompaction`, so is there any need to introduce another lock? I don't think we need two locks here; you can keep one of them, and the code will still work and be simpler.
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -304,60 +397,76 @@ void BetaRowsetWriter::find_longest_consecutive_small_segment( is_terminated_by_big = true; break; } else { - rename_compacted_segment_plain(_segcompacted_point); - ++_segcompacted_point; + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); } } else { let_big_terminate = true; // break if find a big after small segments->push_back(seg); - ++_segcompacted_point; } } size_t s = segments->size(); if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { // start with big segments and end with small, better to do it in next // round to compact more at once - _segcompacted_point -= s; segments->clear(); - LOG(INFO) << "candidate segments num too small:" << s; - return; + return Status::OK(); } if (s == 1) { // poor bachelor, let it go LOG(INFO) << "only one candidate segment"; - rename_compacted_segment_plain(_segcompacted_point - 1); + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); segments->clear(); - return; + return Status::OK(); } std::stringstream ss; for (auto& segment : (*segments.get())) { ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; } LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str(); + return Status::OK(); } -SegCompactionCandidatesSharedPtr BetaRowsetWriter::get_segcompaction_candidates(bool is_last) { - SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); +Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, + bool is_last) { if (is_last) { - load_noncompacted_segments(segments.get()); - if (segments->size() == 1) { - LOG(INFO) << "only one last candidate segment"; - rename_compacted_segment_plain(_segcompacted_point); - segments->clear(); + // currently we only rename remaining segments to reduce wait time + // so that transaction can be committed ASAP + 
RETURN_NOT_OK(_load_noncompacted_segments(segments.get())); + for (int i = 0; i < segments->size(); ++i) { + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); } + segments->clear(); } else { - find_longest_consecutive_small_segment(segments); + RETURN_NOT_OK(_find_longest_consecutive_small_segment(segments)); } - return segments; + return Status::OK(); } -void BetaRowsetWriter::segcompaction_if_necessary() { - if (!config::enable_segcompaction || !config::enable_storage_vectorization) { - return; - } - if (!_is_doing_segcompaction && - ((_num_segment - _segcompacted_point) >= config::segcompaction_threshold_segment_num)) { +bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { + std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); + if (!_is_doing_segcompaction) { _is_doing_segcompaction = true; - SegCompactionCandidatesSharedPtr segments = get_segcompaction_candidates(false); + return true; + } else { + return false; + } +} + +Status BetaRowsetWriter::_segcompaction_if_necessary() { + if (!config::enable_segcompaction || !config::enable_storage_vectorization || + !_check_and_set_is_doing_segcompaction()) { + return Status::OK(); + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + _is_doing_segcompaction = false; Review Comment: Need a log here? print `_segcompaction_status` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org