This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push: new 308ede0657e branch-2.1-pick: [Fix](full compaction) Fix problems for full compaction #49825 (#49919) 308ede0657e is described below commit 308ede0657effcf94927e860ea103c540efabb8e Author: bobhan1 <bao...@selectdb.com> AuthorDate: Thu Apr 10 23:26:39 2025 +0800 branch-2.1-pick: [Fix](full compaction) Fix problems for full compaction #49825 (#49919) pick https://github.com/apache/doris/pull/49825 --- be/src/olap/full_compaction.cpp | 98 ++++++++++----------- be/src/olap/full_compaction.h | 5 +- .../test_full_compaction_mow.out | Bin 0 -> 181 bytes .../test_full_compaction_mow.groovy | 98 +++++++++++++++++++++ 4 files changed, 146 insertions(+), 55 deletions(-) diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp index 74bad632250..04efe95132e 100644 --- a/be/src/olap/full_compaction.cpp +++ b/be/src/olap/full_compaction.cpp @@ -35,6 +35,7 @@ #include "olap/schema_change.h" #include "olap/tablet_meta.h" #include "runtime/thread_context.h" +#include "util/debug_points.h" #include "util/thread.h" #include "util/trace.h" @@ -112,13 +113,46 @@ Status FullCompaction::pick_rowsets_to_compact() { } Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) { + std::vector<RowsetSharedPtr> output_rowsets {_output_rowset}; if (_tablet->keys_type() == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) { - RETURN_IF_ERROR( - _full_compaction_update_delete_bitmap(_output_rowset, _output_rs_writer.get())); - } - std::vector<RowsetSharedPtr> output_rowsets(1, _output_rowset); - { + std::vector<RowsetSharedPtr> tmp_rowsets {}; + + // tablet is under alter process. The delete bitmap will be calculated after conversion. 
+ if (_tablet->tablet_state() == TABLET_NOTREADY) { + LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" + << _tablet->tablet_id(); + return Status::OK(); + } + + int64_t max_version = _tablet->max_version().second; + DCHECK(max_version >= _output_rowset->version().second); + if (max_version > _output_rowset->version().second) { + RETURN_IF_ERROR(_tablet->capture_consistent_rowsets( + {_output_rowset->version().second + 1, max_version}, &tmp_rowsets)); + } + + for (const auto& it : tmp_rowsets) { + const int64_t& cur_version = it->rowset_meta()->start_version(); + RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(it, _output_rowset, cur_version, + _output_rs_writer.get())); + } + DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.before.block", DBUG_BLOCK); + std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock()); + std::lock_guard header_lock(_tablet->get_header_lock()); + SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); + for (const auto& it : _tablet->rowset_map()) { + const int64_t& cur_version = it.first.first; + const RowsetSharedPtr& published_rowset = it.second; + if (cur_version > max_version) { + RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap( + published_rowset, _output_rowset, cur_version, _output_rs_writer.get())); + } + } + RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); + DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); }) + _tablet->save_meta(); + } else { std::lock_guard<std::mutex> rowset_update_wlock(_tablet->get_rowset_update_lock()); std::lock_guard<std::shared_mutex> meta_wlock(_tablet->get_header_lock()); RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true)); @@ -148,55 +182,16 @@ Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& ro return Status::OK(); } -Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset, - RowsetWriter* rowset_writer) { 
- std::vector<RowsetSharedPtr> tmp_rowsets {}; - - // tablet is under alter process. The delete bitmap will be calculated after conversion. - if (_tablet->tablet_state() == TABLET_NOTREADY) { - LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id=" - << _tablet->tablet_id(); - return Status::OK(); - } - - int64_t max_version = _tablet->max_version().second; - DCHECK(max_version >= rowset->version().second); - if (max_version > rowset->version().second) { - RETURN_IF_ERROR(_tablet->capture_consistent_rowsets( - {rowset->version().second + 1, max_version}, &tmp_rowsets)); - } - - for (const auto& it : tmp_rowsets) { - const int64_t& cur_version = it->rowset_meta()->start_version(); - RETURN_IF_ERROR( - _full_compaction_calc_delete_bitmap(it, rowset, cur_version, rowset_writer)); - } - - std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock()); - std::lock_guard header_lock(_tablet->get_header_lock()); - SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD); - for (const auto& it : _tablet->rowset_map()) { - const int64_t& cur_version = it.first.first; - const RowsetSharedPtr& published_rowset = it.second; - if (cur_version > max_version) { - RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(published_rowset, rowset, - cur_version, rowset_writer)); - } - } - - return Status::OK(); -} - Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset, const RowsetSharedPtr& rowset, - const int64_t& cur_version, + int64_t cur_version, RowsetWriter* rowset_writer) { std::vector<segment_v2::SegmentSharedPtr> segments; - auto beta_rowset = reinterpret_cast<BetaRowset*>(published_rowset.get()); - RETURN_IF_ERROR(beta_rowset->load_segments(&segments)); + RETURN_IF_ERROR( + std::static_pointer_cast<BetaRowset>(published_rowset)->load_segments(&segments)); DeleteBitmapPtr delete_bitmap = std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id()); - std::vector<RowsetSharedPtr> 
specified_rowsets(1, rowset); + std::vector<RowsetSharedPtr> specified_rowsets {rowset}; OlapStopWatch watch; RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets, @@ -213,10 +208,11 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr << ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows; for (const auto& [k, v] : delete_bitmap->delete_bitmap) { - _tablet->tablet_meta()->delete_bitmap().merge({std::get<0>(k), std::get<1>(k), cur_version}, - v); + if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) { + _tablet->tablet_meta()->delete_bitmap().merge( + {std::get<0>(k), std::get<1>(k), cur_version}, v); + } } - return Status::OK(); } diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h index 631d901e846..89051458dea 100644 --- a/be/src/olap/full_compaction.h +++ b/be/src/olap/full_compaction.h @@ -47,11 +47,8 @@ protected: private: Status _check_all_version(const std::vector<RowsetSharedPtr>& rowsets); - Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset, - RowsetWriter* rowset_writer); Status _full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset, - const RowsetSharedPtr& rowset, - const int64_t& cur_version, + const RowsetSharedPtr& rowset, int64_t cur_version, RowsetWriter* rowset_writer); DISALLOW_COPY_AND_ASSIGN(FullCompaction); diff --git a/regression-test/data/fault_injection_p0/test_full_compaction_mow.out b/regression-test/data/fault_injection_p0/test_full_compaction_mow.out new file mode 100644 index 00000000000..584a107fca0 Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_full_compaction_mow.out differ diff --git a/regression-test/suites/fault_injection_p0/test_full_compaction_mow.groovy b/regression-test/suites/fault_injection_p0/test_full_compaction_mow.groovy new file mode 100644 index 00000000000..4d213ffa93e --- /dev/null +++ 
b/regression-test/suites/fault_injection_p0/test_full_compaction_mow.groovy @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_full_compaction_mow","nonConcurrent") { + if (isCloudMode()) { + return + } + + def backends = sql_return_maparray('show backends') + if (backends.size() > 1) { + return + } + + def tableName = "test_full_compaction_mow" + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `k` int , + `v` int , + ) engine=olap + UNIQUE KEY(k) + DISTRIBUTED BY HASH(`k`) BUCKETS 1 + properties( + "replication_num" = "1", + "disable_auto_compaction" = "true") + """ + + sql """ INSERT INTO ${tableName} VALUES (0,00)""" + sql """ INSERT INTO ${tableName} VALUES (1,10)""" + sql """ INSERT INTO ${tableName} VALUES (2,20)""" + sql """ INSERT INTO ${tableName} VALUES (3,30)""" + + def tabletStats = sql_return_maparray("show tablets from ${tableName};") + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}"); + + GetDebugPoint().clearDebugPointsForAllBEs() + + try { + GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.before.block") + + // trigger full compaction + logger.info("trigger full compaction on BE ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_full_compaction(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assert code == 0 + def compactJson = parseJson(out.trim()) + assert "success" == compactJson.status.toLowerCase() + + sql """ INSERT INTO ${tableName} VALUES (1,99),(2,99),(3,99);""" + qt_sql "select * from ${tableName} order by k;" + + GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.before.block") + + // wait for compaction to finish + def running = true + do { + Thread.sleep(1000) + (code, out, err) = be_get_compaction_status(tabletBackend.Host, tabletBackend.HttpPort, tabletId) + logger.info("Get compaction status: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + def compactionStatus = parseJson(out.trim()) + assertEquals("success", compactionStatus.status.toLowerCase()) + running = compactionStatus.run_status + } while (running) + + qt_dup_key_count "select count() from (select k, count(*) from ${tableName} group by k having count(*) > 1) t" + qt_sql "select * from ${tableName} order by k;" + } catch (Exception e) { + logger.info(e.getMessage()) + exception = true; + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org