gavinchou commented on code in PR #32804: URL: https://github.com/apache/doris/pull/32804#discussion_r1570412055
########## be/src/cloud/config.h: ########## @@ -88,4 +88,6 @@ DECLARE_mInt32(schedule_sync_tablets_interval_s); // Cloud mow DECLARE_mInt32(mow_stream_load_commit_retry_times); +DECLARE_mInt32(sync_load_for_tablets_thread); Review Comment: add a comment
########## be/src/cloud/cloud_tablet.cpp: ########## @@ -191,13 +192,40 @@ void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_ auto add_rowsets_directly = [=, this](std::vector<RowsetSharedPtr>& rowsets) { for (auto& rs : rowsets) { + if (version_overlap || warmup_delta_data) { +#ifndef BE_TEST Review Comment: try to use TEST_SYNC_POINT_RETURN() instead of ifdef (a sketch of this pattern appears further down in this message)
########## be/src/cloud/cloud_backend_service.cpp: ########## @@ -45,4 +57,137 @@ Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* return Status::OK(); } +void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&, + const TSyncLoadForTabletsRequest& request) { + auto f = [this, tablet_ids = request.tablet_ids]() { + std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t tablet_id) { + CloudTabletSPtr tablet; + auto result = _engine.tablet_mgr().get_tablet(tablet_id, true); + if (!result.has_value()) { + return; + } + Status st = result.value()->sync_rowsets(-1, true); + if (!st.ok()) { + LOG_WARNING("failed to sync load for tablet").error(st); + } + }); + }; + static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f))); +} + +void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response, + const TGetTopNHotPartitionsRequest& request) { + TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables); + response.file_cache_size = io::FileCacheFactory::instance()->get_capacity(); + response.__isset.hot_tables = !response.hot_tables.empty(); +} + +void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, + const TWarmUpTabletsRequest& request) { + Status st; + auto* manager = CloudWarmUpManager::instance(); + switch (request.type) { + case TWarmUpTabletsRequestType::SET_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_JOB") + .tag("job_id", request.job_id); + st = manager->check_and_set_job_id(request.job_id); + if (!st) { + LOG_WARNING("SET_JOB failed.").error(st); + break; + } + [[fallthrough]]; + } + case TWarmUpTabletsRequestType::SET_BATCH: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_BATCH") + .tag("job_id", request.job_id) + .tag("batch_id", request.batch_id) + .tag("jobs size", request.job_metas.size()); + bool retry = false; + st = manager->check_and_set_batch_id(request.job_id, request.batch_id, &retry); + if (!retry && st) { + manager->add_job(request.job_metas); + } else { + if (retry) { + LOG_WARNING("retry the job.") + .tag("job_id", request.job_id) + .tag("batch_id", request.batch_id); + } else { + LOG_WARNING("SET_BATCH failed.").error(st); + } + } + break; + } + case TWarmUpTabletsRequestType::GET_CURRENT_JOB_STATE_AND_LEASE: { + auto [job_id, batch_id, pending_job_size, finish_job_size] = + manager->get_current_job_state(); + LOG_INFO("receive the warm up request.") + .tag("request_type", "GET_CURRENT_JOB_STATE_AND_LEASE") + .tag("job_id", job_id) + .tag("batch_id", batch_id) + .tag("pending_job_size", pending_job_size) + .tag("finish_job_size", finish_job_size); + response.__set_job_id(job_id); + response.__set_batch_id(batch_id); + response.__set_pending_job_size(pending_job_size); + response.__set_finish_job_size(finish_job_size); + break; + } + case TWarmUpTabletsRequestType::CLEAR_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "CLEAR_JOB") + .tag("job_id", request.job_id); + st = manager->clear_job(request.job_id); + break; + } + default: + DCHECK(false); + }; + st.to_thrift(&response.status); +} + +void CloudBackendService::pre_cache_async(TPreCacheAsyncResponse& response, + const TPreCacheAsyncRequest& request) { + std::string brpc_addr = fmt::format("{}:{}", request.host, request.brpc_port); Review Comment: Just use the name `warm_up_cache_async()`, `pre_cache` is hard to understand
########## be/src/cloud/cloud_backend_service.h: ########## @@ -35,6 +35,22 @@ class CloudBackendService final : public BaseBackendService { // TODO(plat1ko): cloud backend functions + // If another cluster load, FE need to notify the cluster to sync the load data + void sync_load_for_tablets(TSyncLoadForTabletsResponse& response, + const TSyncLoadForTabletsRequest& request) override; + + void get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response, + const TGetTopNHotPartitionsRequest& request) override; + + void warm_up_tablets(TWarmUpTabletsResponse& response, + const TWarmUpTabletsRequest& request) override; + + void pre_cache_async(TPreCacheAsyncResponse& response, + const TPreCacheAsyncRequest& request) override; + + void check_pre_cache(TCheckPreCacheResponse& response, + const TCheckPreCacheRequest& request) override; + Review Comment: Add enough comments for each RPC.
########## be/src/cloud/cloud_internal_service.h: ########## @@ -36,6 +36,11 @@ class CloudInternalServiceImpl final : public PInternalService { PAlterVaultSyncResponse* response, google::protobuf::Closure* done) override; + void get_file_cache_meta_by_tablet_id(google::protobuf::RpcController* controller, Review Comment: Add comment for behavior description
########## fe/fe-core/src/main/java/org/apache/doris/cluster/ClusterNamespace.java: ########## @@ -32,6 +32,10 @@ public class ClusterNamespace { private static final String CLUSTER_DELIMITER = ":"; + public static String getFullName(String cluster, String name) { + return linkString(cluster, name); + } Review Comment: the form of `cluster:db` is deprecated for new versions of Doris. Why do we need this?
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java: ########## @@ -438,4 +452,63 @@ public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNot public boolean getEnableStorageVault() { return this.enableStorageVault; } + + public long loadCloudWarmUpJob(DataInputStream dis, long checksum) throws Exception { + // same as loadAlterJob + + int size = dis.readInt(); + long newChecksum = checksum ^ size; + if (size > 0) { + // There should be no old cloudWarmUp jobs, if exist throw exception, should not use this FE version + throw new IOException("There are [" + size + "] cloud warm up jobs." + + " Please downgrade FE to an older version and handle residual jobs"); + } + + // finished or cancelled jobs + size = dis.readInt(); + newChecksum ^= size; + if (size > 0) { + throw new IOException("There are [" + size + "] old finished or cancelled cloud warm up jobs."
+ + " Please downgrade FE to an older version and handle residual jobs"); + } + + size = dis.readInt(); + newChecksum ^= size; + for (int i = 0; i < size; i++) { + CloudWarmUpJob cloudWarmUpJob = CloudWarmUpJob.read(dis); + if (cloudWarmUpJob.isExpire() || cloudWarmUpJob.getJobState() == JobState.DELETED) { + LOG.info("cloud warm up job is expired, {}, ignore it", cloudWarmUpJob.getJobId()); + continue; + } + this.getCacheHotspotMgr().addCloudWarmUpJob(cloudWarmUpJob); + } + LOG.info("finished replay cloud warm up job from image"); + return newChecksum; + } + + public long saveCloudWarmUpJob(CountingDataOutputStream dos, long checksum) throws IOException { + // same as saveAlterJob + + Map<Long, CloudWarmUpJob> cloudWarmUpJobs; + cloudWarmUpJobs = this.getCacheHotspotMgr().getCloudWarmUpJobs(); + + int size = 0; + checksum ^= size; + dos.writeInt(size); + + checksum ^= size; + dos.writeInt(size); Review Comment: why do we write and checksum 0 twice here? add comment if it is by intent ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java: ########## @@ -438,4 +452,63 @@ public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNot public boolean getEnableStorageVault() { return this.enableStorageVault; } + + public long loadCloudWarmUpJob(DataInputStream dis, long checksum) throws Exception { + // same as loadAlterJob Review Comment: remove unnecessary comment.... ########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java: ########## @@ -438,4 +452,63 @@ public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNot public boolean getEnableStorageVault() { return this.enableStorageVault; } + + public long loadCloudWarmUpJob(DataInputStream dis, long checksum) throws Exception { + // same as loadAlterJob + + int size = dis.readInt(); + long newChecksum = checksum ^ size; + if (size > 0) { + // There should be no old cloudWarmUp jobs, if exist throw exception, should not use this FE version + throw new IOException("There are [" + size + "] cloud warm up jobs." + + " Please downgrade FE to an older version and handle residual jobs"); + } + + // finished or cancelled jobs + size = dis.readInt(); + newChecksum ^= size; + if (size > 0) { + throw new IOException("There are [" + size + "] old finished or cancelled cloud warm up jobs." + + " Please downgrade FE to an older version and handle residual jobs"); + } + + size = dis.readInt(); + newChecksum ^= size; + for (int i = 0; i < size; i++) { + CloudWarmUpJob cloudWarmUpJob = CloudWarmUpJob.read(dis); + if (cloudWarmUpJob.isExpire() || cloudWarmUpJob.getJobState() == JobState.DELETED) { + LOG.info("cloud warm up job is expired, {}, ignore it", cloudWarmUpJob.getJobId()); + continue; + } + this.getCacheHotspotMgr().addCloudWarmUpJob(cloudWarmUpJob); + } + LOG.info("finished replay cloud warm up job from image"); Review Comment: it is not replay, it is just a load from the image. 
########## fe/fe-core/src/main/java/org/apache/doris/catalog/OlapTable.java: ########## @@ -2024,6 +2024,10 @@ public String getEstimatePartitionSize() { return ""; } + public boolean containPartition(String partitionName) { Review Comment: naming `containPartition` -> `containsPartition` ########## be/src/cloud/cloud_warm_up_manager.cpp: ########## Review Comment: lack of comment ########## fe/fe-core/src/main/java/org/apache/doris/analysis/CancelCloudWarmUpStmt.java: ########## @@ -0,0 +1,85 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.common.AnalysisException; + +public class CancelCloudWarmUpStmt extends CancelStmt { + private Expr whereClause; + private long jobId; + + public CancelCloudWarmUpStmt(Expr whereClause) { + this.whereClause = whereClause; + } + + public long getJobId() { + return jobId; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException { Review Comment: throw exception if not cloud mode ########## be/src/cloud/cloud_internal_service.cpp: ########## @@ -34,4 +37,68 @@ void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* _engine.sync_storage_vault(); } +FileCacheType cache_type_to_pb(io::FileCacheType type) { + switch (type) { + case io::FileCacheType::TTL: + return FileCacheType::TTL; + break; + case io::FileCacheType::INDEX: + return FileCacheType::INDEX; + break; + case io::FileCacheType::NORMAL: + return FileCacheType::NORMAL; + break; + default: + DCHECK(false); + } + return FileCacheType::NORMAL; +} + +void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( + google::protobuf::RpcController* controller [[maybe_unused]], + const PGetFileCacheMetaRequest* request, PGetFileCacheMetaResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + if (!config::enable_file_cache) { + LOG_WARNING("try to access tablet file cache meta, but file cache not enabled"); + return; + } + std::for_each( + request->tablet_ids().cbegin(), request->tablet_ids().cend(), [&](int64_t tablet_id) { + auto res = _engine.tablet_mgr().get_tablet(tablet_id); + if (!res.has_value()) { + LOG(ERROR) << "failed to get tablet: " << tablet_id + << " err msg: " << res.error().msg(); + return; + } + CloudTabletSPtr tablet = std::move(res.value()); + auto rowsets = tablet->get_snapshot_rowset(); + std::for_each(rowsets.cbegin(), rowsets.cend(), [&](RowsetSharedPtr rowset) { + std::string rowset_id = rowset->rowset_id().to_string(); + for (int64_t segment_id = 0; segment_id < rowset->num_segments(); + segment_id++) { + std::string file_name = fmt::format("{}_{}.dat", rowset_id, segment_id); + auto cache_key = 
io::BlockFileCache::hash(file_name); + auto* cache = io::FileCacheFactory::instance()->get_by_path(cache_key); + + auto segments_meta = cache->get_hot_blocks_meta(cache_key); + std::for_each( Review Comment: ditto
########## be/src/cloud/cloud_internal_service.cpp: ########## @@ -34,4 +37,68 @@ void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* _engine.sync_storage_vault(); } +FileCacheType cache_type_to_pb(io::FileCacheType type) { + switch (type) { + case io::FileCacheType::TTL: + return FileCacheType::TTL; + break; + case io::FileCacheType::INDEX: + return FileCacheType::INDEX; + break; + case io::FileCacheType::NORMAL: + return FileCacheType::NORMAL; + break; + default: + DCHECK(false); + } + return FileCacheType::NORMAL; +} + +void CloudInternalServiceImpl::get_file_cache_meta_by_tablet_id( + google::protobuf::RpcController* controller [[maybe_unused]], + const PGetFileCacheMetaRequest* request, PGetFileCacheMetaResponse* response, + google::protobuf::Closure* done) { + brpc::ClosureGuard closure_guard(done); + if (!config::enable_file_cache) { + LOG_WARNING("try to access tablet file cache meta, but file cache not enabled"); + return; + } + std::for_each( Review Comment: Use a plain `for` loop to save indentation (at least 8 spaces) ``` for (auto it...; it != end();++it) { ... } ```
########## gensrc/proto/internal_service.proto: ########## @@ -796,7 +796,7 @@ message FileCacheSegmentMeta { } message PGetFileCacheMetaResponse { - repeated FileCacheSegmentMeta file_cache_segment_metas = 1; + repeated FileCacheBlockMeta file_cache_segment_metas = 1; Review Comment: `file_cache_segment_metas` -> `file_cache_block_metas`
########## be/src/cloud/cloud_internal_service.cpp: ########## @@ -34,4 +37,68 @@ void CloudInternalServiceImpl::alter_vault_sync(google::protobuf::RpcController* _engine.sync_storage_vault(); } +FileCacheType cache_type_to_pb(io::FileCacheType type) { + switch (type) { + case io::FileCacheType::TTL: + return FileCacheType::TTL; + break; + case io::FileCacheType::INDEX: + return FileCacheType::INDEX; + break; + case io::FileCacheType::NORMAL: + return FileCacheType::NORMAL; + break; + default: Review Comment: redundant `break`
########## be/src/cloud/cloud_backend_service.cpp: ########## @@ -45,4 +57,137 @@ Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* return Status::OK(); } +void CloudBackendService::sync_load_for_tablets(TSyncLoadForTabletsResponse&, + const TSyncLoadForTabletsRequest& request) { + auto f = [this, tablet_ids = request.tablet_ids]() { + std::for_each(tablet_ids.cbegin(), tablet_ids.cend(), [this](int64_t tablet_id) { + CloudTabletSPtr tablet; + auto result = _engine.tablet_mgr().get_tablet(tablet_id, true); + if (!result.has_value()) { + return; + } + Status st = result.value()->sync_rowsets(-1, true); + if (!st.ok()) { + LOG_WARNING("failed to sync load for tablet").error(st); + } + }); + }; + static_cast<void>(_exec_env->sync_load_for_tablets_thread_pool()->submit_func(std::move(f))); +} + +void CloudBackendService::get_top_n_hot_partitions(TGetTopNHotPartitionsResponse& response, + const TGetTopNHotPartitionsRequest& request) { + TabletHotspot::instance()->get_top_n_hot_partition(&response.hot_tables); + response.file_cache_size = io::FileCacheFactory::instance()->get_capacity(); + response.__isset.hot_tables = !response.hot_tables.empty(); +} + +void CloudBackendService::warm_up_tablets(TWarmUpTabletsResponse& response, + const TWarmUpTabletsRequest& request) { + Status st; + auto* manager = CloudWarmUpManager::instance(); + switch (request.type) { + case TWarmUpTabletsRequestType::SET_JOB: { + LOG_INFO("receive the warm up request.") + .tag("request_type", "SET_JOB") + .tag("job_id", request.job_id); + st = manager->check_and_set_job_id(request.job_id); + if (!st) { + LOG_WARNING("SET_JOB failed.").error(st); + break; + } + [[fallthrough]]; Review Comment: Why do we need to `fallthrough`?
########## fe/fe-core/src/main/java/org/apache/doris/qe/DdlExecutor.java: ########## @@ -374,6 +376,9 @@ public static void execute(Env env, DdlStmt ddlStmt) throws Exception { env.getAuth().refreshLdap((RefreshLdapStmt) ddlStmt); } else if (ddlStmt instanceof AlterUserStmt) { env.getAuth().alterUser((AlterUserStmt) ddlStmt); + } else if (ddlStmt instanceof CancelCloudWarmUpStmt) { + CancelCloudWarmUpStmt stmt = (CancelCloudWarmUpStmt) ddlStmt; + ((CloudEnv) env).cancelCloudWarmUp(stmt); Review Comment: We should check cloud mode here
########## fe/fe-core/src/main/java/org/apache/doris/common/FeConstants.java: ########## @@ -80,6 +80,7 @@ public class FeConstants { public static final String FS_PREFIX_FILE = "file"; public static final String INTERNAL_DB_NAME = "__internal_schema"; + public static final String INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME = "cloud_cache_hotspot"; Review Comment: Is this compatible with the existing warehouses?
########## be/src/cloud/cloud_tablet_hotspot.h: ########## Review Comment: lack of comment
########## fe/fe-core/src/main/java/org/apache/doris/cloud/catalog/CloudEnv.java: ########## @@ -438,4 +452,63 @@ public void replayUpdateCloudReplica(UpdateCloudReplicaInfo info) throws MetaNot public boolean getEnableStorageVault() { return this.enableStorageVault; } + + public long loadCloudWarmUpJob(DataInputStream dis, long checksum) throws Exception { + // same as loadAlterJob + + int size = dis.readInt(); + long newChecksum = checksum ^ size; + if (size > 0) { + // There should be no old cloudWarmUp jobs, if exist throw exception, should not use this FE version + throw new IOException("There are [" + size + "] cloud warm up jobs." + + " Please downgrade FE to an older version and handle residual jobs"); + } + + // finished or cancelled jobs + size = dis.readInt(); + newChecksum ^= size; + if (size > 0) { + throw new IOException("There are [" + size + "] old finished or cancelled cloud warm up jobs." + + " Please downgrade FE to an older version and handle residual jobs"); + } + + size = dis.readInt(); + newChecksum ^= size; + for (int i = 0; i < size; i++) { + CloudWarmUpJob cloudWarmUpJob = CloudWarmUpJob.read(dis); + if (cloudWarmUpJob.isExpire() || cloudWarmUpJob.getJobState() == JobState.DELETED) { + LOG.info("cloud warm up job is expired, {}, ignore it", cloudWarmUpJob.getJobId()); + continue; + } + this.getCacheHotspotMgr().addCloudWarmUpJob(cloudWarmUpJob); + } + LOG.info("finished replay cloud warm up job from image"); + return newChecksum; + } + + public long saveCloudWarmUpJob(CountingDataOutputStream dos, long checksum) throws IOException { Review Comment: make sure the save/load procedures are compatible with existing warehouses
########## be/src/cloud/cloud_tablet_hotspot.cpp: ########## @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "cloud/cloud_tablet_hotspot.h" + +#include <chrono> +#include <mutex> + +#include "cloud/config.h" +#include "olap/tablet_fwd.h" +#include "runtime/exec_env.h" + +namespace doris { + +void TabletHotspot::count(const BaseTabletSPtr& tablet) { + if (!config::is_cloud_mode()) return; + size_t slot_idx = tablet->tablet_id() % s_slot_size; + auto& slot = _tablets_hotspot[slot_idx]; + std::lock_guard lock(slot.mtx); + HotspotCounterPtr counter; + if (auto iter = slot.map.find(tablet->tablet_id()); iter == slot.map.end()) { + counter = std::make_shared<HotspotCounter>(tablet->table_id(), tablet->index_id(), + tablet->partition_id()); + slot.map.insert(std::make_pair(tablet->tablet_id(), counter)); + } else { + counter = iter->second; + } + counter->last_access_time = std::chrono::system_clock::now(); + counter->cur_counter++; +} + +TabletHotspot::TabletHotspot() { + _counter_thread = std::thread(&TabletHotspot::make_dot_point, this); +} + +TabletHotspot::~TabletHotspot() { + { + std::lock_guard lock(_mtx); + _closed = true; + } + _cond.notify_all(); + if (_counter_thread.joinable()) { + _counter_thread.join(); + } +} + +struct MapKeyHash { + int64_t operator()(const std::pair<int64_t, int64_t>& key) const { + return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second); + } +}; +struct TabletHotspotMapValue { + uint64_t qpd = 0; + uint64_t qpw = 0; Review Comment: Add comment for the abbreviation ########## fe/fe-core/src/main/java/org/apache/doris/analysis/WarmUpClusterStmt.java: ########## @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.OlapTable; +import org.apache.doris.cloud.catalog.CloudEnv; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.ErrorCode; +import org.apache.doris.common.ErrorReport; +import org.apache.doris.common.UserException; + +import com.google.common.base.Strings; +import org.apache.commons.lang3.tuple.ImmutableTriple; +import org.apache.commons.lang3.tuple.Triple; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +public class WarmUpClusterStmt extends StatementBase { + private static final Logger LOG = LogManager.getLogger(WarmUpClusterStmt.class); + private List<Map<TableName, String>> tableList; + private List<Triple<String, String, String>> tables = new ArrayList<>(); + private String dstClusterName; + private String srcClusterName; + private boolean isWarmUpWithTable; + private boolean isForce; + + public WarmUpClusterStmt(String dstClusterName, String srcClusterName, boolean isForce) { + this.dstClusterName = dstClusterName; + this.srcClusterName = srcClusterName; + this.isForce = isForce; + this.isWarmUpWithTable = false; + } + + public WarmUpClusterStmt(String dstClusterName, List<Map<TableName, String>> tableList, boolean isForce) { + this.dstClusterName = dstClusterName; + this.tableList = tableList; + this.isForce = isForce; + this.isWarmUpWithTable = true; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { + super.analyze(analyzer); Review Comment: throw exception if not cloud mode ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCacheHotSpotStmt.java: ########## @@ -0,0 +1,182 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.analysis; + +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.Database; +import org.apache.doris.catalog.Env; +import org.apache.doris.catalog.PrimitiveType; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.cloud.system.CloudSystemInfoService; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.FeConstants; +import org.apache.doris.common.UserException; +import org.apache.doris.datasource.InternalCatalog; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +import java.io.StringReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ShowCacheHotSpotStmt extends ShowStmt { + public static final ShowResultSetMetaData[] RESULT_SET_META_DATAS = { + ShowResultSetMetaData.builder() + .addColumn(new Column("cluster_id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("cluster_name", ScalarType.createVarchar(128))) + .addColumn(new Column("table_id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("table_name", ScalarType.createVarchar(128))) + .build(), + ShowResultSetMetaData.builder() + .addColumn(new Column("table_id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("table_name", ScalarType.createVarchar(128))) + .addColumn(new Column("partition_id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("partition_name", ScalarType.createVarchar(65535))) + .build(), + ShowResultSetMetaData.builder() + .addColumn(new Column("partition_id", ScalarType.createType(PrimitiveType.BIGINT))) + .addColumn(new Column("partition_name", ScalarType.createVarchar(65535))) + .build() + }; + private int metaDataPos; + private static final Logger LOG = LogManager.getLogger(ShowCacheHotSpotStmt.class); + private static final TableName TABLE_NAME = new TableName(InternalCatalog.INTERNAL_CATALOG_NAME, + FeConstants.INTERNAL_DB_NAME, FeConstants.INTERNAL_FILE_CACHE_HOTSPOT_TABLE_NAME); + private final String tablePath; + private List<String> whereExprVariables = Arrays.asList("cluster_name", "table_name"); + private List<String> whereExprValues = new ArrayList<>(); + private List<String> whereExpr = new ArrayList<>(); + private SelectStmt selectStmt; + + public ShowCacheHotSpotStmt(String url) { + tablePath = url; + } + + @Override + public void analyze(Analyzer analyzer) throws UserException { Review Comment: throw exception if not cloud mode ########## fe/fe-core/src/main/java/org/apache/doris/analysis/ShowCloudWarmUpStmt.java: ########## @@ -0,0 +1,129 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.analysis; + +import org.apache.doris.analysis.BinaryPredicate.Operator; +import org.apache.doris.catalog.Column; +import org.apache.doris.catalog.ScalarType; +import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.UserException; +import org.apache.doris.qe.ShowResultSetMetaData; + +import com.google.common.collect.ImmutableList; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +public class ShowCloudWarmUpStmt extends ShowStmt { + private static final Logger LOG = LogManager.getLogger(ShowCloudWarmUpStmt.class); + private Expr whereClause; + private boolean showAllJobs = false; + private long jobId = -1; + + private static final ImmutableList<String> TITLE_NAMES = new ImmutableList.Builder<String>() + .add("JobId") + .add("ClusterName") + .add("Status") + .add("Type") + .add("CreateTime") + .add("FinishBatch") + .add("AllBatch") + .add("FinishTime") + .add("ErrMsg") + .build(); + + public ShowCloudWarmUpStmt(Expr whereClause) { + this.whereClause = whereClause; + } + + public long getJobId() { + return jobId; + } + + public boolean showAllJobs() { + return showAllJobs; + } + + @Override + public void analyze(Analyzer analyzer) throws AnalysisException, UserException { Review Comment: throw exception if not cloud mode
########## be/src/cloud/cloud_warm_up_manager.h: ########## Review Comment: lack of comment
########## be/src/io/cache/block_file_cache_downloader.cpp: ########## Review Comment: lack of comment
########## be/src/olap/compaction.cpp: ########## @@ -1122,6 +1122,8 @@ Status CloudCompactionMixin::construct_output_rowset_writer(RowsetWriterContext& ctx.compaction_level = _engine.cumu_compaction_policy(compaction_policy)->new_compaction_level(_input_rowsets); + ctx.write_file_cache = true; Review Comment: make it a config option. Base compaction may flush the cache, which causes performance issues
########## be/src/io/cache/block_file_cache_downloader.h: ########## Review Comment: lack of comment
########## be/src/cloud/cloud_tablet_hotspot.cpp: ########## Review Comment: lack of comment
########## be/src/olap/tablet_meta.h: ########## @@ -191,6 +191,7 @@ class TabletMeta { void revise_delete_bitmap_unlocked(const DeleteBitmap& delete_bitmap); const std::vector<RowsetMetaSharedPtr>& all_stale_rs_metas() const; + std::map<std::string, RowsetMetaSharedPtr> snapshot_rs_metas() const; Review Comment: add a comment describing what is returned and what the key and value represent
########## be/src/cloud/cloud_tablet_hotspot.cpp: ########## @@ -0,0 +1,195 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "cloud/cloud_tablet_hotspot.h" + +#include <chrono> +#include <mutex> + +#include "cloud/config.h" +#include "olap/tablet_fwd.h" +#include "runtime/exec_env.h" + +namespace doris { + +void TabletHotspot::count(const BaseTabletSPtr& tablet) { + if (!config::is_cloud_mode()) return; + size_t slot_idx = tablet->tablet_id() % s_slot_size; + auto& slot = _tablets_hotspot[slot_idx]; + std::lock_guard lock(slot.mtx); + HotspotCounterPtr counter; + if (auto iter = slot.map.find(tablet->tablet_id()); iter == slot.map.end()) { + counter = std::make_shared<HotspotCounter>(tablet->table_id(), tablet->index_id(), + tablet->partition_id()); + slot.map.insert(std::make_pair(tablet->tablet_id(), counter)); + } else { + counter = iter->second; + } + counter->last_access_time = std::chrono::system_clock::now(); + counter->cur_counter++; +} + +TabletHotspot::TabletHotspot() { + _counter_thread = std::thread(&TabletHotspot::make_dot_point, this); +} + +TabletHotspot::~TabletHotspot() { + { + std::lock_guard lock(_mtx); + _closed = true; + } + _cond.notify_all(); + if (_counter_thread.joinable()) { + _counter_thread.join(); + } +} + +struct MapKeyHash { + int64_t operator()(const std::pair<int64_t, int64_t>& key) const { + return std::hash<int64_t> {}(key.first) + std::hash<int64_t> {}(key.second); + } +}; +struct TabletHotspotMapValue { + uint64_t qpd = 0; + uint64_t qpw = 0; Review Comment: We may also need to consider query per hour -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org