This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-2.0 by this push:
     new 74ed6e85013 [Fix](partial update) Fix rowset not found error when doing partial update #34112 (#34357)
74ed6e85013 is described below

commit 74ed6e8501302c5824f0a00b3be8e1ae2b196cb9
Author: abmdocrt <yukang.lian2...@gmail.com>
AuthorDate: Tue Apr 30 20:04:24 2024 +0800

    [Fix](partial update) Fix rowset not found error when doing partial update #34112 (#34357)
---
 be/src/olap/cumulative_compaction_policy.cpp       |   2 +
 be/src/olap/delta_writer.cpp                       |   5 +-
 be/src/olap/delta_writer.h                         |   1 +
 be/src/olap/olap_common.h                          |  10 +-
 be/src/olap/rowset/segment_v2/segment_writer.cpp   |  21 +---
 be/src/olap/tablet.cpp                             |  10 +-
 be/src/olap/tablet.h                               |   4 +-
 ..._update_rowset_not_found_fault_injection.groovy | 112 +++++++++++++++++++++
 8 files changed, 132 insertions(+), 33 deletions(-)

diff --git a/be/src/olap/cumulative_compaction_policy.cpp 
b/be/src/olap/cumulative_compaction_policy.cpp
index 8587b29f512..2f47826b7f0 100644
--- a/be/src/olap/cumulative_compaction_policy.cpp
+++ b/be/src/olap/cumulative_compaction_policy.cpp
@@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets(
         transient_size += 1;
         input_rowsets->push_back(rowset);
     }
+    
DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets",
+                    { return transient_size; })
 
     if (total_size >= promotion_size) {
         return transient_size;
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 35d2a13905c..594bc7b630c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -175,6 +175,7 @@ Status DeltaWriter::init() {
         } else {
             RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, 
&_rowset_ids));
         }
+        _rowset_ptrs = _tablet->get_rowset_by_ids(&_rowset_ids);
     }
 
     // check tablet version number
@@ -217,7 +218,7 @@ Status DeltaWriter::init() {
     context.tablet = _tablet;
     context.write_type = DataWriteType::TYPE_DIRECT;
     context.mow_context = std::make_shared<MowContext>(_cur_max_version, 
_req.txn_id, _rowset_ids,
-                                                       _delete_bitmap);
+                                                       _rowset_ptrs, 
_delete_bitmap);
     context.partial_update_info = _partial_update_info;
     RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer));
 
@@ -363,7 +364,7 @@ void DeltaWriter::_reset_mem_table() {
         _mem_table_flush_trackers.push_back(mem_table_flush_tracker);
     }
     auto mow_context = std::make_shared<MowContext>(_cur_max_version, 
_req.txn_id, _rowset_ids,
-                                                    _delete_bitmap);
+                                                    _rowset_ptrs, 
_delete_bitmap);
     _mem_table.reset(new MemTable(_tablet, _schema.get(), 
_tablet_schema.get(), _req.slots,
                                   _req.tuple_desc, _rowset_writer.get(), 
mow_context,
                                   _partial_update_info.get(), 
mem_table_insert_tracker,
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 7d81d6344a2..ede5ca1f03b 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -197,6 +197,7 @@ private:
     std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token;
     // current rowset_ids, used to do diff in publish_version
     RowsetIdUnorderedSet _rowset_ids;
+    std::vector<RowsetSharedPtr> _rowset_ptrs;
     // current max version, used to calculate delete bitmap
     int64_t _cur_max_version;
 
diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h
index 58b8ded5f81..a8696b5c3b5 100644
--- a/be/src/olap/olap_common.h
+++ b/be/src/olap/olap_common.h
@@ -39,6 +39,8 @@
 
 namespace doris {
 
+class Rowset;
+
 static constexpr int64_t MAX_ROWSET_ID = 1L << 56;
 static constexpr int64_t LOW_56_BITS = 0x00ffffffffffffff;
 
@@ -470,11 +472,17 @@ class DeleteBitmap;
 // merge on write context
 struct MowContext {
     MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids,
+               const std::vector<std::shared_ptr<Rowset>>& rowset_ptrs,
                std::shared_ptr<DeleteBitmap> db)
-            : max_version(version), txn_id(txnid), rowset_ids(ids), 
delete_bitmap(db) {}
+            : max_version(version),
+              txn_id(txnid),
+              rowset_ids(ids),
+              rowset_ptrs(rowset_ptrs),
+              delete_bitmap(db) {}
     int64_t max_version;
     int64_t txn_id;
     const RowsetIdUnorderedSet& rowset_ids;
+    std::vector<std::shared_ptr<Rowset>> rowset_ptrs;
     std::shared_ptr<DeleteBitmap> delete_bitmap;
 };
 
diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp 
b/be/src/olap/rowset/segment_v2/segment_writer.cpp
index 5d131c25537..b696a88602d 100644
--- a/be/src/olap/rowset/segment_v2/segment_writer.cpp
+++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp
@@ -402,25 +402,8 @@ Status 
SegmentWriter::append_block_with_partial_content(const vectorized::Block*
     std::vector<RowsetSharedPtr> specified_rowsets;
     {
         std::shared_lock rlock(_tablet->get_header_lock());
-        // Under normal circumstances, `get_rowset_by_ids` does not need to 
consider the stale
-        // rowset, in other word, if a rowset id is not found in the normal 
rowset, we can ignore
-        // it. This is because even if we handle stale rowset here, we need to 
recalculate the
-        // new rowset generated by the corresponding compaction in the publish 
phase.
-        // However, for partial update, ignoring the stale rowset may cause 
some keys to not be
-        // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), 
and thus be mistaken
-        // as new keys in the flush phase, which will cause the load to fail 
in the following
-        // two cases:
-        // 1. when strict_mode is enabled, new keys are not allowed to be 
added.
-        // 2. Some columns that need to be filled are neither nullable nor 
have a default value,
-        //    in which case the value of the field cannot be filled as a new 
key, leading to a
-        //    failure of the load.
-        bool should_include_stale =
-                _opts.rowset_ctx->partial_update_info->is_strict_mode ||
-                
!_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update;
-        specified_rowsets =
-                _tablet->get_rowset_by_ids(&_mow_context->rowset_ids, 
should_include_stale);
-        if (_opts.rowset_ctx->partial_update_info->is_strict_mode &&
-            specified_rowsets.size() != _mow_context->rowset_ids.size()) {
+        specified_rowsets = _mow_context->rowset_ptrs;
+        if (specified_rowsets.size() != _mow_context->rowset_ids.size()) {
             // Only when this is a strict mode partial update that missing 
rowsets here will lead to problems.
             // In other case, the missing rowsets will be calculated in later 
phases(commit phase/publish phase)
             LOG(WARNING) << fmt::format(
diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp
index d1d6fa19066..51811c2d22d 100644
--- a/be/src/olap/tablet.cpp
+++ b/be/src/olap/tablet.cpp
@@ -3266,7 +3266,7 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset,
 }
 
 std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
-        const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) {
+        const RowsetIdUnorderedSet* specified_rowset_ids) {
     std::vector<RowsetSharedPtr> rowsets;
     for (auto& rs : _rs_version_map) {
         if (!specified_rowset_ids ||
@@ -3274,14 +3274,6 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids(
             rowsets.push_back(rs.second);
         }
     }
-    if (include_stale && specified_rowset_ids != nullptr &&
-        rowsets.size() != specified_rowset_ids->size()) {
-        for (auto& rs : _stale_rs_version_map) {
-            if (specified_rowset_ids->find(rs.second->rowset_id()) != 
specified_rowset_ids->end()) {
-                rowsets.push_back(rs.second);
-            }
-        }
-    }
     std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, 
RowsetSharedPtr& rhs) {
         return lhs->end_version() > rhs->end_version();
     });
diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h
index 29d7209b906..775bfa9262b 100644
--- a/be/src/olap/tablet.h
+++ b/be/src/olap/tablet.h
@@ -469,8 +469,8 @@ public:
                               DeleteBitmapPtr delete_bitmap, int64_t version,
                               CalcDeleteBitmapToken* token, RowsetWriter* 
rowset_writer = nullptr);
 
-    std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet* 
specified_rowset_ids,
-                                                   bool include_stale = false);
+    std::vector<RowsetSharedPtr> get_rowset_by_ids(
+            const RowsetIdUnorderedSet* specified_rowset_ids);
 
     Status calc_segment_delete_bitmap(RowsetSharedPtr rowset,
                                       const segment_v2::SegmentSharedPtr& seg,
diff --git 
a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
 
b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
new file mode 100644
index 00000000000..befad64da0a
--- /dev/null
+++ 
b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy
@@ -0,0 +1,112 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_partial_update_rowset_not_found_fault_injection", 
"p2,nonConcurrent") {
+    def testTable = "test_partial_update_rowset_not_found_fault_injection"
+    sql """ DROP TABLE IF EXISTS ${testTable}"""
+    sql """
+        create table ${testTable}
+        (
+        `k1` INT,
+        `v1` INT NOT NULL,
+        `v2` INT NOT NULL,
+        `v3` INT NOT NULL,
+        `v4` INT NOT NULL,
+        `v5` INT NOT NULL,
+        `v6` INT NOT NULL,
+        `v7` INT NOT NULL,
+        `v8` INT NOT NULL,
+        `v9` INT NOT NULL,
+        `v10` INT NOT NULL
+        )
+        UNIQUE KEY (`k1`)
+        DISTRIBUTED BY HASH(`k1`) BUCKETS 1
+        PROPERTIES (
+        "replication_num" = "1",
+        "disable_auto_compaction" = "true"
+        );
+    """
+
+    def load_data =  {
+        streamLoad {
+            table 'test_partial_update_rowset_not_found_fault_injection'
+            set 'column_separator', ','
+            set 'compress_type', 'GZ'
+
+
+            file 
"""${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz"""
+
+            time 300000 
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                log.info("Stream load result: ${result}".toString())
+                def json = parseJson(result)
+                assertEquals("success", json.Status.toLowerCase())
+                assertEquals(json.NumberTotalRows, json.NumberLoadedRows)
+                assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0)
+            }
+        }
+    }
+
+    def backendId_to_backendIP = [:]
+    def backendId_to_backendHttpPort = [:]
+    def backendId_to_params = [string:[:]]
+
+    getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort);
+
+    load_data()
+    def error = false
+
+
+    GetDebugPoint().clearDebugPointsForAllBEs()
+    try {
+        
GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+        
GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+        def thread = Thread.start{
+            try {
+                sql """update ${testTable} set v10=1"""
+            }
+            catch (Exception e){
+                logger.info(e.getMessage())
+                error = true
+            }
+        }
+
+        Thread.sleep(2000)
+        // trigger cumulative compaction for all tablets in ${testTable}
+        def tablets = sql_return_maparray """ show tablets from ${testTable}; 
"""
+        for (def tablet in tablets) {
+            String tablet_id = tablet.TabletId
+            backend_id = tablet.BackendId
+            (code, out, err) = 
be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), 
backendId_to_backendHttpPort.get(backend_id), tablet_id)
+            logger.info("Run compaction: code=" + code + ", out=" + out + ", 
err=" + err)
+            assertEquals(code, 0)
+        }
+
+        thread.join()
+        assertFalse(error)
+    } catch (Exception e){
+        logger.info(e.getMessage())
+        assertFalse(true)
+    } finally {
+        
GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep")
+        
GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets")
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to