This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 84f61c986bc branch-3.0-pick: [Fix](cloud) Fix dup key problem when `enable_new_tablet_do_compaction=true` (#48399) (#49019) 84f61c986bc is described below commit 84f61c986bc6596e5b83e24d9e59a6df214fb553 Author: bobhan1 <bao...@selectdb.com> AuthorDate: Fri Mar 14 09:59:59 2025 +0800 branch-3.0-pick: [Fix](cloud) Fix dup key problem when `enable_new_tablet_do_compaction=true` (#48399) (#49019) pick https://github.com/apache/doris/pull/48399 --- be/src/cloud/cloud_compaction_stop_token.cpp | 125 ++++++++++++++++++ be/src/cloud/cloud_compaction_stop_token.h | 45 +++++++ .../cloud/cloud_cumulative_compaction_policy.cpp | 7 +- be/src/cloud/cloud_schema_change_job.cpp | 7 + be/src/cloud/cloud_storage_engine.cpp | 65 ++++++++++ be/src/cloud/cloud_storage_engine.h | 8 ++ cloud/src/meta-service/meta_service_job.cpp | 79 +++++++++++- gensrc/proto/cloud.proto | 1 + .../cloud/test_cloud_mow_new_tablet_compaction.out | Bin 0 -> 206 bytes .../test_cloud_mow_new_tablet_compaction.groovy | 143 +++++++++++++++++++++ 10 files changed, 477 insertions(+), 3 deletions(-) diff --git a/be/src/cloud/cloud_compaction_stop_token.cpp b/be/src/cloud/cloud_compaction_stop_token.cpp new file mode 100644 index 00000000000..9d6f1b614a6 --- /dev/null +++ b/be/src/cloud/cloud_compaction_stop_token.cpp @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_compaction_stop_token.h" + +#include "cloud/cloud_meta_mgr.h" +#include "cloud/config.h" +#include "common/logging.h" +#include "gen_cpp/cloud.pb.h" + +namespace doris { + +CloudCompactionStopToken::CloudCompactionStopToken(CloudStorageEngine& engine, + CloudTabletSPtr tablet, int64_t initiator) + : _engine {engine}, _tablet {std::move(tablet)}, _initiator(initiator) { + auto uuid = UUIDGenerator::instance()->next_uuid(); + std::stringstream ss; + ss << uuid; + _uuid = ss.str(); +} + +void CloudCompactionStopToken::do_lease() { + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto* compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + using namespace std::chrono; + int64_t lease_time = duration_cast<seconds>(system_clock::now().time_since_epoch()).count() + + (config::lease_compaction_interval_seconds * 4); + compaction_job->set_lease(lease_time); + auto st = _engine.meta_mgr().lease_tablet_job(job); + if (!st.ok()) { + LOG_WARNING("failed to lease compaction stop token") + .tag("job_id", _uuid) + .tag("delete_bitmap_lock_initiator", _initiator) + .tag("tablet_id", _tablet->tablet_id()) + .error(st); + } +} + +Status CloudCompactionStopToken::do_register() { + int64_t base_compaction_cnt = 0; + int64_t cumulative_compaction_cnt = 0; + { + std::lock_guard lock {_tablet->get_header_lock()}; + base_compaction_cnt = _tablet->base_compaction_cnt(); + cumulative_compaction_cnt = _tablet->cumulative_compaction_cnt(); + } + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto* compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_delete_bitmap_lock_initiator(_initiator); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN); + // required by MS to check if it's a valid compaction job + compaction_job->set_base_compaction_cnt(base_compaction_cnt); + compaction_job->set_cumulative_compaction_cnt(cumulative_compaction_cnt); + using namespace std::chrono; + int64_t now = duration_cast<seconds>(system_clock::now().time_since_epoch()).count(); + compaction_job->set_expiration(now + config::compaction_timeout_seconds); + compaction_job->set_lease(now + (config::lease_compaction_interval_seconds * 4)); + cloud::StartTabletJobResponse resp; + auto st = _engine.meta_mgr().prepare_tablet_job(job, &resp); + if (!st.ok()) { + LOG_WARNING("failed to register compaction stop token") + .tag("job_id", _uuid) + .tag("delete_bitmap_lock_initiator", _initiator) + .tag("tablet_id", _tablet->tablet_id()) + .error(st); + } + return st; +} + +Status CloudCompactionStopToken::do_unregister() { + cloud::TabletJobInfoPB job; + auto* idx = job.mutable_idx(); + idx->set_tablet_id(_tablet->tablet_id()); + idx->set_table_id(_tablet->table_id()); + idx->set_index_id(_tablet->index_id()); + idx->set_partition_id(_tablet->partition_id()); + auto* compaction_job = job.add_compaction(); + compaction_job->set_id(_uuid); + compaction_job->set_delete_bitmap_lock_initiator(_initiator); + compaction_job->set_initiator(BackendOptions::get_localhost() + ':' + + std::to_string(config::heartbeat_service_port)); + compaction_job->set_type(cloud::TabletCompactionJobPB::STOP_TOKEN); + auto st = _engine.meta_mgr().abort_tablet_job(job); + if (!st.ok()) { + LOG_WARNING("failed to unregister compaction stop token") + .tag("job_id", _uuid) + .tag("delete_bitmap_lock_initiator", _initiator) + .tag("tablet_id", _tablet->tablet_id()) + .error(st); + } + return st; +} + +int64_t CloudCompactionStopToken::initiator() const { + return _initiator; +} +} // namespace doris diff --git a/be/src/cloud/cloud_compaction_stop_token.h b/be/src/cloud/cloud_compaction_stop_token.h new file mode 100644 index 00000000000..ce61ebc3747 --- /dev/null +++ b/be/src/cloud/cloud_compaction_stop_token.h @@ -0,0 +1,45 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <string> + +#include "cloud/cloud_storage_engine.h" +#include "cloud/cloud_tablet.h" + +namespace doris { + +class CloudCompactionStopToken { +public: + CloudCompactionStopToken(CloudStorageEngine& engine, CloudTabletSPtr tablet, int64_t initiator); + ~CloudCompactionStopToken() = default; + + void do_lease(); + Status do_register(); + Status do_unregister(); + + int64_t initiator() const; + +private: + CloudStorageEngine& _engine; + CloudTabletSPtr _tablet; + std::string _uuid; + int64_t _initiator; +}; + +} // namespace doris diff --git a/be/src/cloud/cloud_cumulative_compaction_policy.cpp b/be/src/cloud/cloud_cumulative_compaction_policy.cpp index 9e3ca3eb3db..c49448e1998 100644 --- a/be/src/cloud/cloud_cumulative_compaction_policy.cpp +++ b/be/src/cloud/cloud_cumulative_compaction_policy.cpp @@ -66,8 +66,13 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets( input_rowsets->push_back(rowset); } } + LOG_INFO( + "[CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_" + "input_rowsets] tablet_id={}, start={}, end={}, " + "input_rowsets->size()={}", + target_tablet_id, start_version, end_version, input_rowsets->size()); + return input_rowsets->size(); } - return input_rowsets->size(); }) size_t promotion_size = cloud_promotion_size(tablet); diff --git a/be/src/cloud/cloud_schema_change_job.cpp b/be/src/cloud/cloud_schema_change_job.cpp index a109640e467..79a502587f6 100644 --- a/be/src/cloud/cloud_schema_change_job.cpp +++ b/be/src/cloud/cloud_schema_change_job.cpp @@ -418,6 +418,11 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, .tag("out_rowset_size", _output_rowsets.size()) .tag("start_calc_delete_bitmap_version", start_calc_delete_bitmap_version) .tag("alter_version", alter_version); + RETURN_IF_ERROR(_cloud_storage_engine.register_compaction_stop_token(_new_tablet, initiator)); + Defer defer {[&]() { + static_cast<void>(_cloud_storage_engine.unregister_compaction_stop_token(_new_tablet)); + }}; + TabletMetaSharedPtr tmp_meta = std::make_shared<TabletMeta>(*(_new_tablet->tablet_meta())); tmp_meta->delete_bitmap().delete_bitmap.clear(); std::shared_ptr<CloudTablet> tmp_tablet = @@ -437,6 +442,8 @@ Status CloudSchemaChangeJob::_process_delete_bitmap(int64_t alter_version, if (max_version >= start_calc_delete_bitmap_version) { RETURN_IF_ERROR(tmp_tablet->capture_consistent_rowsets_unlocked( {start_calc_delete_bitmap_version, max_version}, &incremental_rowsets)); + DBUG_EXECUTE_IF("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock", + DBUG_BLOCK); for (auto rowset : incremental_rowsets) { RETURN_IF_ERROR(CloudTablet::update_delete_bitmap_without_lock(tmp_tablet, rowset)); } diff --git a/be/src/cloud/cloud_storage_engine.cpp b/be/src/cloud/cloud_storage_engine.cpp index 9c403ac8e3b..52ab28b52c3 100644 --- a/be/src/cloud/cloud_storage_engine.cpp +++ b/be/src/cloud/cloud_storage_engine.cpp @@ -25,9 +25,11 @@ #include <rapidjson/stringbuffer.h> #include <algorithm> +#include <mutex> #include <variant> #include "cloud/cloud_base_compaction.h" +#include "cloud/cloud_compaction_stop_token.h" #include "cloud/cloud_cumulative_compaction.h" #include "cloud/cloud_cumulative_compaction_policy.h" #include "cloud/cloud_full_compaction.h" @@ -37,6 +39,8 @@ #include "cloud/cloud_txn_delete_bitmap_cache.h" #include "cloud/cloud_warm_up_manager.h" #include "cloud/config.h" +#include "common/config.h" +#include "common/status.h" #include "io/cache/block_file_cache_downloader.h" #include "io/cache/block_file_cache_factory.h" #include "io/cache/file_cache_common.h" @@ -758,6 +762,7 @@ void CloudStorageEngine::_lease_compaction_thread_callback() { std::vector<std::shared_ptr<CloudFullCompaction>> full_compactions; std::vector<std::shared_ptr<CloudBaseCompaction>> base_compactions; std::vector<std::shared_ptr<CloudCumulativeCompaction>> cumu_compactions; + std::vector<std::shared_ptr<CloudCompactionStopToken>> compation_stop_tokens; { std::lock_guard lock(_compaction_mtx); for (auto& [_, base] : _submitted_base_compactions) { @@ -775,8 +780,16 @@ void CloudStorageEngine::_lease_compaction_thread_callback() { full_compactions.push_back(full); } } + for (auto& [_, stop_token] : _active_compaction_stop_tokens) { + if (stop_token) { + compation_stop_tokens.push_back(stop_token); + } + } } // TODO(plat1ko): Support batch lease rpc + for (auto& stop_token : compation_stop_tokens) { + stop_token->do_lease(); + } for (auto& comp : full_compactions) { comp->do_lease(); } @@ -854,5 +867,57 @@ std::shared_ptr<CloudCumulativeCompactionPolicy> CloudStorageEngine::cumu_compac return _cumulative_compaction_policies.at(compaction_policy); } +Status CloudStorageEngine::register_compaction_stop_token(CloudTabletSPtr tablet, + int64_t initiator) { + { + std::lock_guard lock(_compaction_mtx); + auto [_, success] = _active_compaction_stop_tokens.emplace(tablet->tablet_id(), nullptr); + if (!success) { + return Status::AlreadyExist("stop token already exists for tablet_id={}", + tablet->tablet_id()); + } + } + + auto stop_token = std::make_shared<CloudCompactionStopToken>(*this, tablet, initiator); + auto st = stop_token->do_register(); + + if (!st.ok()) { + std::lock_guard lock(_compaction_mtx); + _active_compaction_stop_tokens.erase(tablet->tablet_id()); + return st; + } + + { + std::lock_guard lock(_compaction_mtx); + _active_compaction_stop_tokens[tablet->tablet_id()] = stop_token; + } + LOG_INFO( + "successfully register compaction stop token for tablet_id={}, " + "delete_bitmap_lock_initiator={}", + tablet->tablet_id(), initiator); + return st; +} + +Status CloudStorageEngine::unregister_compaction_stop_token(CloudTabletSPtr tablet) { + std::shared_ptr<CloudCompactionStopToken> stop_token; + { + std::lock_guard lock(_compaction_mtx); + if (auto it = _active_compaction_stop_tokens.find(tablet->tablet_id()); + it != _active_compaction_stop_tokens.end()) { + stop_token = it->second; + } else { + return Status::NotFound("stop token not found for tablet_id={}", tablet->tablet_id()); + } + _active_compaction_stop_tokens.erase(tablet->tablet_id()); + } + // stop token will be removed when SC commit or abort + // RETURN_IF_ERROR(stop_token->do_unregister()); + LOG_INFO( + "successfully unregister compaction stop token for tablet_id={}, " + "delete_bitmap_lock_initiator={}", + tablet->tablet_id(), stop_token->initiator()); + return Status::OK(); +} + #include "common/compile_check_end.h" } // namespace doris diff --git a/be/src/cloud/cloud_storage_engine.h b/be/src/cloud/cloud_storage_engine.h index 5e51285d93a..6381fbe6001 100644 --- a/be/src/cloud/cloud_storage_engine.h +++ b/be/src/cloud/cloud_storage_engine.h @@ -44,6 +44,7 @@ class CloudBaseCompaction; class CloudFullCompaction; class TabletHotspot; class CloudWarmUpManager; +class CloudCompactionStopToken; class CloudStorageEngine final : public BaseStorageEngine { public: @@ -143,6 +144,10 @@ public: return *_sync_load_for_tablets_thread_pool; } + Status register_compaction_stop_token(CloudTabletSPtr tablet, int64_t initiator); + + Status unregister_compaction_stop_token(CloudTabletSPtr tablet); + private: void _refresh_storage_vault_info_thread_callback(); void _vacuum_stale_rowsets_thread_callback(); @@ -188,6 +193,9 @@ private: // tablet_id -> submitted cumu compactions, guarded by `_compaction_mtx` std::unordered_map<int64_t, std::vector<std::shared_ptr<CloudCumulativeCompaction>>> _submitted_cumu_compactions; + // tablet_id -> active compaction stop tokens + std::unordered_map<int64_t, std::shared_ptr<CloudCompactionStopToken>> + _active_compaction_stop_tokens; std::unique_ptr<ThreadPool> _base_compaction_thread_pool; std::unique_ptr<ThreadPool> _cumu_compaction_thread_pool; diff --git a/cloud/src/meta-service/meta_service_job.cpp b/cloud/src/meta-service/meta_service_job.cpp index 5299b85f41d..99b61e7ebf0 100644 --- a/cloud/src/meta-service/meta_service_job.cpp +++ b/cloud/src/meta-service/meta_service_job.cpp @@ -20,6 +20,7 @@ #include <gen_cpp/cloud.pb.h> #include <glog/logging.h> +#include <algorithm> #include <chrono> #include <cstddef> #include <sstream> @@ -61,7 +62,8 @@ bool check_compaction_input_verions(const TabletCompactionJobPB& compaction, if (!job_pb.has_schema_change() || !job_pb.schema_change().has_alter_version()) { return true; } - if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE) { + if (compaction.type() == TabletCompactionJobPB::EMPTY_CUMULATIVE || + compaction.type() == TabletCompactionJobPB::STOP_TOKEN) { return true; } if (compaction.input_versions_size() != 2 || @@ -192,11 +194,27 @@ void start_compaction_job(MetaServiceCode& code, std::string& msg, std::stringst }), compactions.end()); // clang-format on // Check conflict job + if (std::ranges::any_of(compactions, [](const auto& c) { + return c.type() == TabletCompactionJobPB::STOP_TOKEN; + })) { + auto it = std::ranges::find_if(compactions, [](const auto& c) { + return c.type() == TabletCompactionJobPB::STOP_TOKEN; + }); + msg = fmt::format( + "compactions are not allowed on tablet_id={} currently, blocked by schema " + "change job delete_bitmap_initiator={}", + tablet_id, it->delete_bitmap_lock_initiator()); + code = MetaServiceCode::JOB_TABLET_BUSY; + return; + } if (compaction.type() == TabletCompactionJobPB::FULL) { // Full compaction is generally used for data correctness repair // for MOW table, so priority should be given to performing full // compaction operations and canceling other types of compaction. compactions.Clear(); + } else if (compaction.type() == TabletCompactionJobPB::STOP_TOKEN) { + // fail all existing compactions + compactions.Clear(); } else if ((!compaction.has_check_input_versions_range() && compaction.input_versions().empty()) || (compaction.has_check_input_versions_range() && @@ -1111,6 +1129,25 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str auto job_val = recorded_job.SerializeAsString(); txn->put(job_key, job_val); if (!new_tablet_job_val.empty()) { + auto& compactions = *new_recorded_job.mutable_compaction(); + auto origin_size = compactions.size(); + compactions.erase( + std::remove_if( + compactions.begin(), compactions.end(), + [&](auto& c) { + return c.has_delete_bitmap_lock_initiator() && + c.delete_bitmap_lock_initiator() == + schema_change.delete_bitmap_lock_initiator(); + }), + compactions.end()); + if (compactions.size() < origin_size) { + INSTANCE_LOG(INFO) + << "remove " << (origin_size - compactions.size()) + << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id + << " delete_bitmap_lock_initiator=" + << schema_change.delete_bitmap_lock_initiator() + << " key=" << hex(job_key); + } new_recorded_job.clear_schema_change(); new_tablet_job_val = new_recorded_job.SerializeAsString(); txn->put(new_tablet_job_key, new_tablet_job_val); @@ -1150,7 +1187,28 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str return; } if (schema_change.alter_version() < 2) { // no need to update stats - // TODO(cyx): clear schema_change job? + // TODO(cyx): clear schema_change job? + if (!new_tablet_job_val.empty()) { + auto& compactions = *new_recorded_job.mutable_compaction(); + auto origin_size = compactions.size(); + compactions.erase( + std::remove_if(compactions.begin(), compactions.end(), + [&](auto& c) { + return c.has_delete_bitmap_lock_initiator() && + c.delete_bitmap_lock_initiator() == + schema_change.delete_bitmap_lock_initiator(); + }), + compactions.end()); + if (compactions.size() < origin_size) { + INSTANCE_LOG(INFO) + << "remove " << (origin_size - compactions.size()) + << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id + << " delete_bitmap_lock_initiator=" + << schema_change.delete_bitmap_lock_initiator() << " key=" << hex(job_key); + } + new_tablet_job_val = new_recorded_job.SerializeAsString(); + txn->put(new_tablet_job_key, new_tablet_job_val); + } need_commit = true; return; } @@ -1287,6 +1345,23 @@ void process_schema_change_job(MetaServiceCode& code, std::string& msg, std::str auto job_val = recorded_job.SerializeAsString(); txn->put(job_key, job_val); if (!new_tablet_job_val.empty()) { + auto& compactions = *new_recorded_job.mutable_compaction(); + auto origin_size = compactions.size(); + compactions.erase( + std::remove_if(compactions.begin(), compactions.end(), + [&](auto& c) { + return c.has_delete_bitmap_lock_initiator() && + c.delete_bitmap_lock_initiator() == + schema_change.delete_bitmap_lock_initiator(); + }), + compactions.end()); + if (compactions.size() < origin_size) { + INSTANCE_LOG(INFO) << "remove " << (origin_size - compactions.size()) + << " STOP_TOKEN for schema_change job tablet_id=" << tablet_id + << " delete_bitmap_lock_initiator=" + << schema_change.delete_bitmap_lock_initiator() + << " key=" << hex(job_key); + } new_recorded_job.clear_schema_change(); new_tablet_job_val = new_recorded_job.SerializeAsString(); txn->put(new_tablet_job_key, new_tablet_job_val); diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index ff0279990ee..28ec3ba67d8 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -514,6 +514,7 @@ message TabletCompactionJobPB { CUMULATIVE = 2; EMPTY_CUMULATIVE = 3; // just update cumulative point FULL = 4; + STOP_TOKEN = 5; // fail existing compactions and deny newly incomming compactions } // IP and port of the node which initiates this job optional string initiator = 1; // prepare diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out new file mode 100644 index 00000000000..4f02f6683a2 Binary files /dev/null and b/regression-test/data/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.out differ diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy new file mode 100644 index 00000000000..467e9fddb43 --- /dev/null +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_mow_new_tablet_compaction.groovy @@ -0,0 +1,143 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.util.concurrent.TimeUnit +import org.awaitility.Awaitility + +suite("test_cloud_mow_new_tablet_compaction", "nonConcurrent") { + if (!isCloudMode()) { + return + } + + GetDebugPoint().clearDebugPointsForAllFEs() + GetDebugPoint().clearDebugPointsForAllBEs() + + def customBeConfig = [ + enable_new_tablet_do_compaction : true + ] + + setBeConfigTemporary(customBeConfig) { + def table1 = "test_cloud_mow_new_tablet_compaction" + sql "DROP TABLE IF EXISTS ${table1} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table1} ( + `k1` int NOT NULL, + `c1` int, + `c2` int, + `c3` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table1} values(1,1,1,1);" + sql "insert into ${table1} values(2,2,2,2);" + sql "insert into ${table1} values(3,3,3,2);" + sql "sync;" + qt_sql "select * from ${table1} order by k1;" + + def backends = sql_return_maparray('show backends') + def tabletStats = sql_return_maparray("show tablets from ${table1};") + assert tabletStats.size() == 1 + def tabletId = tabletStats[0].TabletId + def tabletBackendId = tabletStats[0].BackendId + def tabletBackend + for (def be : backends) { + if (be.BackendId == tabletBackendId) { + tabletBackend = be + break; + } + } + logger.info("tablet ${tabletId} on backend ${tabletBackend.Host} with backendId=${tabletBackend.BackendId}"); + + try { + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + GetDebugPoint().enableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock") + sql "alter table ${table1} modify column c1 varchar(100);" + + Thread.sleep(3000) + + tabletStats = sql_return_maparray("show tablets from ${table1};") + def newTabletId = "-1" + for (def stat : tabletStats) { + if (stat.TabletId != tabletId) { + newTabletId = stat.TabletId + break + } + } + + logger.info("new_tablet_id=${newTabletId}") + + int start_ver = 5 + int end_ver = 4 + + // these load will skip to calculate bitmaps in publish phase on new tablet because it's in NOT_READY state + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().enableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + def threads = [] + threads << Thread.start { sql "insert into ${table1} values(1,99,99,99),(3,99,99,99);"} + ++end_ver + Thread.sleep(200) + threads << Thread.start { sql "insert into ${table1} values(5,88,88,88),(1,88,88,88);" } + ++end_ver + Thread.sleep(200) + threads << Thread.start { sql "insert into ${table1} values(3,77,77,77),(5,77,77,77);" } + ++end_ver + Thread.sleep(2000) + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.enable_spin_wait") + GetDebugPoint().disableDebugPointForAllFEs("CloudGlobalTransactionMgr.getDeleteBitmapUpdateLock.block") + threads.each { it.join() } + + + // let sc capture these rowsets when calculating increment rowsets without lock + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_convert_historical_rowsets.block") + Thread.sleep(1000) + + // do cumu compaction on these rowsets on new tablet + // this can happen when enable_new_tablet_do_compaction=true + GetDebugPoint().enableDebugPointForAllBEs("CloudSizeBasedCumulativeCompactionPolicy::pick_input_rowsets.set_input_rowsets", + [tablet_id:"${newTabletId}", start_version: start_ver, end_version: end_ver]); + { + // trigger cumu compaction, should fail + logger.info("trigger cumu compaction on tablet=${newTabletId} BE.Host=${tabletBackend.Host} with backendId=${tabletBackend.BackendId}") + def (code, out, err) = be_run_cumulative_compaction(tabletBackend.Host, tabletBackend.HttpPort, newTabletId) + logger.info("Run compaction: code=" + code + ", out=" + out + ", err=" + err) + assert code == 0 + def compactJson = parseJson(out.trim()) + assert "success" != compactJson.status.toLowerCase() + } + + GetDebugPoint().disableDebugPointForAllBEs("CloudSchemaChangeJob::_process_delete_bitmap.after.capture_without_lock") + // wait for sc to finish + waitForSchemaChangeDone { + sql """ SHOW ALTER TABLE COLUMN WHERE TableName='${table1}' ORDER BY createtime DESC LIMIT 1 """ + time 1000 + } + + qt_dup_key_count "select k1,count() as cnt from ${table1} group by k1 having cnt>1;" + order_qt_sql "select * from ${table1};" + + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org