gavinchou commented on code in PR #47563: URL: https://github.com/apache/doris/pull/47563#discussion_r1961975299
########## be/src/io/tools/file_cache_microbench.cpp: ########## @@ -0,0 +1,1382 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include <brpc/controller.h> +#include <brpc/http_status_code.h> +#include <brpc/server.h> +#include <brpc/uri.h> +#include <bvar/bvar.h> +#include <glog/logging.h> + +#include <atomic> +#include <chrono> +#include <condition_variable> +#include <cstdlib> +#include <filesystem> // 添加这个头文件 +#include <future> +#include <iomanip> +#include <iostream> +#include <map> +#include <memory> +#include <mutex> +#include <queue> +#include <random> +#include <string> +#include <thread> +#include <unordered_set> +#include <vector> + +#include "build/proto/microbench.pb.h" +#include "common/config.h" +#include "common/status.h" +#include "gflags/gflags.h" +#include "io/cache/cached_remote_file_reader.h" +#include "io/file_factory.h" +#include "io/fs/s3_file_system.h" +#include "io/fs/s3_file_writer.h" +#include "rapidjson/document.h" +#include "rapidjson/stringbuffer.h" +#include "rapidjson/writer.h" +#include "runtime/exec_env.h" +#include "util/bvar_helper.h" +#include "util/defer_op.h" +#include "util/stopwatch.hpp" + +bvar::LatencyRecorder write_latency("file_cache_microbench_append"); 
+bvar::LatencyRecorder read_latency("file_cache_microbench_read_at"); + +const std::string HIDDEN_PREFIX = "test_file_cache_microbench/"; + +// 添加gflags定义 +DEFINE_int32(port, 8888, "Http Port of this server"); + +// 添加一个数据生成器类 +class DataGenerator { +public: + DataGenerator(size_t total_size, size_t buffer_size = 1024 * 1024) // 默认1MB缓冲区 + : _total_size(total_size), _generated_size(0), _buffer_size(buffer_size) { + _buffer.resize(_buffer_size, 'x'); + } + + // 生成特定大小的数据,作为静态函数 + static std::string generate_fixed_size_data(size_t size) { + return std::string(size, 'x'); // 生成指定大小的 'x' 字符串 + } + + // 获取下一块数据 + doris::Slice next_chunk() { + if (_generated_size >= _total_size) { + return doris::Slice(); // 返回空slice表示结束 + } + + size_t remaining = _total_size - _generated_size; + size_t chunk_size = std::min(remaining, _buffer_size); + _generated_size += chunk_size; + + return doris::Slice(_buffer.data(), chunk_size); + } + + // 重置生成器 + void reset() { _generated_size = 0; } + + // 检查是否还有更多数据 + bool has_more() const { return _generated_size < _total_size; } + + // 获取总大小 + size_t total_size() const { return _total_size; } + +private: + const size_t _total_size; + size_t _generated_size; + const size_t _buffer_size; + std::string _buffer; +}; + +// IOPS 统计器 +class IopsStats { +public: + IopsStats() : _start_time(std::chrono::steady_clock::now()), _last_update_time(_start_time) {} + + void record_operation() { + std::lock_guard<std::mutex> lock(_mutex); + auto now = std::chrono::steady_clock::now(); + _op_times.push_back(now); + + // 只保留最近1秒内的操作记录 + auto one_second_ago = now - std::chrono::seconds(1); + while (!_op_times.empty() && _op_times.front() < one_second_ago) { + _op_times.pop_front(); + } + + // 计算当前IOPS(最近1秒内的操作数) + _current_iops = _op_times.size(); + + // 更新峰值IOPS + if (_current_iops > _peak_iops) { + _peak_iops = _current_iops; + } + + // 每秒更新一次显示 + if (now - _last_update_time >= std::chrono::seconds(1)) { + _last_update_time = now; + } + } + + double 
get_current_iops() const { + std::lock_guard<std::mutex> lock(_mutex); + auto now = std::chrono::steady_clock::now(); + // 如果最后一次操作距离现在超过1秒,返回0 + if (_op_times.empty() || (now - _op_times.back() > std::chrono::seconds(1))) { + return 0.0; + } + return _current_iops; + } + + double get_peak_iops() const { + std::lock_guard<std::mutex> lock(_mutex); + return _peak_iops; + } + + void reset() { + std::lock_guard<std::mutex> lock(_mutex); + _start_time = std::chrono::steady_clock::now(); + _last_update_time = _start_time; + _op_times.clear(); + _current_iops = 0; + _peak_iops = 0; + } + +private: + mutable std::mutex _mutex; + std::chrono::steady_clock::time_point _start_time; + std::chrono::steady_clock::time_point _last_update_time; + std::deque<std::chrono::steady_clock::time_point> _op_times; // 记录每个操作的时间点 + double _current_iops = 0; + double _peak_iops = 0; +}; + +// IOPS Rate Limiter implementation +class IopsRateLimiter { +public: + IopsRateLimiter(int iops_limit) + : _iops_limit(iops_limit), _tokens(0), _last_update(std::chrono::steady_clock::now()) {} + + void acquire() { + if (_iops_limit <= 0) return; + + std::unique_lock<std::mutex> lock(_mutex); + + while (true) { + // 更新令牌桶 + auto now = std::chrono::steady_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::nanoseconds>(now - _last_update) + .count(); + double new_tokens = (duration / 1e9) * _iops_limit; + _tokens = std::min(_iops_limit * 1.0, _tokens + new_tokens); + _last_update = now; + + if (_tokens >= 1.0) { + _tokens -= 1.0; + break; + } + + // 计算需要等待的时间 + double tokens_needed = 1.0 - _tokens; + int64_t wait_ns = static_cast<int64_t>((tokens_needed / _iops_limit) * 1e9); + auto wait_time = now + std::chrono::nanoseconds(wait_ns); + + _cv.wait_until(lock, wait_time); + } + } + + void set_iops_limit(int iops_limit) { + std::lock_guard<std::mutex> lock(_mutex); + _iops_limit = iops_limit; + _cv.notify_all(); // 通知所有等待的线程重新检查限制 + } + +private: + std::mutex _mutex; + 
std::condition_variable _cv; // 添加条件变量 + std::atomic<int> _iops_limit; + double _tokens; + std::chrono::steady_clock::time_point _last_update; +}; + +// 定义一个结构体来存储文件信息 +struct FileInfo { + std::string filename; // 文件名 + size_t data_size; // 数据大小 + std::string job_id; // 关联的作业ID +}; + +class S3FileRecords { +public: + void add_file_info(const std::string& job_id, const FileInfo& file_info) { + std::lock_guard<std::mutex> lock(mutex_); + records_[job_id].emplace_back(file_info); + } + + int64_t get_exist_job_perfile_size_by_prefix(const std::string& file_prefix) { + std::lock_guard<std::mutex> lock(mutex_); + for (const auto& pair : records_) { + const std::vector<FileInfo>& file_infos = pair.second; + for (const auto& file_info : file_infos) { + if (file_info.filename.compare(0, file_prefix.length(), file_prefix) == 0) { + return file_info.data_size; + } + } + } + return -1; + } + + std::map<std::string, std::vector<FileInfo>> get_all_records() { + std::lock_guard<std::mutex> lock(mutex_); + return records_; + } + + std::string find_job_id_by_prefix(const std::string& file_prefix) { + std::lock_guard<std::mutex> lock(mutex_); + for (const auto& pair : records_) { + const std::vector<FileInfo>& file_infos = pair.second; + for (const auto& file_info : file_infos) { + if (file_info.filename.compare(0, file_prefix.length(), file_prefix) == 0) { + return pair.first; + } + } + } + return ""; + } + +private: + std::mutex mutex_; + std::map<std::string, std::vector<FileInfo>> records_; +}; + +// 创建一个全局的 S3FileRecords 实例 +S3FileRecords s3_file_records; + +// IOPS-controlled S3 file writer +class IopsControlledS3FileWriter { +public: + IopsControlledS3FileWriter(std::shared_ptr<doris::io::ObjClientHolder> client, + const std::string& bucket, const std::string& key, + const doris::io::FileWriterOptions* options, + std::shared_ptr<IopsRateLimiter> rate_limiter, + std::shared_ptr<IopsStats> stats) + : _writer(client, bucket, key, options), _rate_limiter(rate_limiter), 
_stats(stats) {} + + doris::Status appendv(const doris::Slice* slices, size_t slices_size) { + _rate_limiter->acquire(); + _stats->record_operation(); + using namespace doris; + SCOPED_BVAR_LATENCY(write_latency) + return _writer.appendv(slices, slices_size); + } + + doris::Status close() { return _writer.close(); } + +private: + doris::io::S3FileWriter _writer; + std::shared_ptr<IopsRateLimiter> _rate_limiter; + std::shared_ptr<IopsStats> _stats; +}; + +// IOPS-controlled file reader +class IopsControlledFileReader { +public: + IopsControlledFileReader(std::shared_ptr<doris::io::FileReader> base_reader, + std::shared_ptr<IopsRateLimiter> rate_limiter, + std::shared_ptr<IopsStats> stats) + : _base_reader(std::move(base_reader)), _rate_limiter(rate_limiter), _stats(stats) {} + + doris::Status read_at(size_t offset, const doris::Slice& result, size_t* bytes_read, + const doris::io::IOContext* io_ctx) { + _rate_limiter->acquire(); + _stats->record_operation(); + using namespace doris; + SCOPED_BVAR_LATENCY(read_latency) + return _base_reader->read_at(offset, result, bytes_read, io_ctx); + } + + size_t size() const { return _base_reader->size(); } + + doris::Status close() { return _base_reader->close(); } + +private: + std::shared_ptr<doris::io::FileReader> _base_reader; + std::shared_ptr<IopsRateLimiter> _rate_limiter; + std::shared_ptr<IopsStats> _stats; +}; + +// 线程池实现 +class ThreadPool { +public: + ThreadPool(size_t num_threads) : stop(false) { + for (size_t i = 0; i < num_threads; ++i) { + workers.emplace_back([this] { + while (true) { + std::function<void()> task; + { + std::unique_lock<std::mutex> lock(queue_mutex); + condition.wait(lock, [this] { return stop || !tasks.empty(); }); + if (stop && tasks.empty()) { + return; + } + task = std::move(tasks.front()); + tasks.pop(); + } + task(); + } + }); + } + } + + template <class F> + std::future<void> enqueue(F&& f) { + auto task = std::make_shared<std::packaged_task<void()>>(std::forward<F>(f)); + 
std::future<void> res = task->get_future(); + { + std::unique_lock<std::mutex> lock(queue_mutex); + if (stop) { + throw std::runtime_error("enqueue on stopped ThreadPool"); + } + tasks.emplace([task]() { (*task)(); }); + } + condition.notify_one(); + return res; + } + + ~ThreadPool() { + { + std::unique_lock<std::mutex> lock(queue_mutex); + stop = true; + } + condition.notify_all(); + for (std::thread& worker : workers) { + worker.join(); + } + } + +private: + std::vector<std::thread> workers; + std::queue<std::function<void()>> tasks; + std::mutex queue_mutex; + std::condition_variable condition; + bool stop; +}; + +// 添加一个文件完成状态跟踪器 +class FileCompletionTracker { +public: + void mark_completed(const std::string& key) { + std::lock_guard<std::mutex> lock(_mutex); + _completed_files.insert(key); + _cv.notify_all(); // 通知所有等待的线程 + } + + bool is_completed(const std::string& key) { + return _completed_files.find(key) != _completed_files.end(); + } + + void wait_for_completion(const std::string& key) { + std::unique_lock<std::mutex> lock(_mutex); + _cv.wait(lock, [&] { return is_completed(key); }); + } + +private: + std::mutex _mutex; + std::condition_variable _cv; // 添加条件变量 + std::unordered_set<std::string> _completed_files; +}; + +std::string get_usage(const std::string& progname) { + std::string usage = R"( + )" + progname + R"( is the Doris microbench tool for testing file cache in cloud. 
+ + Usage: + Start the server: + )" + progname + R"( --port=<port_number> + + API Endpoints: + POST /submit_job + Submit a job with the following JSON body: + { + "size_bytes_perfile": <size>, // Number of bytes to write per segment file + "write_iops": <limit>, // IOPS limit for writing per segment files + "read_iops": <limit>, // IOPS limit for reading per segment files + "num_threads": <count>, // Number of threads in the thread pool, default 200 + "num_files": <count>, // Number of segments to write/read + "file_prefix": "<prefix>", // Prefix for segment files, Notice: this tools hide prefix(test_file_cache_microbench/) before file_prefix + "write_batch_size": <size>, // Size of data to write in each write operation + "cache_type": <type>, // Write or Read data enter file cache queue type, support NORMAL | TTL | INDEX | DISPOSABLE, default NORMAL + "expiration": <timestamp>, // File cache ttl expire time, value is a unix timestamp + "repeat": <count>, // Read repeat times, default 1 + "read_offset": [<left>, <right>], // Range for reading (left inclusive, right exclusive) + "read_length": [<left>, <right>] // Range for reading length (left inclusive, right exclusive) + } + + GET /get_job_status/<job_id> + Retrieve the status of a submitted job. + Parameters: + - job_id: The ID of the job to retrieve status for. + - files (optional): If provided, returns the associated file records for the job. + Example: /get_job_status/job_id?files=10 + + GET /list_jobs + List all submitted jobs and their statuses. + + GET /get_help + Get this help information. + + Notes: + - Ensure that the S3 configuration is set correctly in the environment. + - The program will create and read files in the specified S3 bucket. + - Monitor the logs for detailed execution information and errors. 
+ )"; + return usage; +} + +// Job配置结构 +struct JobConfig { + int64_t size_bytes_perfile; + int32_t write_iops = 0; + int32_t read_iops = 0; + int32_t num_threads; + int32_t num_files; + std::string file_prefix; + std::string cache_type; + int64_t expiration; + int32_t repeat = 1; + int64_t write_batch_size; + int64_t read_offset_left; + int64_t read_offset_right; + int64_t read_length_left; + int64_t read_length_right; + bool write_file_cache = true; + + // 从JSON解析配置 + static JobConfig from_json(const std::string& json_str) { + JobConfig config; + // 使用rapidjson解析 + rapidjson::Document d; + d.Parse(json_str.c_str()); + + if (d.HasParseError()) { + throw std::runtime_error("JSON parse error"); + } + validate(d); + if (d.HasMember("write_file_cache") && d["write_file_cache"].GetBool() == false) { + config.write_file_cache = false; + } + config.num_files = d["num_files"].GetInt(); + if (config.num_files == 0) { + config.num_files = 1; + } + config.size_bytes_perfile = d["size_bytes_perfile"].GetInt64(); + config.write_iops = d["write_iops"].GetInt(); + config.read_iops = d["read_iops"].GetInt(); + config.num_threads = d["num_threads"].GetInt(); + if (config.num_threads == 0) { + config.num_threads = 200; + } + config.file_prefix = d["file_prefix"].GetString(); + + if (!d.HasMember("cache_type")) { + config.cache_type = "NORMAL"; + } else { + config.cache_type = d["cache_type"].IsString() ? 
d["cache_type"].GetString() : "NORMAL"; + } + + if (config.cache_type == "TTL") { + if (!d.HasMember("expiration")) { + throw std::runtime_error("expiration is required when cache type eq TTL"); + } + config.expiration = d["expiration"].GetInt64(); + if (config.expiration <= 0) { + throw std::runtime_error("expiration <= 0 when cache type eq TTL"); + } + } + + config.write_batch_size = d["write_batch_size"].GetInt64(); + if (config.write_batch_size == 0) { + config.write_batch_size = doris::config::s3_write_buffer_size; + } + + // such as [0, 100) + const rapidjson::Value& read_offset_array = d["read_offset"]; + if (!read_offset_array.IsArray() || read_offset_array.Size() != 2) { + throw std::runtime_error("Invalid read_offset format, expected array of size 2"); + } + config.read_offset_left = read_offset_array[0].GetInt64(); + config.read_offset_right = read_offset_array[1].GetInt64(); + if (config.read_offset_left >= config.read_offset_right) { + throw std::runtime_error("read_offset_left must be less than read_offset_right"); + } + + // such as [100, 500) or [-200, -10) + const rapidjson::Value& read_length_array = d["read_length"]; + if (!read_length_array.IsArray() || read_length_array.Size() != 2) { + throw std::runtime_error("Invalid read_length format, expected array of size 2"); + } + config.read_length_left = read_length_array[0].GetInt64(); + config.read_length_right = read_length_array[1].GetInt64(); + if (config.read_length_left >= config.read_length_right) { + throw std::runtime_error("read_length_left must be less than read_length_right"); + } + + return config; + } + + static void validate(const rapidjson::Document& json_data) { + if (!json_data.HasMember("file_prefix") || !json_data["file_prefix"].IsString() || + strlen(json_data["file_prefix"].GetString()) == 0) { + throw std::runtime_error("file_prefix is required and cannot be empty"); + } + } + + std::string to_string() const { + std::stringstream ss; + ss << "size_bytes_perfile: " << 
size_bytes_perfile << ", write_iops: " << write_iops + << ", read_iops: " << read_iops << ", num_threads: " << num_threads + << ", num_files: " << num_files << ", file_prefix: " << HIDDEN_PREFIX + file_prefix + << ", write_file_cache: " << write_file_cache + << ", more than write_batch_size: " << write_batch_size << ", read repeat: " << repeat + << ", ttl expiration: " << expiration << ", cache_type: " << cache_type + << " will append data to s3 writer, read_offset: [" << read_offset_left << " , " + << read_offset_right << "), read_length: [" << read_length_left << " , " + << read_length_right << ")"; + return ss.str(); + } +}; + +// Job状态 +enum class JobStatus { PENDING, RUNNING, COMPLETED, FAILED }; + +// Job结构 +struct Job { + std::string job_id; + JobConfig config; + JobStatus status; + std::string error_message; + std::chrono::system_clock::time_point create_time; + std::chrono::system_clock::time_point start_time; + std::chrono::system_clock::time_point end_time; + + // Job执行结果统计 + struct Statistics { + double peak_write_iops; + double peak_read_iops; + int64_t cache_hits; + int64_t cache_misses; + int64_t bytes_read_local; + int64_t bytes_read_remote; + std::string total_write_time; + std::string total_read_time; + } stats; + + // 记录与作业相关的文件信息 + std::vector<FileInfo> file_records; // 记录文件信息 + + // 添加 completion_tracker + std::shared_ptr<FileCompletionTracker> completion_tracker; + + // 默认构造函数 + Job() + : job_id(""), + config(), + status(JobStatus::PENDING), + create_time(std::chrono::system_clock::now()), + completion_tracker(std::make_shared<FileCompletionTracker>()) {} + + // 带参数的构造函数 + Job(const std::string& id, const JobConfig& cfg) + : job_id(id), + config(cfg), + status(JobStatus::PENDING), + create_time(std::chrono::system_clock::now()) { + if (config.write_iops && config.read_iops) { + completion_tracker = std::make_shared<FileCompletionTracker>(); + } else { + completion_tracker = nullptr; + } + } +}; + +// Job管理器 +class JobManager { +public: + 
JobManager() : _next_job_id(0), _job_executor_pool(4) {} // 创建4个线程的执行池 + + std::string submit_job(const JobConfig& config) { + try { + std::lock_guard<std::mutex> lock(_mutex); + std::string job_id = "job_" + std::to_string(std::time(nullptr)) + "_" + + std::to_string(_next_job_id++); + _jobs.emplace(job_id, Job(job_id, config)); + + // 将作业提交到执行线程池 + _job_executor_pool.enqueue([this, job_id]() { + try { + { + std::lock_guard<std::mutex> lock(_mutex); + _jobs[job_id].status = JobStatus::RUNNING; + _jobs[job_id].start_time = std::chrono::system_clock::now(); + } + + execute_job(job_id); + + { + std::lock_guard<std::mutex> lock(_mutex); + _jobs[job_id].status = JobStatus::COMPLETED; + _jobs[job_id].end_time = std::chrono::system_clock::now(); + } + } catch (const std::exception& e) { + std::lock_guard<std::mutex> lock(_mutex); + _jobs[job_id].status = JobStatus::FAILED; + _jobs[job_id].error_message = e.what(); + _jobs[job_id].end_time = std::chrono::system_clock::now(); + LOG(ERROR) << "Job " << job_id << " failed: " << e.what(); + } + }); + + return job_id; + } catch (const std::exception& e) { + LOG(ERROR) << "Error submitting job: " << e.what(); + // 返回错误信息 + return R"({"error": ")" + std::string(e.what()) + R"("})"; + } + } + + Job get_job_status(const std::string& job_id) { + std::lock_guard<std::mutex> lock(_mutex); + auto it = _jobs.find(job_id); + if (it != _jobs.end()) { + return it->second; + } + throw std::runtime_error("Job not found"); + } + + std::vector<Job> list_jobs() { + std::lock_guard<std::mutex> lock(_mutex); + std::vector<Job> result; + for (const auto& pair : _jobs) { + result.push_back(pair.second); + } + return result; + } + + void start() { + // 不再需要启动worker线程 + } + + void stop() { + // 等待所有作业完成 + _job_executor_pool.~ThreadPool(); + } + + void record_file_info(const std::string& key, size_t data_size, const std::string& job_id) { + std::lock_guard<std::mutex> lock(_mutex); // 确保线程安全 + auto it = _jobs.find(job_id); + if (it != _jobs.end()) { 
+ FileInfo file_info = {key, data_size, job_id}; + it->second.file_records.push_back(file_info); // 更新找到的作业的文件记录 + + // 将 FileInfo 添加到 s3_file_records 中 + s3_file_records.add_file_info(job_id, file_info); + } else { + LOG(ERROR) << "Job ID not found: " << job_id; // 记录错误信息 + } + } + + void execute_job(const std::string& job_id) { + Job& job = _jobs[job_id]; + JobConfig config = job.config; + LOG(INFO) << "begin to Run " << job_id << " job config: " << config.to_string(); + + // 生成多个key + std::vector<std::string> keys; + keys.reserve(config.num_files); + + std::string rewrite_job_id = job_id; + // Job Read the previously job uploaded files + if (config.read_iops != 0 && config.write_iops == 0) { + // 当 read_iops != 0 && write_iops == 0 时,表示读取之前写入的文件 + std::string old_job_id = + s3_file_records.find_job_id_by_prefix(HIDDEN_PREFIX + config.file_prefix); + if (old_job_id == "") { + std::string err_msg = + "Can't find previously job uploaded files, Please make sure read " + "files exist in obj or It is also possible that you have restarted " + "the file_cache_microbench program, job_id = " + + job_id; + LOG(WARNING) << err_msg; + throw std::runtime_error(err_msg); + } + rewrite_job_id = old_job_id; + } + + // 继续生成 keys + for (int i = 0; i < config.num_files; ++i) { + keys.push_back(HIDDEN_PREFIX + config.file_prefix + "/" + rewrite_job_id + "_" + + std::to_string(i)); + } + + if (config.write_iops) { + // 执行写操作 + execute_write_tasks(keys, job, config); + } + + if (config.read_iops) { + for (int i = 0; i < config.repeat; i++) { + // 执行读操作 + execute_read_tasks(keys, job, config); + } + } + LOG(INFO) << "finish to Run " << job_id; + } + +private: + doris::S3ClientConf create_s3_client_conf(const JobConfig& config) { + doris::S3ClientConf s3_conf; + s3_conf.max_connections = std::max(256, config.num_threads * 4); + s3_conf.request_timeout_ms = 60000; + s3_conf.connect_timeout_ms = 3000; + s3_conf.ak = doris::config::test_s3_ak; + s3_conf.sk = doris::config::test_s3_sk; + 
s3_conf.region = doris::config::test_s3_region; + s3_conf.endpoint = doris::config::test_s3_endpoint; + return s3_conf; + } + + void execute_write_tasks(const std::vector<std::string>& keys, Job& job, + const JobConfig& config) { + // 创建 S3 客户端配置 + doris::S3ClientConf s3_conf = create_s3_client_conf(config); + + // 初始化 S3 客户端 + auto client = std::make_shared<doris::io::ObjClientHolder>(s3_conf); + doris::Status init_status = client->init(); + if (!init_status.ok()) { + throw std::runtime_error("Failed to initialize S3 client: " + init_status.to_string()); + } + + // 创建速率限制器和统计器 + std::vector<std::shared_ptr<IopsRateLimiter>> write_limiters; + write_limiters.reserve(config.num_files); + for (int i = 0; i < config.num_files; ++i) { + write_limiters.push_back(std::make_shared<IopsRateLimiter>(config.write_iops)); + } + auto write_stats = std::make_shared<IopsStats>(); + + std::atomic<int> completed_writes(0); + std::vector<std::future<void>> write_futures; + ThreadPool write_pool(config.num_threads); + + // 启动写入任务 + doris::MonotonicStopWatch write_stopwatch; // 添加写入任务计时器 + write_stopwatch.start(); + for (int i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + write_futures.push_back(write_pool.enqueue([&, key, i]() { + try { + DataGenerator data_generator(config.size_bytes_perfile); + doris::io::FileWriterOptions options; + if (config.cache_type == "TTL") { + options.file_cache_expiration = config.expiration; + } + options.write_file_cache = config.write_file_cache; + auto writer = std::make_unique<IopsControlledS3FileWriter>( + client, doris::config::test_s3_bucket, key, &options, write_limiters[i], + write_stats); + + std::vector<doris::Slice> slices; + slices.reserve(4); + size_t accumulated_size = 0; + + // 流式写入数据 + while (data_generator.has_more()) { + doris::Slice chunk = data_generator.next_chunk(); + slices.push_back(chunk); + accumulated_size += chunk.size; + + if (accumulated_size >= config.write_batch_size || + !data_generator.has_more()) { + 
doris::Status status = writer->appendv(slices.data(), slices.size()); + if (!status.ok()) { + throw std::runtime_error("Write error for key " + key + ": " + + status.to_string()); + } + slices.clear(); + accumulated_size = 0; + } + } + + doris::Status status = writer->close(); + if (!status.ok()) { + throw std::runtime_error("Close error for key " + key + ": " + + status.to_string()); + } + if (job.completion_tracker) { + job.completion_tracker->mark_completed(key); + } + completed_writes++; + } catch (const std::exception& e) { + LOG(ERROR) << "Write task failed for segment " << key << ": " << e.what(); + } + })); + } + + // 等待所有写入任务完成 + for (auto& future : write_futures) { + future.get(); + } + write_stopwatch.stop(); // 停止计时 + + // 将写入时间从纳秒转换为秒并格式化为字符串 + double total_write_time_seconds = write_stopwatch.elapsed_time() / 1e9; // 纳秒转秒 + job.stats.total_write_time = + std::to_string(total_write_time_seconds) + " seconds"; // 保存为字符串 + job.stats.peak_write_iops = write_stats->get_peak_iops(); + LOG(INFO) << "Total write time: " << job.stats.total_write_time << " seconds"; + + // 记录写入的文件信息 + for (const auto& key : keys) { + size_t data_size = config.size_bytes_perfile; + record_file_info(key, data_size, job.job_id); + } + } + + void execute_read_tasks(const std::vector<std::string>& keys, Job& job, JobConfig& config) { + int64_t exist_job_perfile_size = s3_file_records.get_exist_job_perfile_size_by_prefix( + HIDDEN_PREFIX + config.file_prefix); + std::vector<std::future<void>> read_futures; + doris::io::IOContext io_ctx; + doris::io::FileCacheStatistics total_stats; + io_ctx.file_cache_stats = &total_stats; + if (config.cache_type == "DISPOSABLE") { + io_ctx.is_disposable = true; + } else if (config.cache_type == "TTL") { + io_ctx.expiration_time = config.expiration; + } else if (config.cache_type == "INDEX") { + io_ctx.is_index_data = true; + } else { // default NORMAL + // do nothing + } + ThreadPool read_pool(config.num_threads); + 
std::vector<std::shared_ptr<IopsRateLimiter>> read_limiters; + read_limiters.reserve(config.num_files); + auto read_stats = std::make_shared<IopsStats>(); + for (int i = 0; i < config.num_files; ++i) { + read_limiters.push_back(std::make_shared<IopsRateLimiter>(config.read_iops)); + } + std::atomic<int> completed_reads(0); + doris::MonotonicStopWatch read_stopwatch; // 添加读取任务计时器 + + // 创建 S3 客户端配置 + doris::S3ClientConf s3_conf = create_s3_client_conf(config); + + read_stopwatch.start(); + for (int i = 0; i < keys.size(); ++i) { + const auto& key = keys[i]; + read_futures.push_back(read_pool.enqueue([&, key, i]() { + try { + if (job.completion_tracker) { + job.completion_tracker->wait_for_completion(key); // 等待文件完成 + } + doris::io::FileReaderOptions reader_opts; Review Comment: init file_size if possible, because we know the file size. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org