bobhan1 commented on code in PR #53824:
URL: https://github.com/apache/doris/pull/53824#discussion_r2227603772
##########
be/src/cloud/cloud_tablet.cpp:
##########
@@ -242,6 +249,169 @@ TabletSchemaSPtr CloudTablet::merged_tablet_schema()
const {
return _merged_tablet_schema;
}
+/// Partitions `input_rowsets` by how each rowset's version range relates to the
+/// tablet's current max version (the value returned by max_version_unlocked()).
+///
+/// @param input_rowsets        rowsets to classify.
+/// @param new_rowsets          receives rowsets whose start version is greater than
+///                             the max version.
+/// @param overlapping_rowsets  receives rowsets whose end version is less than or
+///                             equal to the max version.
+/// @return true if every rowset fell into one of the two groups. Returns false as
+///         soon as a rowset straddles the max version (start <= max < end); in that
+///         case BOTH output vectors are cleared, which also drops any elements they
+///         held before the call.
+///
+/// NOTE(review): the "_unlocked" helper suggests the caller must already hold the
+/// tablet meta lock -- confirm against the call sites.
+bool CloudTablet::split_rowsets_by_version_overlap(
+        const std::vector<RowsetSharedPtr>& input_rowsets,
+        std::vector<RowsetSharedPtr>* new_rowsets,
+        std::vector<RowsetSharedPtr>* overlapping_rowsets) {
+    const auto max_version = max_version_unlocked();
+    // Iterate by const reference: copying a shared pointer per element costs an
+    // atomic ref-count increment/decrement that is not needed just to inspect it.
+    for (const auto& rs : input_rowsets) {
+        if (rs->version().first > max_version) {
+            new_rowsets->push_back(rs);
+        } else if (rs->version().second <= max_version) {
+            overlapping_rowsets->push_back(rs);
+        } else {
+            // Partially overlapping rowset: the split is not well defined.
+            new_rowsets->clear();
+            overlapping_rowsets->clear();
+            return false;
+        }
+    }
+    return true;
+}
+
+/// Submits background file-cache download tasks for every segment of `rowset`
+/// (the segment data file plus its inverted-index file(s)) and then records the
+/// rowset in _rowset_warm_up_states as WarmUpState::TRIGGERED_BY_SYNC_ROWSET.
+///
+/// Does nothing when the rowset already has an entry in _rowset_warm_up_states,
+/// when it has no segments, when it is filtered out by age (see below), or when
+/// its remote storage resource cannot be resolved. In those cases no download is
+/// submitted, so warm_up_done_cb() is not scheduled from here either.
+///
+/// @param rowset            rowset to warm up.
+/// @param version_overlap   when false, a rowset whose newest write timestamp is
+///                          10 minutes old or more is skipped.
+/// @param delay_add_rowset  forwarded to warm_up_done_cb(); when true the delayed
+///                          rowset counter is bumped and an INFO line is logged.
+///
+/// NOTE(review): the "_unlocked" suffix suggests the caller holds the tablet meta
+/// lock -- confirm against the call sites.
+void CloudTablet::warm_up_rowset_unlocked(RowsetSharedPtr rowset, bool version_overlap,
+                                          bool delay_add_rowset) {
+    if (_rowset_warm_up_states.find(rowset->rowset_id()) != _rowset_warm_up_states.end()) {
+        return;
+    }
+    if (delay_add_rowset) {
+        g_file_cache_shield_delayed_rowset_num << 1;
+        LOG(INFO) << "triggered a warm up for overlapping rowset " << rowset->version()
+                  << ", will add it to tablet meta latter";
+    }
+    // A rowset without segments has nothing to download.
+    if (rowset->num_segments() <= 0) {
+        return;
+    }
+
+    // Everything up to the segment loop is identical for all segments of the
+    // rowset, so it is evaluated once instead of once per segment.
+    const auto& rowset_meta = rowset->rowset_meta();
+    constexpr int64_t interval = 600; // 10 mins
+    // After a BE restart, the first `load_sync` rpc syncs all historical rowsets.
+    // Skip old rowsets so that we do not end up downloading the whole table.
+    if (!version_overlap &&
+        ::time(nullptr) - rowset_meta->newest_write_timestamp() >= interval) {
+        return;
+    }
+
+    auto storage_resource = rowset_meta->remote_storage_resource();
+    if (!storage_resource) {
+        // Logged once per rowset rather than once per segment.
+        LOG(WARNING) << storage_resource.error();
+        return;
+    }
+
+    // 0 means "no expiration": used when the tablet has no TTL or the rowset has
+    // no valid write timestamp.
+    const int64_t expiration_time =
+            _tablet_meta->ttl_seconds() == 0 || rowset_meta->newest_write_timestamp() <= 0
+                    ? 0
+                    : rowset_meta->newest_write_timestamp() + _tablet_meta->ttl_seconds();
+
+    // clang-format off
+    // Submits the download of one inverted-index file. Called synchronously in
+    // this function only, so capturing locals by reference is safe.
+    auto download_idx_file = [&](const io::Path& idx_path) {
+        io::DownloadFileMeta meta {
+                .path = idx_path,
+                .file_size = -1,
+                .file_system = storage_resource.value()->fs,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done {[](Status st) {
+                    if (!st) {
+                        LOG_WARNING("add rowset warm up error ").error(st);
+                    }
+                }},
+        };
+        _engine.file_cache_block_downloader().submit_download_task(std::move(meta));
+    };
+    // clang-format on
+
+    auto schema_ptr = rowset_meta->tablet_schema();
+    auto idx_version = schema_ptr->get_inverted_index_storage_format();
+
+    // warmup rowset data in background
+    for (int seg_id = 0; seg_id < rowset->num_segments(); ++seg_id) {
+        // NOTE(review): warm_up_done_cb() is attached to every segment download, so
+        // it runs once per segment and first fires when the first segment finishes,
+        // not when the whole rowset is cached -- confirm this is intended.
+        // NOTE(review): the callback captures `this`; presumably the tablet outlives
+        // all pending download tasks -- verify.
+        // clang-format off
+        _engine.file_cache_block_downloader().submit_download_task(io::DownloadFileMeta {
+                .path = storage_resource.value()->remote_segment_path(*rowset_meta, seg_id),
+                .file_size = rowset_meta->segment_file_size(seg_id),
+                .file_system = storage_resource.value()->fs,
+                .ctx =
+                        {
+                                .expiration_time = expiration_time,
+                                .is_dryrun = config::enable_reader_dryrun_when_download_file_cache,
+                        },
+                .download_done {[this, rowset, delay_add_rowset](Status st) {
+                    warm_up_done_cb(rowset, st, delay_add_rowset);
+                    if (!st) {
+                        LOG_WARNING("add rowset warm up error ").error(st);
+                    }
+                }},
+        });
+        // clang-format on
+
+        if (idx_version == InvertedIndexStorageFormatPB::V1) {
+            // V1 format: one index file per inverted index per segment.
+            for (const auto& index : schema_ptr->inverted_indexes()) {
+                auto idx_path = storage_resource.value()->remote_idx_v1_path(
+                        *rowset_meta, seg_id, index->index_id(), index->get_index_suffix());
+                download_idx_file(idx_path);
+            }
+        } else {
+            // Other formats: a single index file per segment, if any index exists.
+            if (schema_ptr->has_inverted_index()) {
+                auto idx_path = storage_resource.value()->remote_idx_v2_path(*rowset_meta, seg_id);
+                download_idx_file(idx_path);
+            }
+        }
+    }
+
+    // Reaching this point means at least one segment download was submitted
+    // (the rowset has >= 1 segment and none of the early returns fired).
+    VLOG_DEBUG << "warm up rowset " << rowset->version() << " triggerd by sync rowset";
+    _rowset_warm_up_states[rowset->rowset_id()] = WarmUpState::TRIGGERED_BY_SYNC_ROWSET;
+}
+
+/// Reports whether any tracked rowset is currently in the
+/// WarmUpState::TRIGGERED_BY_SYNC_ROWSET state.
+///
+/// Always returns false when config::enable_read_cluster_file_cache_shield is off.
+bool CloudTablet::is_warm_up_confilict_with_compaction() {
+    bool has_conflict = false;
+    if (config::enable_read_cluster_file_cache_shield) {
+        for (const auto& entry : _rowset_warm_up_states) {
+            // entry.first is the rowset id, entry.second its warm-up state.
+            if (entry.second == WarmUpState::TRIGGERED_BY_SYNC_ROWSET) {
+                has_conflict = true;
+                break;
+            }
+        }
+    }
+    return has_conflict;
+}
+
+void CloudTablet::warm_up_done_cb(RowsetSharedPtr rowset, Status status, bool
delay_add_rowset) {
+ if (delay_add_rowset) {
+ DBUG_EXECUTE_IF("CloudTablet.warm_up_done_cb.inject_sleep_s", {
+ auto sleep_time = dp->param("sleep", 3);
+ LOG_WARNING("CloudTablet.warm_up_done_cb.inject_sleep {} s",
sleep_time)
+ .tag("tablet_id", tablet_id());
+ std::this_thread::sleep_for(std::chrono::seconds(sleep_time));
+ });
+
+ g_file_cache_shield_delayed_rowset_add_num << 1;
+ }
+ std::unique_lock<std::shared_mutex> meta_lock;
Review Comment:
```suggestion
std::unique_lock<std::shared_mutex> meta_lock{_meta_lock};
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]