zhannngchen commented on code in PR #12866: URL: https://github.com/apache/doris/pull/12866#discussion_r982002203
########## be/src/olap/rowset/beta_rowset_writer.cpp: ########## @@ -304,60 +397,76 @@ void BetaRowsetWriter::find_longest_consecutive_small_segment( is_terminated_by_big = true; break; } else { - rename_compacted_segment_plain(_segcompacted_point); - ++_segcompacted_point; + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); } } else { let_big_terminate = true; // break if find a big after small segments->push_back(seg); - ++_segcompacted_point; } } size_t s = segments->size(); if (!is_terminated_by_big && s <= (config::segcompaction_threshold_segment_num / 2)) { // start with big segments and end with small, better to do it in next // round to compact more at once - _segcompacted_point -= s; segments->clear(); - LOG(INFO) << "candidate segments num too small:" << s; - return; + return Status::OK(); } if (s == 1) { // poor bachelor, let it go LOG(INFO) << "only one candidate segment"; - rename_compacted_segment_plain(_segcompacted_point - 1); + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); segments->clear(); - return; + return Status::OK(); } std::stringstream ss; for (auto& segment : (*segments.get())) { ss << "[id:" << segment->id() << " num_rows:" << segment->num_rows() << "]"; } LOG(INFO) << "candidate segments num:" << s << " list of candidates:" << ss.str(); + return Status::OK(); } -SegCompactionCandidatesSharedPtr BetaRowsetWriter::get_segcompaction_candidates(bool is_last) { - SegCompactionCandidatesSharedPtr segments = std::make_shared<SegCompactionCandidates>(); +Status BetaRowsetWriter::_get_segcompaction_candidates(SegCompactionCandidatesSharedPtr& segments, + bool is_last) { if (is_last) { - load_noncompacted_segments(segments.get()); - if (segments->size() == 1) { - LOG(INFO) << "only one last candidate segment"; - rename_compacted_segment_plain(_segcompacted_point); - segments->clear(); + // currently we only rename remaining segments to reduce wait time + // so that transaction can be committed ASAP + RETURN_NOT_OK(_load_noncompacted_segments(segments.get())); + for (int i = 0; i < segments->size(); ++i) { + RETURN_NOT_OK(_rename_compacted_segment_plain(_segcompacted_point++)); } + segments->clear(); } else { - find_longest_consecutive_small_segment(segments); + RETURN_NOT_OK(_find_longest_consecutive_small_segment(segments)); } - return segments; + return Status::OK(); } -void BetaRowsetWriter::segcompaction_if_necessary() { - if (!config::enable_segcompaction || !config::enable_storage_vectorization) { - return; - } - if (!_is_doing_segcompaction && - ((_num_segment - _segcompacted_point) >= config::segcompaction_threshold_segment_num)) { +bool BetaRowsetWriter::_check_and_set_is_doing_segcompaction() { + std::unique_lock<std::mutex> l(_is_doing_segcompaction_lock); + if (!_is_doing_segcompaction) { _is_doing_segcompaction = true; - SegCompactionCandidatesSharedPtr segments = get_segcompaction_candidates(false); + return true; + } else { + return false; + } +} + +Status BetaRowsetWriter::_segcompaction_if_necessary() { + if (!config::enable_segcompaction || !config::enable_storage_vectorization || + !_check_and_set_is_doing_segcompaction()) { + return Status::OK(); + } + if (_segcompaction_status.load() != OLAP_SUCCESS) { + _is_doing_segcompaction = false; Review Comment: ok,👍 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org