This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 308ede0657e branch-2.1-pick: [Fix](full compaction) Fix problems for full compaction #49825 (#49919)
308ede0657e is described below

commit 308ede0657effcf94927e860ea103c540efabb8e
Author: bobhan1 <bao...@selectdb.com>
AuthorDate: Thu Apr 10 23:26:39 2025 +0800

    branch-2.1-pick: [Fix](full compaction) Fix problems for full compaction #49825 (#49919)
    
    pick https://github.com/apache/doris/pull/49825
---
 be/src/olap/full_compaction.cpp                    |  98 ++++++++++-----------
 be/src/olap/full_compaction.h                      |   5 +-
 .../test_full_compaction_mow.out                   | Bin 0 -> 181 bytes
 .../test_full_compaction_mow.groovy                |  98 +++++++++++++++++++++
 4 files changed, 146 insertions(+), 55 deletions(-)

diff --git a/be/src/olap/full_compaction.cpp b/be/src/olap/full_compaction.cpp
index 74bad632250..04efe95132e 100644
--- a/be/src/olap/full_compaction.cpp
+++ b/be/src/olap/full_compaction.cpp
@@ -35,6 +35,7 @@
 #include "olap/schema_change.h"
 #include "olap/tablet_meta.h"
 #include "runtime/thread_context.h"
+#include "util/debug_points.h"
 #include "util/thread.h"
 #include "util/trace.h"
 
@@ -112,13 +113,46 @@ Status FullCompaction::pick_rowsets_to_compact() {
 }
 
 Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) {
+    std::vector<RowsetSharedPtr> output_rowsets {_output_rowset};
     if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
         _tablet->enable_unique_key_merge_on_write()) {
-        RETURN_IF_ERROR(
-                _full_compaction_update_delete_bitmap(_output_rowset, 
_output_rs_writer.get()));
-    }
-    std::vector<RowsetSharedPtr> output_rowsets(1, _output_rowset);
-    {
+        std::vector<RowsetSharedPtr> tmp_rowsets {};
+
+        // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
+        if (_tablet->tablet_state() == TABLET_NOTREADY) {
+            LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
+                      << _tablet->tablet_id();
+            return Status::OK();
+        }
+
+        int64_t max_version = _tablet->max_version().second;
+        DCHECK(max_version >= _output_rowset->version().second);
+        if (max_version > _output_rowset->version().second) {
+            RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
+                    {_output_rowset->version().second + 1, max_version}, 
&tmp_rowsets));
+        }
+
+        for (const auto& it : tmp_rowsets) {
+            const int64_t& cur_version = it->rowset_meta()->start_version();
+            RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(it, 
_output_rowset, cur_version,
+                                                                
_output_rs_writer.get()));
+        }
+        DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.before.block", 
DBUG_BLOCK);
+        std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock());
+        std::lock_guard header_lock(_tablet->get_header_lock());
+        SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
+        for (const auto& it : _tablet->rowset_map()) {
+            const int64_t& cur_version = it.first.first;
+            const RowsetSharedPtr& published_rowset = it.second;
+            if (cur_version > max_version) {
+                RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(
+                        published_rowset, _output_rowset, cur_version, 
_output_rs_writer.get()));
+            }
+        }
+        RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, 
_input_rowsets, true));
+        DBUG_EXECUTE_IF("FullCompaction.modify_rowsets.sleep", { sleep(5); })
+        _tablet->save_meta();
+    } else {
         std::lock_guard<std::mutex> 
rowset_update_wlock(_tablet->get_rowset_update_lock());
         std::lock_guard<std::shared_mutex> 
meta_wlock(_tablet->get_header_lock());
         RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, 
_input_rowsets, true));
@@ -148,55 +182,16 @@ Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& ro
     return Status::OK();
 }
 
-Status FullCompaction::_full_compaction_update_delete_bitmap(const 
RowsetSharedPtr& rowset,
-                                                             RowsetWriter* 
rowset_writer) {
-    std::vector<RowsetSharedPtr> tmp_rowsets {};
-
-    // tablet is under alter process. The delete bitmap will be calculated 
after conversion.
-    if (_tablet->tablet_state() == TABLET_NOTREADY) {
-        LOG(INFO) << "tablet is under alter process, update delete bitmap 
later, tablet_id="
-                  << _tablet->tablet_id();
-        return Status::OK();
-    }
-
-    int64_t max_version = _tablet->max_version().second;
-    DCHECK(max_version >= rowset->version().second);
-    if (max_version > rowset->version().second) {
-        RETURN_IF_ERROR(_tablet->capture_consistent_rowsets(
-                {rowset->version().second + 1, max_version}, &tmp_rowsets));
-    }
-
-    for (const auto& it : tmp_rowsets) {
-        const int64_t& cur_version = it->rowset_meta()->start_version();
-        RETURN_IF_ERROR(
-                _full_compaction_calc_delete_bitmap(it, rowset, cur_version, 
rowset_writer));
-    }
-
-    std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock());
-    std::lock_guard header_lock(_tablet->get_header_lock());
-    SCOPED_SIMPLE_TRACE_IF_TIMEOUT(TRACE_TABLET_LOCK_THRESHOLD);
-    for (const auto& it : _tablet->rowset_map()) {
-        const int64_t& cur_version = it.first.first;
-        const RowsetSharedPtr& published_rowset = it.second;
-        if (cur_version > max_version) {
-            
RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(published_rowset, rowset,
-                                                                cur_version, 
rowset_writer));
-        }
-    }
-
-    return Status::OK();
-}
-
 Status FullCompaction::_full_compaction_calc_delete_bitmap(const 
RowsetSharedPtr& published_rowset,
                                                            const 
RowsetSharedPtr& rowset,
-                                                           const int64_t& 
cur_version,
+                                                           int64_t cur_version,
                                                            RowsetWriter* 
rowset_writer) {
     std::vector<segment_v2::SegmentSharedPtr> segments;
-    auto beta_rowset = reinterpret_cast<BetaRowset*>(published_rowset.get());
-    RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
+    RETURN_IF_ERROR(
+            
std::static_pointer_cast<BetaRowset>(published_rowset)->load_segments(&segments));
     DeleteBitmapPtr delete_bitmap =
             
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
-    std::vector<RowsetSharedPtr> specified_rowsets(1, rowset);
+    std::vector<RowsetSharedPtr> specified_rowsets {rowset};
 
     OlapStopWatch watch;
     RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, 
specified_rowsets,
@@ -213,10 +208,11 @@ Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr
                << ", cost: " << watch.get_elapse_time_us() << "(us), total 
rows: " << total_rows;
 
     for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
-        _tablet->tablet_meta()->delete_bitmap().merge({std::get<0>(k), 
std::get<1>(k), cur_version},
-                                                      v);
+        if (std::get<1>(k) != DeleteBitmap::INVALID_SEGMENT_ID) {
+            _tablet->tablet_meta()->delete_bitmap().merge(
+                    {std::get<0>(k), std::get<1>(k), cur_version}, v);
+        }
     }
-
     return Status::OK();
 }
 
diff --git a/be/src/olap/full_compaction.h b/be/src/olap/full_compaction.h
index 631d901e846..89051458dea 100644
--- a/be/src/olap/full_compaction.h
+++ b/be/src/olap/full_compaction.h
@@ -47,11 +47,8 @@ protected:
 
 private:
     Status _check_all_version(const std::vector<RowsetSharedPtr>& rowsets);
-    Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
-                                                 RowsetWriter* rowset_writer);
     Status _full_compaction_calc_delete_bitmap(const RowsetSharedPtr& 
published_rowset,
-                                               const RowsetSharedPtr& rowset,
-                                               const int64_t& cur_version,
+                                               const RowsetSharedPtr& rowset, 
int64_t cur_version,
                                                RowsetWriter* rowset_writer);
 
     DISALLOW_COPY_AND_ASSIGN(FullCompaction);
diff --git a/regression-test/data/fault_injection_p0/test_full_compaction_mow.out b/regression-test/data/fault_injection_p0/test_full_compaction_mow.out
new file mode 100644
index 00000000000..584a107fca0
Binary files /dev/null and b/regression-test/data/fault_injection_p0/test_full_compaction_mow.out differ
diff --git a/regression-test/suites/fault_injection_p0/test_full_compaction_mow.groovy b/regression-test/suites/fault_injection_p0/test_full_compaction_mow.groovy
new file mode 100644
index 00000000000..4d213ffa93e
--- /dev/null
+++ b/regression-test/suites/fault_injection_p0/test_full_compaction_mow.groovy
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_full_compaction_mow","nonConcurrent") {
+    if (isCloudMode()) {
+        return
+    }
+
+    def backends = sql_return_maparray('show backends')
+    if (backends.size() > 1) {
+        return
+    }
+
+    def tableName = "test_full_compaction_mow"
+    sql """ DROP TABLE IF EXISTS ${tableName} """
+    sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `k` int ,
+            `v` int ,
+        ) engine=olap
+        UNIQUE KEY(k)
+        DISTRIBUTED BY HASH(`k`) BUCKETS 1
+        properties(
+            "replication_num" = "1",
+            "disable_auto_compaction" = "true")
+        """
+
+    sql """ INSERT INTO ${tableName} VALUES (0,00)"""
+    sql """ INSERT INTO ${tableName} VALUES (1,10)"""
+    sql """ INSERT INTO ${tableName} VALUES (2,20)"""
+    sql """ INSERT INTO ${tableName} VALUES (3,30)"""
+
+    def tabletStats = sql_return_maparray("show tablets from ${tableName};")
+    def tabletId = tabletStats[0].TabletId
+    def tabletBackendId = tabletStats[0].BackendId
+    def tabletBackend
+    for (def be : backends) {
+        if (be.BackendId == tabletBackendId) {
+            tabletBackend = be
+            break;
+        }
+    }
+    logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}");
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("FullCompaction.modify_rowsets.before.block")
+
+        // trigger full compaction
+        logger.info("trigger full compaction on BE ${tabletBackend.Host} with 
backendId=${tabletBackend.BackendId}")
+        def (code, out, err) = be_run_full_compaction(tabletBackend.Host, 
tabletBackend.HttpPort, tabletId)
+        logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" 
+ err)
+        assert  code == 0
+        def compactJson = parseJson(out.trim())
+        assert "success" == compactJson.status.toLowerCase()
+
+        sql """ INSERT INTO ${tableName} VALUES (1,99),(2,99),(3,99);"""
+        qt_sql "select * from ${tableName} order by k;"
+
+        
GetDebugPoint().disableDebugPointForAllBEs("FullCompaction.modify_rowsets.before.block")
+
+        // wait for compaction to finish
+        def running = true
+        do {
+            Thread.sleep(1000)
+            (code, out, err) = be_get_compaction_status(tabletBackend.Host, 
tabletBackend.HttpPort, tabletId)
+            logger.info("Get compaction status: code=" + code + ", out=" + out 
+ ", err=" + err)
+            assertEquals(code, 0)
+            def compactionStatus = parseJson(out.trim())
+            assertEquals("success", compactionStatus.status.toLowerCase())
+            running = compactionStatus.run_status
+        } while (running)
+
+        qt_dup_key_count "select count() from (select k, count(*) from 
${tableName} group by k having count(*) > 1) t"
+        qt_sql "select * from ${tableName} order by k;"
+    } catch (Exception e) {
+        logger.info(e.getMessage())
+        exception = true;
+    } finally {
+        GetDebugPoint().clearDebugPointsForAllBEs()
+        GetDebugPoint().clearDebugPointsForAllFEs()
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to