This is an automated email from the ASF dual-hosted git repository. kxiao pushed a commit to branch branch-2.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.0 by this push: new 74ed6e85013 [Fix](partial update) Fix rowset not found error when doing partial update #34112 (#34357) 74ed6e85013 is described below commit 74ed6e8501302c5824f0a00b3be8e1ae2b196cb9 Author: abmdocrt <yukang.lian2...@gmail.com> AuthorDate: Tue Apr 30 20:04:24 2024 +0800 [Fix](partial update) Fix rowset not found error when doing partial update #34112 (#34357) --- be/src/olap/cumulative_compaction_policy.cpp | 2 + be/src/olap/delta_writer.cpp | 5 +- be/src/olap/delta_writer.h | 1 + be/src/olap/olap_common.h | 10 +- be/src/olap/rowset/segment_v2/segment_writer.cpp | 21 +--- be/src/olap/tablet.cpp | 10 +- be/src/olap/tablet.h | 4 +- ..._update_rowset_not_found_fault_injection.groovy | 112 +++++++++++++++++++++ 8 files changed, 132 insertions(+), 33 deletions(-) diff --git a/be/src/olap/cumulative_compaction_policy.cpp b/be/src/olap/cumulative_compaction_policy.cpp index 8587b29f512..2f47826b7f0 100644 --- a/be/src/olap/cumulative_compaction_policy.cpp +++ b/be/src/olap/cumulative_compaction_policy.cpp @@ -284,6 +284,8 @@ int SizeBasedCumulativeCompactionPolicy::pick_input_rowsets( transient_size += 1; input_rowsets->push_back(rowset); } + DBUG_EXECUTE_IF("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets", + { return transient_size; }) if (total_size >= promotion_size) { return transient_size; diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp index 35d2a13905c..594bc7b630c 100644 --- a/be/src/olap/delta_writer.cpp +++ b/be/src/olap/delta_writer.cpp @@ -175,6 +175,7 @@ Status DeltaWriter::init() { } else { RETURN_IF_ERROR(_tablet->all_rs_id(_cur_max_version, &_rowset_ids)); } + _rowset_ptrs = _tablet->get_rowset_by_ids(&_rowset_ids); } // check tablet version number @@ -217,7 +218,7 @@ Status DeltaWriter::init() { context.tablet = _tablet; context.write_type = DataWriteType::TYPE_DIRECT; context.mow_context = 
std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids, - _delete_bitmap); + _rowset_ptrs, _delete_bitmap); context.partial_update_info = _partial_update_info; RETURN_IF_ERROR(_tablet->create_rowset_writer(context, &_rowset_writer)); @@ -363,7 +364,7 @@ void DeltaWriter::_reset_mem_table() { _mem_table_flush_trackers.push_back(mem_table_flush_tracker); } auto mow_context = std::make_shared<MowContext>(_cur_max_version, _req.txn_id, _rowset_ids, - _delete_bitmap); + _rowset_ptrs, _delete_bitmap); _mem_table.reset(new MemTable(_tablet, _schema.get(), _tablet_schema.get(), _req.slots, _req.tuple_desc, _rowset_writer.get(), mow_context, _partial_update_info.get(), mem_table_insert_tracker, diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h index 7d81d6344a2..ede5ca1f03b 100644 --- a/be/src/olap/delta_writer.h +++ b/be/src/olap/delta_writer.h @@ -197,6 +197,7 @@ private: std::unique_ptr<CalcDeleteBitmapToken> _calc_delete_bitmap_token; // current rowset_ids, used to do diff in publish_version RowsetIdUnorderedSet _rowset_ids; + std::vector<RowsetSharedPtr> _rowset_ptrs; // current max version, used to calculate delete bitmap int64_t _cur_max_version; diff --git a/be/src/olap/olap_common.h b/be/src/olap/olap_common.h index 58b8ded5f81..a8696b5c3b5 100644 --- a/be/src/olap/olap_common.h +++ b/be/src/olap/olap_common.h @@ -39,6 +39,8 @@ namespace doris { +class Rowset; + static constexpr int64_t MAX_ROWSET_ID = 1L << 56; static constexpr int64_t LOW_56_BITS = 0x00ffffffffffffff; @@ -470,11 +472,17 @@ class DeleteBitmap; // merge on write context struct MowContext { MowContext(int64_t version, int64_t txnid, const RowsetIdUnorderedSet& ids, + const std::vector<std::shared_ptr<Rowset>>& rowset_ptrs, std::shared_ptr<DeleteBitmap> db) - : max_version(version), txn_id(txnid), rowset_ids(ids), delete_bitmap(db) {} + : max_version(version), + txn_id(txnid), + rowset_ids(ids), + rowset_ptrs(rowset_ptrs), + delete_bitmap(db) {} int64_t 
max_version; int64_t txn_id; const RowsetIdUnorderedSet& rowset_ids; + std::vector<std::shared_ptr<Rowset>> rowset_ptrs; std::shared_ptr<DeleteBitmap> delete_bitmap; }; diff --git a/be/src/olap/rowset/segment_v2/segment_writer.cpp b/be/src/olap/rowset/segment_v2/segment_writer.cpp index 5d131c25537..b696a88602d 100644 --- a/be/src/olap/rowset/segment_v2/segment_writer.cpp +++ b/be/src/olap/rowset/segment_v2/segment_writer.cpp @@ -402,25 +402,8 @@ Status SegmentWriter::append_block_with_partial_content(const vectorized::Block* std::vector<RowsetSharedPtr> specified_rowsets; { std::shared_lock rlock(_tablet->get_header_lock()); - // Under normal circumstances, `get_rowset_by_ids` does not need to consider the stale - // rowset, in other word, if a rowset id is not found in the normal rowset, we can ignore - // it. This is because even if we handle stale rowset here, we need to recalculate the - // new rowset generated by the corresponding compaction in the publish phase. - // However, for partial update, ignoring the stale rowset may cause some keys to not be - // found in the flush phase (lookup_row_key returns KEY_NOT_FOUND), and thus be mistaken - // as new keys in the flush phase, which will cause the load to fail in the following - // two cases: - // 1. when strict_mode is enabled, new keys are not allowed to be added. - // 2. Some columns that need to be filled are neither nullable nor have a default value, - // in which case the value of the field cannot be filled as a new key, leading to a - // failure of the load. 
- bool should_include_stale = - _opts.rowset_ctx->partial_update_info->is_strict_mode || - !_opts.rowset_ctx->partial_update_info->can_insert_new_rows_in_partial_update; - specified_rowsets = - _tablet->get_rowset_by_ids(&_mow_context->rowset_ids, should_include_stale); - if (_opts.rowset_ctx->partial_update_info->is_strict_mode && - specified_rowsets.size() != _mow_context->rowset_ids.size()) { + specified_rowsets = _mow_context->rowset_ptrs; + if (specified_rowsets.size() != _mow_context->rowset_ids.size()) { // Only when this is a strict mode partial update that missing rowsets here will lead to problems. // In other case, the missing rowsets will be calculated in later phases(commit phase/publish phase) LOG(WARNING) << fmt::format( diff --git a/be/src/olap/tablet.cpp b/be/src/olap/tablet.cpp index d1d6fa19066..51811c2d22d 100644 --- a/be/src/olap/tablet.cpp +++ b/be/src/olap/tablet.cpp @@ -3266,7 +3266,7 @@ Status Tablet::calc_delete_bitmap(RowsetSharedPtr rowset, } std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids( - const RowsetIdUnorderedSet* specified_rowset_ids, bool include_stale) { + const RowsetIdUnorderedSet* specified_rowset_ids) { std::vector<RowsetSharedPtr> rowsets; for (auto& rs : _rs_version_map) { if (!specified_rowset_ids || @@ -3274,14 +3274,6 @@ std::vector<RowsetSharedPtr> Tablet::get_rowset_by_ids( rowsets.push_back(rs.second); } } - if (include_stale && specified_rowset_ids != nullptr && - rowsets.size() != specified_rowset_ids->size()) { - for (auto& rs : _stale_rs_version_map) { - if (specified_rowset_ids->find(rs.second->rowset_id()) != specified_rowset_ids->end()) { - rowsets.push_back(rs.second); - } - } - } std::sort(rowsets.begin(), rowsets.end(), [](RowsetSharedPtr& lhs, RowsetSharedPtr& rhs) { return lhs->end_version() > rhs->end_version(); }); diff --git a/be/src/olap/tablet.h b/be/src/olap/tablet.h index 29d7209b906..775bfa9262b 100644 --- a/be/src/olap/tablet.h +++ b/be/src/olap/tablet.h @@ -469,8 +469,8 @@ public: 
DeleteBitmapPtr delete_bitmap, int64_t version, CalcDeleteBitmapToken* token, RowsetWriter* rowset_writer = nullptr); - std::vector<RowsetSharedPtr> get_rowset_by_ids(const RowsetIdUnorderedSet* specified_rowset_ids, - bool include_stale = false); + std::vector<RowsetSharedPtr> get_rowset_by_ids( + const RowsetIdUnorderedSet* specified_rowset_ids); Status calc_segment_delete_bitmap(RowsetSharedPtr rowset, const segment_v2::SegmentSharedPtr& seg, diff --git a/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy new file mode 100644 index 00000000000..befad64da0a --- /dev/null +++ b/regression-test/suites/fault_injection_p2/test_partial_update_rowset_not_found_fault_injection.groovy @@ -0,0 +1,112 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +suite("test_partial_update_rowset_not_found_fault_injection", "p2,nonConcurrent") { + def testTable = "test_partial_update_rowset_not_found_fault_injection" + sql """ DROP TABLE IF EXISTS ${testTable}""" + sql """ + create table ${testTable} + ( + `k1` INT, + `v1` INT NOT NULL, + `v2` INT NOT NULL, + `v3` INT NOT NULL, + `v4` INT NOT NULL, + `v5` INT NOT NULL, + `v6` INT NOT NULL, + `v7` INT NOT NULL, + `v8` INT NOT NULL, + `v9` INT NOT NULL, + `v10` INT NOT NULL + ) + UNIQUE KEY (`k1`) + DISTRIBUTED BY HASH(`k1`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "disable_auto_compaction" = "true" + ); + """ + + def load_data = { + streamLoad { + table 'test_partial_update_rowset_not_found_fault_injection' + set 'column_separator', ',' + set 'compress_type', 'GZ' + + + file """${getS3Url()}/regression/fault_injection/test_partial_update_rowset_not_found_falut_injection1.csv.gz""" + + time 300000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + assertEquals(json.NumberTotalRows, json.NumberLoadedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + + def backendId_to_backendIP = [:] + def backendId_to_backendHttpPort = [:] + def backendId_to_params = [string:[:]] + + getBackendIpHttpPort(backendId_to_backendIP, backendId_to_backendHttpPort); + + load_data() + def error = false + + + GetDebugPoint().clearDebugPointsForAllBEs() + try { + GetDebugPoint().enableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep") + GetDebugPoint().enableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets") + def thread = Thread.start{ + try { + sql """update ${testTable} set v10=1""" + } + catch (Exception e){ + logger.info(e.getMessage()) + error = true + } + } + + Thread.sleep(2000) + 
// trigger compactions for all tablets in ${testTable} + def tablets = sql_return_maparray """ show tablets from ${testTable}; """ + for (def tablet in tablets) { + String tablet_id = tablet.TabletId + backend_id = tablet.BackendId + (code, out, err) = be_run_cumulative_compaction(backendId_to_backendIP.get(backend_id), backendId_to_backendHttpPort.get(backend_id), tablet_id) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assertEquals(code, 0) + } + + thread.join() + assertFalse(error) + } catch (Exception e){ + logger.info(e.getMessage()) + assertFalse(true) + } finally { + GetDebugPoint().disableDebugPointForAllBEs("VerticalSegmentWriter._append_block_with_partial_content.sleep") + GetDebugPoint().disableDebugPointForAllBEs("SizeBaseCumulativeCompactionPolicy.pick_input_rowsets.return_input_rowsets") + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org