This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 0c5c400b327 branch-3.0: [feature](cloud)Impl file cache microbench
#47563 (#49440)
0c5c400b327 is described below
commit 0c5c400b3270053b2fa1da2762429436b70ffe6f
Author: deardeng <[email protected]>
AuthorDate: Sun Mar 30 10:39:28 2025 +0800
branch-3.0: [feature](cloud)Impl file cache microbench #47563 (#49440)
cherry pick from #47563
---
be/CMakeLists.txt | 10 +
be/src/io/tools/CMakeLists.txt | 68 +
be/src/io/tools/Makefile | 36 +
be/src/io/tools/file_cache_microbench.cpp | 2354 +++++++++++++++++++++++++++++
be/src/io/tools/proto/Makefile | 38 +
be/src/io/tools/proto/microbench.proto | 39 +
be/src/io/tools/readme.md | 133 ++
build.sh | 96 +-
run-be-ut.sh | 1 +
9 files changed, 2735 insertions(+), 40 deletions(-)
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 1124eb864e1..4e3ea02ef47 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -86,6 +86,11 @@ if (DISPLAY_BUILD_TIME)
set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE "time -f 'TimeUsage:
real=%es, user=%Us, sys=%Ss'")
endif()
# file_cache_microbench.cpp is only compiled when BE_TEST is defined, so when the tool is
# requested every target in this directory tree is built with the test/benchmark macros.
if (BUILD_FILE_CACHE_MICROBENCH_TOOL)
    add_compile_definitions(BE_TEST BE_BENCHMARK)
endif()
+
message(STATUS "GLIBC_COMPATIBILITY is ${GLIBC_COMPATIBILITY}")
message(STATUS "USE_LIBCPP is ${USE_LIBCPP}")
message(STATUS "USE_MEM_TRACKER is ${USE_MEM_TRACKER}")
@@ -765,6 +770,11 @@ if (BUILD_META_TOOL)
add_subdirectory(${SRC_DIR}/tools)
endif()
# Build the file cache microbench tool located in ${SRC_DIR}/io/tools (OFF by default).
option(BUILD_FILE_CACHE_MICROBENCH_TOOL "Build file cache microbench tool" OFF)
if (BUILD_FILE_CACHE_MICROBENCH_TOOL)
    add_subdirectory(${SRC_DIR}/io/tools)
endif()
+
option(BUILD_INDEX_TOOL "Build index tool" OFF)
if (BUILD_INDEX_TOOL)
add_subdirectory(${SRC_DIR}/index-tools)
diff --git a/be/src/io/tools/CMakeLists.txt b/be/src/io/tools/CMakeLists.txt
new file mode 100644
index 00000000000..40088892a97
--- /dev/null
+++ b/be/src/io/tools/CMakeLists.txt
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
# Where to put generated libraries.
set(LIBRARY_OUTPUT_PATH "${BUILD_DIR}/src/tools")

# Where to put generated binaries.
set(EXECUTABLE_OUTPUT_PATH "${BUILD_DIR}/src/tools")

# Generate the C++ code for the microbench protobuf messages by running proto/Makefile at
# configure time. The generated files land under ${CMAKE_CURRENT_SOURCE_DIR}/build/proto,
# where the glob below picks them up.
# NOTE(review): generating at configure time means edits to microbench.proto need a CMake
# re-run; an add_custom_command(OUTPUT ...) rule would let the build track that dependency.
execute_process(
    COMMAND make clean -C "${CMAKE_CURRENT_SOURCE_DIR}/proto"
    RESULT_VARIABLE CLEAN_RESULT
)
# Cleaning is best-effort: report a failure but keep configuring.
if(NOT CLEAN_RESULT EQUAL 0)
    message(WARNING "Failed to clean generated proto files: ${CLEAN_RESULT}")
endif()

execute_process(
    COMMAND make -C "${CMAKE_CURRENT_SOURCE_DIR}/proto"
    RESULT_VARIABLE MAKE_RESULT
)
# RESULT_VARIABLE holds either an exit code or an error string (e.g. when make cannot be
# started). Reference the variable by name: expanding ${MAKE_RESULT} would split such a
# string into several if() arguments and abort with a syntax error instead of this message.
if(NOT MAKE_RESULT EQUAL 0)
    message(FATAL_ERROR "Failed to compile proto files: ${MAKE_RESULT}")
endif()

# Print the current source directory.
message(STATUS "CMAKE_CURRENT_SOURCE_DIR: ${CMAKE_CURRENT_SOURCE_DIR}")

# Collect the generated protobuf sources and headers.
file(GLOB PROTO_SRCS "${CMAKE_CURRENT_SOURCE_DIR}/build/proto/*.pb.cc")
file(GLOB PROTO_HDRS "${CMAKE_CURRENT_SOURCE_DIR}/build/proto/*.pb.h")
# Fail early with a clear message instead of an obscure link error later.
if(NOT PROTO_SRCS)
    message(FATAL_ERROR
        "No generated *.pb.cc files found in ${CMAKE_CURRENT_SOURCE_DIR}/build/proto")
endif()

message(STATUS "PROTO_SRCS: ${PROTO_SRCS}")
message(STATUS "PROTO_HDRS: ${PROTO_HDRS}")

# The file_cache_microbench executable.
add_executable(file_cache_microbench
    file_cache_microbench.cpp
    ${PROTO_SRCS}
)

# Include path for the generated protobuf headers.
target_include_directories(file_cache_microbench PRIVATE
    "${CMAKE_CURRENT_SOURCE_DIR}/build/proto"
)

# The source is wrapped in
#   #if defined(BE_TEST) && defined(BUILD_FILE_CACHE_MICROBENCH_TOOL)
# BE_TEST is defined by be/CMakeLists.txt when the option is ON, but a CMake option is not a
# preprocessor macro, so define the second half of the guard explicitly for this target.
target_compile_definitions(file_cache_microbench PRIVATE BUILD_FILE_CACHE_MICROBENCH_TOOL)

# Libraries needed to link the tool.
target_link_libraries(file_cache_microbench PRIVATE
    ${DORIS_LINK_LIBS}
    protobuf
)

# Install rule.
install(TARGETS file_cache_microbench DESTINATION ${OUTPUT_DIR}/lib/)
diff --git a/be/src/io/tools/Makefile b/be/src/io/tools/Makefile
new file mode 100644
index 00000000000..d3be0e6905d
--- /dev/null
+++ b/be/src/io/tools/Makefile
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
# Generates the sources needed by the file cache microbench tool. The only generator
# directory next to this Makefile is proto/ (protobuf code for microbench.proto).

BUILD_DIR = ${CURDIR}/build/

all: subdirs
.PHONY: all

# Build every generator subdirectory. This list must only name directories that exist here;
# the previous "script proto thrift" list made `make` fail on the missing script/ and thrift/.
SUBDIR = proto
subdirs: ${SUBDIR}
.PHONY: subdirs ${SUBDIR}
${SUBDIR}:
	$(MAKE) -C $@

clean:
	rm -rf ${BUILD_DIR}
.PHONY: clean
diff --git a/be/src/io/tools/file_cache_microbench.cpp
b/be/src/io/tools/file_cache_microbench.cpp
new file mode 100644
index 00000000000..0111ff9f68c
--- /dev/null
+++ b/be/src/io/tools/file_cache_microbench.cpp
@@ -0,0 +1,2354 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#if defined(BE_TEST) && defined(BUILD_FILE_CACHE_MICROBENCH_TOOL)
+#include <brpc/controller.h>
+#include <brpc/http_status_code.h>
+#include <brpc/server.h>
+#include <brpc/uri.h>
+#include <bvar/bvar.h>
+#include <fmt/format.h>
+#include <glog/logging.h>
+
+#include <atomic>
+#include <chrono>
+#include <condition_variable>
+#include <cstdlib>
+#include <filesystem> // Add this header file
+#include <future>
+#include <iomanip>
+#include <iostream>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <random>
+#include <string>
+#include <thread>
+#include <unordered_set>
+#include <vector>
+
+#include "build/proto/microbench.pb.h"
+#include "common/config.h"
+#include "common/status.h"
+#include "gflags/gflags.h"
+#include "io/cache/block_file_cache.h"
+#include "io/cache/block_file_cache_factory.h"
+#include "io/cache/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/s3_file_system.h"
+#include "io/fs/s3_file_writer.h"
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
+#ifdef __clang__
+#pragma clang diagnostic push
+#pragma clang diagnostic ignored "-Wkeyword-macro"
+#elif defined(__GNUC__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wshadow"
+#endif
+
+#define private public
+#include "runtime/exec_env.h"
+#undef private
+
+#ifdef __clang__
+#pragma clang diagnostic pop
+#elif defined(__GNUC__)
+#pragma GCC diagnostic pop
+#endif
+
+#include <gen_cpp/cloud_version.h>
+
+#include "util/bvar_helper.h"
+#include "util/defer_op.h"
+#include "util/stopwatch.hpp"
+#include "util/string_util.h"
+#include "util/threadpool.h"
+
+using doris::io::FileCacheFactory;
+using doris::io::BlockFileCache;
+
+bvar::LatencyRecorder microbench_write_latency("file_cache_microbench_append");
+bvar::LatencyRecorder microbench_read_latency("file_cache_microbench_read_at");
+
+const std::string HIDDEN_PREFIX = "test_file_cache_microbench/";
+const char PAD_CHAR = 'x';
+const size_t BUFFER_SIZE = 1024 * 1024;
+// Just 10^9.
+static constexpr auto NS = 1000000000UL;
+
+DEFINE_int32(port, 8888, "Http Port of this server");
+
// Returns a multi-line, human-readable build description: the version (suffixed with
// -release or -debug depending on NDEBUG), commit hash and commit time, build initiator and
// build time, and the OS the binary was built on. All values come from the
// DORIS_CLOUD_BUILD_* macros (presumably provided by gen_cpp/cloud_version.h).
+static std::string build_info() {
+ std::stringstream ss;
+ ss << R"(
+ Version: {)";
+ ss << DORIS_CLOUD_BUILD_VERSION;
+
// Tag the version with the build flavor.
+#if defined(NDEBUG)
+ ss << R"(-release})";
+#else
+ ss << R"(-debug})";
+#endif
+
// Adjacent string literals and macros below are concatenated at compile time.
+ ss << R"(
+ Code_version: {commit=)" DORIS_CLOUD_BUILD_HASH R"( time=)"
DORIS_CLOUD_BUILD_VERSION_TIME R"(
+ Build_info: {initiator=)" DORIS_CLOUD_BUILD_INITIATOR R"( build_at=)"
DORIS_CLOUD_BUILD_TIME R"(
+ Build_on: )" DORIS_CLOUD_BUILD_OS_VERSION R"(})";
+ return ss.str();
+}
+
+// Modify DataGenerator class to generate more standard data blocks
+class DataGenerator {
+public:
+ DataGenerator(size_t total_size) : _total_size(total_size),
_generated_size(0) {
+ _buffer.resize(BUFFER_SIZE);
+ }
+
+ // Get the next chunk of data
+ doris::Slice next_chunk(const std::string& key) {
+ if (_generated_size >= _total_size) {
+ // Return an empty slice to indicate the end
+ return doris::Slice();
+ }
+
+ size_t remaining = _total_size - _generated_size;
+ size_t chunk_size = std::min(remaining, BUFFER_SIZE);
+
+ // Generate the tag for this block
+ std::string tag = fmt::format("key={},offset={}\n", key,
_generated_size);
+ size_t tag_size = tag.size();
+
+ // Ensure chunk_size is not less than tag_size
+ if (chunk_size < tag_size) {
+ std::memcpy(_buffer.data(), tag.data(), chunk_size);
+ } else {
+ // Fill the buffer with key:offset
+ std::memcpy(_buffer.data(), tag.data(), tag_size);
+ // Fill the remaining part
+ std::fill(_buffer.data() + tag_size, _buffer.data() + chunk_size,
PAD_CHAR);
+ }
+
+ _generated_size += chunk_size;
+ return doris::Slice(_buffer.data(), chunk_size);
+ }
+
+ bool has_more() const { return _generated_size < _total_size; }
+
+private:
+ const size_t _total_size;
+ size_t _generated_size;
+ std::vector<char> _buffer;
+};
+
// Checks bytes read back from a file produced by DataGenerator. The expected content is
// rebuilt per BUFFER_SIZE-aligned block: the block starts with the tag
// "key=<key>,offset=<block start>\n" and every remaining byte is PAD_CHAR.
+class DataVerifier {
+public:
// Verifies that `data` (data_size bytes read at `read_offset` from a file of `file_size`
// bytes written under `key`) matches what DataGenerator wrote there. Logs the first
// mismatch and returns false; otherwise returns true. Checking stops early when the current
// block starts at or beyond `file_size`, or when a tag would run past `file_size`.
// NOTE(review): assumes data.size() >= data_size; data[data_pos] is not bounds-checked.
+ static bool verify_data(const std::string& key, size_t file_size, size_t
read_offset,
+ const std::string& data, size_t data_size) {
// Start offset of the generator block that contains read_offset.
+ size_t current_block_start = (read_offset / BUFFER_SIZE) * BUFFER_SIZE;
+ size_t data_pos = 0;
+
+ while (data_pos < data_size) {
+ // Calculate the offset in the current block
+ size_t block_offset = read_offset + data_pos - current_block_start;
+
+ // Check if it exceeds the total file size
+ if (current_block_start >= file_size) {
+ break;
+ }
+
+ // Generate the expected tag
+ std::string expected_tag = fmt::format("key={},offset={}\n", key,
current_block_start);
+
+ // If within the tag range, need to verify the tag
+ if (block_offset < expected_tag.size()) {
+ // Calculate the length of the tag that can be read in the
current data
+ size_t available_tag_len =
+ std::min(expected_tag.size() - block_offset, data_size
- data_pos);
+
+ // If already at the end of the file, only verify the actual
existing data
+ if (read_offset + data_pos + available_tag_len > file_size) {
+ available_tag_len = file_size - (read_offset + data_pos);
+ }
+
+ if (available_tag_len == 0) break;
+ std::string_view actual_tag(data.data() + data_pos,
available_tag_len);
+ std::string_view expected_tag_part(expected_tag.data() +
block_offset,
+ available_tag_len);
+
+ if (actual_tag != expected_tag_part) {
+ LOG(ERROR) << "Tag mismatch at offset " << (read_offset +
data_pos)
+ << "\nExpected: " << expected_tag_part <<
"\nGot: " << actual_tag;
+ return false;
+ }
+ data_pos += available_tag_len;
+ } else {
// Past the tag: every byte up to the block end must be the pad byte.
+ char expected_byte = static_cast<char>(PAD_CHAR);
+ if (data[data_pos] != expected_byte) {
+ LOG(ERROR) << "Data mismatch at offset " << (read_offset +
data_pos)
+ << "\nExpected byte: " << (char)expected_byte
+ << "\nGot byte: " << (char)data[data_pos];
+ return false;
+ }
+ data_pos++;
+ }
+
+ // If reaching the end of the block, move to the next block
+ if ((read_offset + data_pos) % BUFFER_SIZE == 0) {
+ current_block_start += BUFFER_SIZE;
+ }
+ }
+
+ return true;
+ }
+};
+
// Metadata about one file written by a microbench job.
struct FileInfo {
    std::string filename; // File name
    size_t data_size;     // Data size in bytes
    std::string job_id;   // Associated job ID
};

// Mutex-protected registry of the files written by each job, keyed by job ID.
class S3FileRecords {
public:
    // Appends `file_info` to the list of files recorded for `job_id`.
    void add_file_info(const std::string& job_id, const FileInfo& file_info) {
        std::lock_guard<std::mutex> lock(mutex_);
        records_[job_id].emplace_back(file_info);
    }

    // Returns the data_size of the first recorded file whose name starts with
    // `file_prefix`, or -1 if there is none.
    int64_t get_exist_job_perfile_size_by_prefix(const std::string& file_prefix) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (const auto& [job_id, file_infos] : records_) {
            for (const auto& file_info : file_infos) {
                if (has_prefix(file_info.filename, file_prefix)) {
                    return static_cast<int64_t>(file_info.data_size);
                }
            }
        }
        return -1;
    }

    // Appends to `result` the names of recorded files starting with `file_prefix`, stopping
    // once `result` holds `file_number` entries. Any negative `file_number` (default -1)
    // means "no limit". Entries already present in `result` count toward the limit.
    void get_exist_job_files_by_prefix(const std::string& file_prefix,
                                       std::vector<std::string>& result, int file_number = -1) {
        std::lock_guard<std::mutex> lock(mutex_);
        // Convert the limit once so every comparison below is unsigned vs unsigned.
        const bool limited = file_number >= 0;
        const size_t limit = limited ? static_cast<size_t>(file_number) : 0;
        for (const auto& [job_id, file_infos] : records_) {
            for (const auto& file_info : file_infos) {
                if (!has_prefix(file_info.filename, file_prefix)) {
                    continue;
                }
                if (limited && result.size() >= limit) {
                    return;
                }
                result.push_back(file_info.filename);
                if (limited && result.size() >= limit) {
                    return;
                }
            }
        }
    }

    // Returns the job ID owning the first recorded file whose name starts with
    // `file_prefix`, or an empty string if there is none.
    std::string find_job_id_by_prefix(const std::string& file_prefix) {
        std::lock_guard<std::mutex> lock(mutex_);
        for (const auto& [job_id, file_infos] : records_) {
            for (const auto& file_info : file_infos) {
                if (has_prefix(file_info.filename, file_prefix)) {
                    return job_id;
                }
            }
        }
        return "";
    }

private:
    // True when `name` begins with `prefix` (an empty prefix matches every name).
    static bool has_prefix(const std::string& name, const std::string& prefix) {
        return name.compare(0, prefix.length(), prefix) == 0;
    }

    std::mutex mutex_;
    std::map<std::string, std::vector<FileInfo>> records_;
};

// Create a global S3FileRecords instance
S3FileRecords s3_file_records;
+
+class MircobenchS3FileWriter {
+public:
+ MircobenchS3FileWriter(std::shared_ptr<doris::io::ObjClientHolder> client,
+ const std::string& bucket, const std::string& key,
+ const doris::io::FileWriterOptions* options,
+ std::shared_ptr<doris::S3RateLimiterHolder>
rate_limiter)
+ : _writer(client, bucket, key, options),
_rate_limiter(rate_limiter) {}
+
+ doris::Status appendv(const doris::Slice* slices, size_t slices_size,
+ const std::shared_ptr<bvar::LatencyRecorder>&
write_bvar) {
+ if (_rate_limiter) {
+ _rate_limiter->add(1); // Consume a token
+ }
+ using namespace doris;
+ if (write_bvar) {
+ SCOPED_BVAR_LATENCY(*write_bvar)
+ }
+ SCOPED_BVAR_LATENCY(microbench_write_latency);
+ return _writer.appendv(slices, slices_size);
+ }
+
+ doris::Status close() { return _writer.close(); }
+
+private:
+ doris::io::S3FileWriter _writer;
+ std::shared_ptr<doris::S3RateLimiterHolder> _rate_limiter;
+};
+
+class MicrobenchFileReader {
+public:
+ MicrobenchFileReader(std::shared_ptr<doris::io::FileReader> base_reader,
+ std::shared_ptr<doris::S3RateLimiterHolder>
rate_limiter)
+ : _base_reader(std::move(base_reader)),
_rate_limiter(rate_limiter) {}
+
+ doris::Status read_at(size_t offset, const doris::Slice& result, size_t*
bytes_read,
+ const doris::io::IOContext* io_ctx,
+ std::shared_ptr<bvar::LatencyRecorder> read_bvar) {
+ if (_rate_limiter) {
+ _rate_limiter->add(1); // Consume a token
+ }
+ using namespace doris;
+ if (read_bvar) {
+ SCOPED_BVAR_LATENCY(*read_bvar)
+ }
+ SCOPED_BVAR_LATENCY(microbench_write_latency);
+ return _base_reader->read_at(offset, result, bytes_read, io_ctx);
+ }
+
+ size_t size() const { return _base_reader->size(); }
+
+ doris::Status close() { return _base_reader->close(); }
+
+private:
+ std::shared_ptr<doris::io::FileReader> _base_reader;
+ std::shared_ptr<doris::S3RateLimiterHolder> _rate_limiter;
+};
+
+class ThreadPool {
+public:
+ ThreadPool(size_t num_threads) : stop(false) {
+ try {
+ for (size_t i = 0; i < num_threads; ++i) {
+ workers.emplace_back([this] {
+ try {
+ while (true) {
+ std::function<void()> task;
+ {
+ std::unique_lock<std::mutex> lock(queue_mutex);
+ condition.wait(lock, [this] { return stop ||
!tasks.empty(); });
+ if (stop && tasks.empty()) {
+ return;
+ }
+ task = std::move(tasks.front());
+ tasks.pop();
+ }
+ task();
+ }
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Exception in thread pool worker: " <<
e.what();
+ } catch (...) {
+ LOG(ERROR) << "Unknown exception in thread pool
worker";
+ }
+ });
+ }
+ } catch (...) {
+ // Ensure proper cleanup in case of exception during construction
+ stop = true;
+ condition.notify_all();
+ throw;
+ }
+ }
+
+ template <class F>
+ std::future<void> enqueue(F&& f) {
+ auto task =
std::make_shared<std::packaged_task<void()>>(std::forward<F>(f));
+ std::future<void> res = task->get_future();
+ {
+ std::unique_lock<std::mutex> lock(queue_mutex);
+ if (stop) {
+ throw std::runtime_error("enqueue on stopped ThreadPool");
+ }
+ tasks.emplace([task]() {
+ try {
+ (*task)();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Exception in task: " << e.what();
+ } catch (...) {
+ LOG(ERROR) << "Unknown exception in task";
+ }
+ });
+ }
+ condition.notify_one();
+ return res;
+ }
+
+ ~ThreadPool() {
+ {
+ std::unique_lock<std::mutex> lock(queue_mutex);
+ stop = true;
+ }
+ condition.notify_all();
+
+ // Safely wait for all threads to complete
+ for (auto& worker : workers) {
+ try {
+ if (worker.joinable()) {
+ worker.join();
+ }
+ } catch (const std::system_error& e) {
+ LOG(WARNING) << "Failed to join thread: " << e.what();
+ }
+ }
+ }
+
+private:
+ std::vector<std::thread> workers;
+ std::queue<std::function<void()>> tasks;
+ std::mutex queue_mutex;
+ std::condition_variable condition;
+ bool stop;
+};
+
// Records which files (by key) have finished, and lets threads block until a given key
// has been marked completed. All access to the completed set is guarded by `_mutex`.
class FileCompletionTracker {
public:
    // Marks `key` completed and wakes every thread blocked in wait_for_completion().
    void mark_completed(const std::string& key) {
        {
            std::lock_guard<std::mutex> lock(_mutex);
            _completed_files.insert(key);
        }
        _cv.notify_all(); // Notify all waiting threads
    }

    // Returns whether `key` has been marked completed. Takes the lock, so it is safe to
    // call concurrently with mark_completed() (previously it read the set unlocked).
    bool is_completed(const std::string& key) {
        std::lock_guard<std::mutex> lock(_mutex);
        return _is_completed_locked(key);
    }

    // Blocks until `key` has been marked completed; returns immediately if it already was.
    void wait_for_completion(const std::string& key) {
        std::unique_lock<std::mutex> lock(_mutex);
        _cv.wait(lock, [&] { return _is_completed_locked(key); });
    }

private:
    // Caller must hold `_mutex`.
    bool _is_completed_locked(const std::string& key) const {
        return _completed_files.find(key) != _completed_files.end();
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::unordered_set<std::string> _completed_files;
};
+
// Builds the usage/help text for this tool: a short description, how to start the server
// (with `progname` substituted in), every HTTP endpoint with its parameters, general notes,
// and finally the build_info() block.
+std::string get_usage(const std::string& progname) {
+ std::string usage = R"(
+ )" + progname + R"( is the Doris microbench tool for testing file cache in
cloud.
+
+ Usage:
+ Start the server:
+ )" + progname + R"( --port=<port_number>
+
+ API Endpoints:
+ POST /submit_job
+ Submit a job with the following JSON body:
+ {
+ "size_bytes_perfile": <size>, // Number of bytes to write per
segment file
+ "write_iops": <limit>, // IOPS limit for writing per
segment files
+ "read_iops": <limit>, // IOPS limit for reading per
segment files
+ "num_threads": <count>, // Number of threads in the
thread pool, default 200
+ "num_files": <count>, // Number of segments to
write/read
+ "file_prefix": "<prefix>", // Prefix for segment files,
Notice: this tools hide prefix(test_file_cache_microbench/) before file_prefix
+ "write_batch_size": <size>, // Size of data to write in
each write operation
+ "cache_type": <type>, // Write or Read data enter
file cache queue type, support NORMAL | TTL | INDEX | DISPOSABLE, default NORMAL
+ "expiration": <timestamp>, // File cache ttl expire time,
value is a unix timestamp
+ "repeat": <count>, // Read repeat times, default 1
+ "read_offset": [<left>, <right>], // Range for reading (left
inclusive, right exclusive)
+ "read_length": [<left>, <right>] // Range for reading length
(left inclusive, right exclusive)
+ }
+
+ GET /get_job_status/<job_id>
+ Retrieve the status of a submitted job.
+ Parameters:
+ - job_id: The ID of the job to retrieve status for.
+ - files (optional): If provided, returns the associated file records
for the job.
+ Example: /get_job_status/job_id?files=10
+
+ GET /list_jobs
+ List all submitted jobs and their statuses.
+
+ GET /get_help
+ Get this help information.
+
+ GET /file_cache_clear
+ Clear the file cache with the following query parameters:
+ {
+ "sync": <true|false>, // Whether to synchronize the
cache clear operation
+ "segment_path": "<path>" // Optional path of the segment
to clear from the cache
+ }
+ If "segment_path" is not provided, all caches will be cleared based on
the "sync" parameter.
+
+ GET /file_cache_reset
+ Reset the file cache with the following query parameters:
+ {
+ "capacity": <new_capacity>, // New capacity for the
specified path
+ "path": "<path>" // Path of the segment to reset
+ }
+
+ GET /file_cache_release
+ Release the file cache with the following query parameters:
+ {
+ "base_path": "<base_path>" // Optional base path to
release specific caches
+ }
+
+ GET /update_config
+ Update the configuration with the following JSON body:
+ {
+ "config_key": "<key>", // The configuration key to
update
+ "config_value": "<value>", // The new value for the
configuration key
+ "persist": <true|false> // Whether to persist the
configuration change
+ }
+
+ GET /show_config
+ Retrieve the current configuration settings.
+
+ Notes:
+ - Ensure that the S3 configuration is set correctly in the environment.
+ - The program will create and read files in the specified S3 bucket.
+ - Monitor the logs for detailed execution information and errors.
+ )" + build_info();
+
// The "//" sequences above are part of the help text itself, not C++ comments.
+ return usage;
+}
+
+// Job configuration structure
+struct JobConfig {
+ // Default value initialization
+ int64_t size_bytes_perfile = 1024 * 1024;
+ int32_t write_iops = 0;
+ int32_t read_iops = 0;
+ int32_t num_threads = 200;
+ int32_t num_files = 1;
+ std::string file_prefix;
+ std::string cache_type = "NORMAL";
+ int64_t expiration = 0;
+ int32_t repeat = 1;
+ int64_t write_batch_size = doris::config::s3_write_buffer_size;
+ int64_t read_offset_left = 0;
+ int64_t read_offset_right = 0;
+ int64_t read_length_left = 0;
+ int64_t read_length_right = 0;
+ bool write_file_cache = true;
+ bool bvar_enable = false;
+
+ // Parse configuration from JSON
+ static JobConfig from_json(const std::string& json_str) {
+ JobConfig config;
+ rapidjson::Document d;
+ d.Parse(json_str.c_str());
+
+ if (d.HasParseError()) {
+ throw std::runtime_error("JSON parse error json args=" + json_str);
+ }
+
+ // Basic validation
+ validate(d);
+
+ // Use helper functions to parse each field
+ parse_basic_fields(d, config);
+ parse_cache_settings(d, config);
+ parse_read_settings(d, config);
+
+ // Additional validation
+ validate_config(config);
+
+ return config;
+ }
+
+private:
+ // Validate the JSON document
+ static void validate(const rapidjson::Document& json_data) {
+ if (!json_data.HasMember("file_prefix") ||
!json_data["file_prefix"].IsString() ||
+ strlen(json_data["file_prefix"].GetString()) == 0) {
+ throw std::runtime_error("file_prefix is required and cannot be
empty");
+ }
+ }
+
+ // Parse basic fields
+ static void parse_basic_fields(const rapidjson::Document& d, JobConfig&
config) {
+ // Parse file_prefix (required field)
+ config.file_prefix = d["file_prefix"].GetString();
+
+ // Parse optional fields
+ if (d.HasMember("num_files") && d["num_files"].IsInt()) {
+ config.num_files = d["num_files"].GetInt();
+ }
+
+ if (d.HasMember("size_bytes_perfile") &&
d["size_bytes_perfile"].IsInt64()) {
+ config.size_bytes_perfile = d["size_bytes_perfile"].GetInt64();
+ }
+
+ if (d.HasMember("write_iops") && d["write_iops"].IsInt()) {
+ config.write_iops = d["write_iops"].GetInt();
+ }
+
+ if (d.HasMember("read_iops") && d["read_iops"].IsInt()) {
+ config.read_iops = d["read_iops"].GetInt();
+ }
+
+ if (d.HasMember("num_threads") && d["num_threads"].IsInt()) {
+ config.num_threads = d["num_threads"].GetInt();
+ }
+
+ if (d.HasMember("repeat") && d["repeat"].IsInt64()) {
+ config.repeat = d["repeat"].GetInt64();
+ }
+
+ if (d.HasMember("write_batch_size") &&
d["write_batch_size"].IsInt64()) {
+ config.write_batch_size = d["write_batch_size"].GetInt64();
+ }
+
+ if (d.HasMember("write_file_cache") && d["write_file_cache"].IsBool())
{
+ config.write_file_cache = d["write_file_cache"].GetBool();
+ }
+
+ if (d.HasMember("bvar_enable") && d["bvar_enable"].IsBool()) {
+ config.bvar_enable = d["bvar_enable"].GetBool();
+ }
+ }
+
+ // Parse cache-related settings
+ static void parse_cache_settings(const rapidjson::Document& d, JobConfig&
config) {
+ if (d.HasMember("cache_type") && d["cache_type"].IsString()) {
+ config.cache_type = d["cache_type"].GetString();
+ }
+
+ // Check for TTL cache type
+ if (config.cache_type == "TTL") {
+ if (!d.HasMember("expiration") || !d["expiration"].IsInt64()) {
+ throw std::runtime_error(
+ "expiration is required and must be an integer when
cache type is TTL");
+ }
+ config.expiration = d["expiration"].GetInt64();
+ }
+ }
+
+ // Parse read-related settings
+ static void parse_read_settings(const rapidjson::Document& d, JobConfig&
config) {
+ if (config.read_iops > 0) {
+ // Parse read_offset
+ if (d.HasMember("read_offset") && d["read_offset"].IsArray() &&
+ d["read_offset"].Size() == 2) {
+ const rapidjson::Value& read_offset_array = d["read_offset"];
+ config.read_offset_left = read_offset_array[0].GetInt64();
+ config.read_offset_right = read_offset_array[1].GetInt64();
+ } else {
+ throw std::runtime_error("Invalid read_offset format, expected
array of size 2");
+ }
+
+ // Parse read_length
+ if (d.HasMember("read_length") && d["read_length"].IsArray() &&
+ d["read_length"].Size() == 2) {
+ const rapidjson::Value& read_length_array = d["read_length"];
+ config.read_length_left = read_length_array[0].GetInt64();
+ config.read_length_right = read_length_array[1].GetInt64();
+ } else {
+ throw std::runtime_error("Invalid read_length format, expected
array of size 2");
+ }
+ }
+ }
+
+ // Validate the validity of the configuration
+ static void validate_config(const JobConfig& config) {
+ if (config.num_threads <= 0 || config.num_threads > 10000) {
+ throw std::runtime_error("num_threads must be between 1 and
10000");
+ }
+
+ if (config.size_bytes_perfile <= 0) {
+ throw std::runtime_error("size_bytes_perfile must be positive");
+ }
+
+ if (config.read_iops > 0) {
+ if (config.read_offset_left >= config.read_offset_right) {
+ throw std::runtime_error("read_offset_left must be less than
read_offset_right");
+ }
+
+ if (config.read_length_left >= config.read_length_right) {
+ throw std::runtime_error("read_length_left must be less than
read_length_right");
+ }
+ }
+
+ if (config.cache_type == "TTL" && config.expiration <= 0) {
+ throw std::runtime_error("expiration must be positive when cache
type is TTL");
+ }
+ }
+
+public:
+ std::string to_string() const {
+ return fmt::format(
+ "size_bytes_perfile: {}, write_iops: {}, read_iops: {},
num_threads: {}, "
+ "num_files: {}, file_prefix: {}, write_file_cache: {},
write_batch_size: {}, "
+ "repeat: {}, expiration: {}, cache_type: {}, read_offset: [{},
{}), "
+ "read_length: [{}, {})",
+ size_bytes_perfile, write_iops, read_iops, num_threads,
num_files,
+ HIDDEN_PREFIX + file_prefix, write_file_cache,
write_batch_size, repeat, expiration,
+ cache_type, read_offset_left, read_offset_right,
read_length_left,
+ read_length_right);
+ }
+};
+
// Lifecycle of a job: PENDING when submitted, RUNNING while executing, then
// COMPLETED on success or FAILED if execution threw.
enum class JobStatus {
    PENDING,
    RUNNING,
    COMPLETED,
    FAILED,
};
+
+// Job structure
+struct Job {
+ std::string job_id;
+ JobConfig config;
+ JobStatus status;
+ std::string error_message;
+ std::chrono::system_clock::time_point create_time;
+ std::chrono::system_clock::time_point start_time;
+ std::chrono::system_clock::time_point end_time;
+
+ std::shared_ptr<doris::S3RateLimiterHolder> write_limiter;
+ std::shared_ptr<doris::S3RateLimiterHolder> read_limiter;
+
+ // Job execution result statistics
+ struct Statistics {
+ std::string total_write_time;
+ std::string total_read_time;
+ // struct FileCacheStatistics
+ int64_t num_local_io_total = 0;
+ int64_t num_remote_io_total = 0;
+ int64_t num_inverted_index_remote_io_total = 0;
+ int64_t local_io_timer = 0;
+ int64_t bytes_read_from_local = 0;
+ int64_t bytes_read_from_remote = 0;
+ int64_t remote_io_timer = 0;
+ int64_t write_cache_io_timer = 0;
+ int64_t bytes_write_into_cache = 0;
+ int64_t num_skip_cache_io_total = 0;
+ int64_t read_cache_file_directly_timer = 0;
+ int64_t cache_get_or_set_timer = 0;
+ int64_t lock_wait_timer = 0;
+ int64_t get_timer = 0;
+ int64_t set_timer = 0;
+ } stats;
+
+ // Record associated file information for the job
+ std::vector<FileInfo> file_records;
+
+ // Add completion_tracker
+ std::shared_ptr<FileCompletionTracker> completion_tracker;
+
+ std::shared_ptr<bvar::LatencyRecorder> write_latency;
+ std::shared_ptr<bvar::Adder<int64_t>> write_rate_limit_s;
+ std::shared_ptr<bvar::LatencyRecorder> read_latency;
+ std::shared_ptr<bvar::Adder<int64_t>> read_rate_limit_s;
+
+ // Default constructor
+ Job() : job_id(""), status(JobStatus::PENDING),
create_time(std::chrono::system_clock::now()) {
+ init_latency_recorders("");
+ completion_tracker = std::make_shared<FileCompletionTracker>();
+ }
+
+ // Constructor with parameters
+ Job(const std::string& id, const JobConfig& cfg)
+ : job_id(id),
+ config(cfg),
+ status(JobStatus::PENDING),
+ create_time(std::chrono::system_clock::now()) {
+ init_latency_recorders(id);
+ if (cfg.write_iops > 0 && cfg.read_iops > 0) {
+ completion_tracker = std::make_shared<FileCompletionTracker>();
+ }
+ init_limiters(cfg);
+ }
+
+private:
+ void init_latency_recorders(const std::string& id) {
+ if (config.write_iops > 0 && config.bvar_enable) {
+ write_latency =
+
std::make_shared<bvar::LatencyRecorder>("file_cache_microbench_append_" + id);
+ write_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
+ "file_cache_microbench_append_rate_limit_ns_" + id);
+ }
+
+ if (config.read_iops > 0 && config.bvar_enable) {
+ read_latency =
+
std::make_shared<bvar::LatencyRecorder>("file_cache_microbench_read_at_" + id);
+ read_rate_limit_s = std::make_shared<bvar::Adder<int64_t>>(
+ "file_cache_microbench_read_rate_limit_ns_" + id);
+ }
+ }
+
+ void init_limiters(const JobConfig& cfg) {
+ if (cfg.write_iops > 0) {
+ write_limiter = std::make_shared<doris::S3RateLimiterHolder>(
+ doris::S3RateLimitType::PUT,
+ cfg.write_iops, // max_speed (IOPS)
+ cfg.write_iops, // max_burst
+ 0, // no limit
+ [this](int64_t wait_time_ns) {
+ if (wait_time_ns > 0 && write_rate_limit_s) {
+ *write_rate_limit_s << wait_time_ns / NS;
+ }
+ });
+ }
+
+ if (cfg.read_iops > 0) {
+ read_limiter = std::make_shared<doris::S3RateLimiterHolder>(
+ doris::S3RateLimitType::GET,
+ cfg.read_iops, // max_speed (IOPS)
+ cfg.read_iops, // max_burst
+ 0, // no limit
+ [this](int64_t wait_time_ns) {
+ if (wait_time_ns > 0 && read_rate_limit_s) {
+ *read_rate_limit_s << wait_time_ns / NS;
+ }
+ });
+ }
+ }
+};
+
+// Job manager
+class JobManager {
+public:
+ JobManager() : _next_job_id(0),
_job_executor_pool(std::thread::hardware_concurrency()) {
+ LOG(INFO) << "Initialized JobManager with " <<
std::thread::hardware_concurrency()
+ << " executor threads";
+ }
+
+ ~JobManager() {
+ try {
+ stop();
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Error stopping JobManager: " << e.what();
+ }
+ }
+
+ // Submit a new job
+ std::string submit_job(const JobConfig& config) {
+ try {
+ std::string job_id = generate_job_id();
+
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ _jobs[job_id] = std::make_shared<Job>(job_id, config);
+ }
+
+ LOG(INFO) << "Submitting job " << job_id << " with config: " <<
config.to_string();
+
+ // Execute the job asynchronously
+ _job_executor_pool.enqueue(
+ [this, job_id]() {
execute_job_with_status_updates(job_id); });
+
+ return job_id;
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Error submitting job: " << e.what();
+ throw std::runtime_error("Failed to submit job: " +
std::string(e.what()));
+ }
+ }
+
+ // Get job status
+ const Job& get_job_status(const std::string& job_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _jobs.find(job_id);
+ if (it != _jobs.end()) {
+ return *(it->second);
+ }
+ throw std::runtime_error("Job not found: " + job_id);
+ }
+
+ std::shared_ptr<Job> get_job_ptr(const std::string& job_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _jobs.find(job_id);
+ if (it != _jobs.end()) {
+ return it->second;
+ }
+ return nullptr;
+ }
+
+ // List all jobs
+ std::vector<std::shared_ptr<Job>> list_jobs() {
+ std::lock_guard<std::mutex> lock(_mutex);
+ std::vector<std::shared_ptr<Job>> job_list;
+ job_list.reserve(_jobs.size());
+ for (const auto& pair : _jobs) {
+ job_list.push_back(pair.second);
+ }
+ return job_list;
+ }
+
+ void start() { LOG(INFO) << "JobManager started"; }
+
+ void stop() {
+ LOG(INFO) << "Stopping JobManager and waiting for all jobs to
complete";
+ _job_executor_pool.~ThreadPool();
+ LOG(INFO) << "JobManager stopped";
+ }
+
+ // Record file information
+ void record_file_info(const std::string& key, size_t data_size, const
std::string& job_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _jobs.find(job_id);
+ if (it != _jobs.end()) {
+ FileInfo file_info = {key, data_size, job_id};
+ it->second->file_records.push_back(file_info);
+ s3_file_records.add_file_info(job_id, file_info);
+ } else {
+ LOG(ERROR) << "Job ID not found when recording file info: " <<
job_id;
+ }
+ }
+
+ // Cancel job (not implemented yet)
+ bool cancel_job(const std::string& job_id) {
+ LOG(WARNING) << "Job cancellation not implemented yet: " << job_id;
+ return false;
+ }
+
+private:
+ // Generate a unique job ID
+ std::string generate_job_id() {
+ std::lock_guard<std::mutex> lock(_mutex);
+ std::string job_id =
+ "job_" + std::to_string(std::time(nullptr)) + "_" +
std::to_string(_next_job_id++);
+ return job_id;
+ }
+
+ // Execute job with status updates
+ void execute_job_with_status_updates(const std::string& job_id) {
+ std::shared_ptr<Job> job_ptr;
+
+ // Get job pointer and update status to RUNNING
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _jobs.find(job_id);
+ if (it == _jobs.end()) {
+ LOG(ERROR) << "Job not found for execution: " << job_id;
+ return;
+ }
+ job_ptr = it->second;
+ job_ptr->status = JobStatus::RUNNING;
+ job_ptr->start_time = std::chrono::system_clock::now();
+ }
+
+ LOG(INFO) << "Starting execution of job " << job_id;
+
+ try {
+ // Execute job
+ execute_job(job_id);
+
+ // Update status to COMPLETED
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ job_ptr->status = JobStatus::COMPLETED;
+ job_ptr->end_time = std::chrono::system_clock::now();
+ }
+
+ LOG(INFO) << "Job " << job_id << " completed successfully";
+ } catch (const std::exception& e) {
+ // Update status to FAILED
+ {
+ std::lock_guard<std::mutex> lock(_mutex);
+ job_ptr->status = JobStatus::FAILED;
+ job_ptr->error_message = e.what();
+ job_ptr->end_time = std::chrono::system_clock::now();
+ }
+
+ LOG(ERROR) << "Job " << job_id << " failed: " << e.what();
+ }
+ }
+
+ // Core logic for executing a job
+ void execute_job(const std::string& job_id) {
+ std::shared_ptr<Job> job_ptr = get_job_ptr(job_id);
+ if (!job_ptr) {
+ throw std::runtime_error("Job not found");
+ }
+
+ Job& job = *job_ptr;
+ JobConfig& config = job.config;
+ LOG(INFO) << "Executing job " << job_id << " with config: " <<
config.to_string();
+
+ // Generate multiple keys
+ std::vector<std::string> keys;
+ keys.reserve(config.num_files);
+
+ std::string rewrite_job_id = job_id;
+ // If it's a read-only job, find the previously written files
+ if (config.read_iops > 0 && config.write_iops == 0) {
+ std::string old_job_id =
+ s3_file_records.find_job_id_by_prefix(HIDDEN_PREFIX +
config.file_prefix);
+ if (old_job_id.empty()) {
+ throw std::runtime_error(
+ "Can't find previously job uploaded files. Please make
sure read "
+ "files exist in obj or It is also possible that you
have restarted "
+ "the file_cache_microbench program, job_id = " +
+ job_id);
+ }
+ rewrite_job_id = old_job_id;
+ }
+
+ // Generate file keys
+ for (int i = 0; i < config.num_files; ++i) {
+ keys.push_back(HIDDEN_PREFIX + config.file_prefix + "/" +
rewrite_job_id + "_" +
+ std::to_string(i));
+ }
+
+ // Execute write tasks
+ if (config.write_iops > 0) {
+ execute_write_tasks(keys, job, config);
+ }
+
+ // Execute read tasks
+ if (config.read_iops > 0) {
+ execute_read_tasks(keys, job, config);
+ }
+
+ LOG(INFO) << "Job " << job_id << " execution completed";
+ }
+
+private:
+ doris::S3ClientConf create_s3_client_conf(const JobConfig& config) {
+ doris::S3ClientConf s3_conf;
+ s3_conf.max_connections = std::max(256, config.num_threads * 4);
+ s3_conf.request_timeout_ms = 60000;
+ s3_conf.connect_timeout_ms = 3000;
+ s3_conf.ak = doris::config::test_s3_ak;
+ s3_conf.sk = doris::config::test_s3_sk;
+ s3_conf.region = doris::config::test_s3_region;
+ s3_conf.endpoint = doris::config::test_s3_endpoint;
+ return s3_conf;
+ }
+
+ // Execute write tasks
+ void execute_write_tasks(const std::vector<std::string>& keys, Job& job,
+ const JobConfig& config) {
+ // Create S3 client configuration
+ doris::S3ClientConf s3_conf = create_s3_client_conf(config);
+
+ // Initialize S3 client
+ auto client = std::make_shared<doris::io::ObjClientHolder>(s3_conf);
+ doris::Status init_status = client->init();
+ if (!init_status.ok()) {
+ throw std::runtime_error("Failed to initialize S3 client: " +
init_status.to_string());
+ }
+
+ std::atomic<int> completed_writes(0);
+ std::vector<std::future<void>> write_futures;
+ write_futures.reserve(keys.size());
+ ThreadPool write_pool(config.num_threads);
+
+ // Start write tasks
+ doris::MonotonicStopWatch write_stopwatch;
+ write_stopwatch.start();
+ for (int i = 0; i < keys.size(); ++i) {
+ const auto& key = keys[i];
+ write_futures.push_back(write_pool.enqueue([&, key]() {
+ try {
+ DataGenerator data_generator(config.size_bytes_perfile);
+ doris::io::FileWriterOptions options;
+ if (config.cache_type == "TTL") {
+ options.file_cache_expiration = config.expiration;
+ }
+ options.write_file_cache = config.write_file_cache;
+ auto writer = std::make_unique<MircobenchS3FileWriter>(
+ client, doris::config::test_s3_bucket, key,
&options,
+ job.write_limiter);
+ doris::Defer defer {[&]() {
+ if (auto status = writer->close(); !status.ok()) {
+ LOG(ERROR) << "close file writer failed" <<
status.to_string();
+ }
+ }};
+
+ std::vector<doris::Slice> slices;
+ slices.reserve(4);
+ size_t accumulated_size = 0;
+
+ // Stream data writing
+ while (data_generator.has_more()) {
+ doris::Slice chunk = data_generator.next_chunk(key);
+ slices.push_back(chunk);
+ accumulated_size += chunk.size;
+
+ if (accumulated_size >= config.write_batch_size ||
+ !data_generator.has_more()) {
+ doris::Status status =
writer->appendv(slices.data(), slices.size(),
+
job.write_latency);
+ if (!status.ok()) {
+ throw std::runtime_error("Write error for key
" + key + ": " +
+ status.to_string());
+ }
+ slices.clear();
+ accumulated_size = 0;
+ }
+ }
+ if (job.completion_tracker) {
+ job.completion_tracker->mark_completed(key);
+ }
+
+ // Record successful file information
+ size_t data_size = config.size_bytes_perfile;
+ record_file_info(key, data_size, job.job_id);
+ completed_writes++;
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Write task failed for segment " << key <<
": " << e.what();
+ }
+ }));
+ }
+
+ // Wait for all write tasks to complete
+ for (auto& future : write_futures) {
+ future.get();
+ }
+ write_stopwatch.stop();
+
+ // Convert write time from nanoseconds to seconds and format as string
+ double total_write_time_seconds =
+ write_stopwatch.elapsed_time() / 1e9; // nanoseconds to seconds
+ job.stats.total_write_time =
+ std::to_string(total_write_time_seconds) + " seconds"; // Save
as string
+ LOG(INFO) << "Total write time: " << job.stats.total_write_time << "
seconds";
+ }
+
+ // Execute read tasks
+ void execute_read_tasks(const std::vector<std::string>& keys, Job& job,
JobConfig& config) {
+ LOG(INFO) << "Starting read tasks for job " << job.job_id << ",
num_keys=" << keys.size()
+ << ", read_iops=" << config.read_iops;
+ auto start_time = std::chrono::steady_clock::now();
+
+ int64_t exist_job_perfile_size =
s3_file_records.get_exist_job_perfile_size_by_prefix(
+ HIDDEN_PREFIX + config.file_prefix);
+ std::vector<std::future<void>> read_futures;
+ doris::io::IOContext io_ctx;
+ doris::io::FileCacheStatistics total_stats;
+ io_ctx.file_cache_stats = &total_stats;
+ if (config.cache_type == "DISPOSABLE") {
+ io_ctx.is_disposable = true;
+ } else if (config.cache_type == "TTL") {
+ io_ctx.expiration_time = config.expiration;
+ } else if (config.cache_type == "INDEX") {
+ io_ctx.is_index_data = true;
+ } else { // default NORMAL
+ // do nothing
+ }
+ ThreadPool read_pool(config.num_threads);
+ std::atomic<int> completed_reads(0);
+ doris::MonotonicStopWatch read_stopwatch; // Add read task timer
+
+ // Create S3 client configuration
+ doris::S3ClientConf s3_conf = create_s3_client_conf(config);
+ std::vector<std::string> read_files;
+ if (exist_job_perfile_size != -1) {
+ // read exist files
+ s3_file_records.get_exist_job_files_by_prefix(HIDDEN_PREFIX +
config.file_prefix,
+ read_files,
config.num_files);
+ }
+
+ if (read_files.empty()) {
+ // not read exist files
+ read_files = keys;
+ }
+ LOG(INFO) << "job_id = " << job.job_id << " read_files size = " <<
read_files.size();
+
+ read_stopwatch.start();
+ for (int i = 0; i < read_files.size(); ++i) {
+ const auto& key = read_files[i];
+ read_futures.push_back(read_pool.enqueue([&, key]() {
+ try {
+ if (job.completion_tracker) {
+ job.completion_tracker->wait_for_completion(
+ key); // Wait for file completion
+ }
+ doris::io::FileReaderOptions reader_opts;
+ reader_opts.cache_type =
doris::io::FileCachePolicy::FILE_BLOCK_CACHE;
+ reader_opts.is_doris_table = true;
+
+ doris::io::FileDescription fd;
+ std::string obj_path = "s3://" +
doris::config::test_s3_bucket + "/";
+ fd.path = doris::io::Path(obj_path + key);
+ fd.file_size = exist_job_perfile_size != -1 ?
exist_job_perfile_size
+ :
config.size_bytes_perfile;
+ doris::io::FileSystemProperties fs_props;
+ fs_props.system_type = doris::TFileType::FILE_S3;
+
+ std::map<std::string, std::string> props;
+ props["AWS_ACCESS_KEY"] = s3_conf.ak;
+ props["AWS_SECRET_KEY"] = s3_conf.sk;
+ props["AWS_ENDPOINT"] = s3_conf.endpoint;
+ props["AWS_REGION"] = s3_conf.region;
+ props["AWS_MAX_CONNECTIONS"] =
std::to_string(s3_conf.max_connections);
+ props["AWS_REQUEST_TIMEOUT_MS"] =
std::to_string(s3_conf.request_timeout_ms);
+ props["AWS_CONNECT_TIMEOUT_MS"] =
std::to_string(s3_conf.connect_timeout_ms);
+ props["use_path_style"] = s3_conf.use_virtual_addressing ?
"false" : "true";
+
+ fs_props.properties = std::move(props);
+
+ int read_retry_count = 0;
+ const int max_read_retries = 50;
+ while (read_retry_count < max_read_retries) {
+ auto status_or_reader =
doris::FileFactory::create_file_reader(
+ fs_props, fd, reader_opts, nullptr);
+ if (!status_or_reader.has_value()) {
+ if (++read_retry_count >= max_read_retries) {
+ LOG(ERROR) << "Failed to create reader for key
" << key
+ << status_or_reader.error();
+ }
+
std::this_thread::sleep_for(std::chrono::seconds(1));
+ continue;
+ }
+
+ for (int i = 0; i < config.repeat; i++) {
+ auto reader =
std::make_unique<MicrobenchFileReader>(
+ status_or_reader.value(),
job.read_limiter);
+ doris::Defer defer {[&]() {
+ if (auto status = reader->close();
!status.ok()) {
+ LOG(ERROR) << "close file reader failed"
<< status.to_string();
+ }
+ }};
+
+ size_t read_offset = 0;
+ size_t read_length = 0;
+
+ bool use_random = true;
+ if (config.read_offset_left + 1 ==
config.read_offset_right) {
+ use_random = false;
+ }
+ if (exist_job_perfile_size != -1) {
+ // read exist files
+ if (config.read_offset_right >
exist_job_perfile_size) {
+ config.read_offset_right =
exist_job_perfile_size;
+ }
+ if (config.read_length_right >
exist_job_perfile_size) {
+ config.read_length_right =
exist_job_perfile_size;
+ }
+
+ if (use_random) {
+ std::random_device rd;
+ std::mt19937 gen(rd());
+ // Generate random read_offset between
read_offset_left and read_offset_right - 1
+ std::uniform_int_distribution<size_t>
dis_offset(
+ config.read_offset_left,
config.read_offset_right - 1);
+ read_offset = dis_offset(gen); // Generate
random read_offset
+ std::uniform_int_distribution<size_t>
dis_length(
+ config.read_length_left,
config.read_length_right - 1);
+ read_length = dis_length(gen); // Generate
random read_length
+ if (read_offset + read_length >
exist_job_perfile_size) {
+ read_length = exist_job_perfile_size -
read_offset;
+ }
+ } else { // not random
+ read_offset = config.read_offset_left;
+ read_length = config.read_length_left;
+ }
+ } else {
+ // new files
+ read_offset = config.read_offset_left;
+ read_length = config.read_length_left;
+ if (read_length == -1 ||
+ read_offset + read_length >
config.size_bytes_perfile) {
+ read_length = config.size_bytes_perfile -
read_offset;
+ }
+ }
+ LOG(INFO) << "read_offset=" << read_offset
+ << " read_length=" << read_length;
+ CHECK(read_offset >= 0)
+ << "Calculated read_offset is negative: "
<< read_offset;
+ CHECK(read_length >= 0)
+ << "Calculated read_length is negative: "
<< read_length;
+
+ std::string read_buffer;
+ read_buffer.resize(read_length);
+
+ size_t total_bytes_read = 0;
+ while (total_bytes_read < read_length) {
+ size_t bytes_to_read = std::min(
+ read_length - total_bytes_read,
+ static_cast<size_t>(4 * 1024 * 1024));
// 4MB chunks
+
+ doris::Slice read_slice(read_buffer.data() +
total_bytes_read,
+ bytes_to_read);
+ size_t bytes_read = 0;
+
+ doris::Status read_status =
+ reader->read_at(read_offset +
total_bytes_read, read_slice,
+ &bytes_read, &io_ctx,
job.read_latency);
+
+ if (!read_status.ok()) {
+ throw std::runtime_error("Read error: " +
+
read_status.to_string());
+ }
+
+ if (bytes_read != bytes_to_read) {
+ throw std::runtime_error("Incomplete read:
expected " +
+
std::to_string(bytes_to_read) +
+ " bytes, got " +
+
std::to_string(bytes_read));
+ }
+
+ total_bytes_read += bytes_read;
+ }
+
+ size_t file_size = config.size_bytes_perfile;
+ if (exist_job_perfile_size != -1) {
+ file_size = exist_job_perfile_size;
+ }
+
+ // Verify read data
+ if (!DataVerifier::verify_data(key, file_size,
read_offset, read_buffer,
+ read_length)) {
+ throw std::runtime_error("Data verification
failed for key: " +
+ key);
+ }
+
+ LOG(INFO)
+ << "read_offset=" << read_offset
+ << " read_length=" << read_length << "
file_size=" << file_size;
+
+ completed_reads++;
+ }
+ break;
+ }
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Read task failed for key " << key << ": "
<< e.what();
+ }
+ }));
+ }
+
+ // Wait for all read tasks to complete
+ for (auto& future : read_futures) {
+ future.get();
+ }
+ read_stopwatch.stop(); // Stop timer
+
+ // Convert read time from nanoseconds to seconds and format as string
+ double total_read_time_seconds =
+ read_stopwatch.elapsed_time() / 1e9; // nanoseconds to seconds
+ job.stats.total_read_time =
+ std::to_string(total_read_time_seconds) + " seconds"; // Save
as string
+ LOG(INFO) << "Total read time: " << job.stats.total_read_time << "
seconds";
+
+ // Update job statistics
+ job.stats.num_local_io_total = total_stats.num_local_io_total;
+ job.stats.num_remote_io_total = total_stats.num_remote_io_total;
+ job.stats.num_inverted_index_remote_io_total =
+ total_stats.num_inverted_index_remote_io_total;
+ job.stats.local_io_timer = total_stats.local_io_timer;
+ job.stats.bytes_read_from_local = total_stats.bytes_read_from_local;
+ job.stats.bytes_read_from_remote = total_stats.bytes_read_from_remote;
+ job.stats.remote_io_timer = total_stats.remote_io_timer;
+ job.stats.write_cache_io_timer = total_stats.write_cache_io_timer;
+ job.stats.bytes_write_into_cache = total_stats.bytes_write_into_cache;
+ job.stats.num_skip_cache_io_total =
total_stats.num_skip_cache_io_total;
+ job.stats.read_cache_file_directly_timer =
total_stats.read_cache_file_directly_timer;
+ job.stats.cache_get_or_set_timer = total_stats.cache_get_or_set_timer;
+ job.stats.lock_wait_timer = total_stats.lock_wait_timer;
+ job.stats.get_timer = total_stats.lock_wait_timer;
+ job.stats.set_timer = total_stats.lock_wait_timer;
+
+ auto end_time = std::chrono::steady_clock::now();
+ auto duration =
+ std::chrono::duration_cast<std::chrono::milliseconds>(end_time
- start_time);
+ LOG(INFO) << "Completed read tasks for job " << job.job_id
+ << ", duration=" << duration.count() << "ms";
+ }
+
+ std::mutex _mutex;
+ std::atomic<int> _next_job_id;
+ std::map<std::string, std::shared_ptr<Job>> _jobs;
+ ThreadPool _job_executor_pool;
+};
+
+namespace microbenchService {
+
+class MicrobenchServiceImpl : public microbench::MicrobenchService {
+public:
+ MicrobenchServiceImpl(JobManager& job_manager) : _job_manager(job_manager)
{}
+ virtual ~MicrobenchServiceImpl() {}
+
+ /**
+ * Submit a job
+ *
+ * Receive JSON-formatted job configuration, create and submit the job
+ * Return a JSON response containing the job ID
+ */
+ void submit_job(google::protobuf::RpcController* cntl_base,
+ const microbench::HttpRequest* request,
microbench::HttpResponse* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard done_guard(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+ LOG(INFO) << "Received submit job request";
+
+ try {
+ // Parse request body JSON
+ std::string job_config = cntl->request_attachment().to_string();
+ JobConfig config = JobConfig::from_json(job_config);
+
+ LOG(INFO) << "Parsed JobConfig: " << config.to_string();
+
+ std::string job_id = _job_manager.submit_job(config);
+ LOG(INFO) << "Job submitted successfully with ID: " << job_id;
+
+ // Set response headers
+ cntl->http_response().set_content_type("application/json");
+
+ // Return job_id
+ rapidjson::Document response_doc;
+ response_doc.SetObject();
+ rapidjson::Document::AllocatorType& allocator =
response_doc.GetAllocator();
+ response_doc.AddMember("job_id", rapidjson::Value(job_id.c_str(),
allocator),
+ allocator);
+ response_doc.AddMember("status", "success", allocator);
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ response_doc.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Error submitting job: " << e.what();
+
+ // Set error status code and response
+
cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+ cntl->http_response().set_content_type("application/json");
+
+ // Build error response
+ rapidjson::Document error_doc;
+ error_doc.SetObject();
+ rapidjson::Document::AllocatorType& allocator =
error_doc.GetAllocator();
+ error_doc.AddMember("status", "error", allocator);
+ error_doc.AddMember("message", rapidjson::Value(e.what(),
allocator), allocator);
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ error_doc.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ }
+ }
+
+ /**
+ * Get job status
+ *
+ * Return detailed job status information based on job ID
+ * Optional parameter 'files' is used to limit the number of file records
returned
+ */
+ void get_job_status(google::protobuf::RpcController* cntl_base,
+ const microbench::HttpRequest* request,
microbench::HttpResponse* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard done_guard(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+ std::string job_id = cntl->http_request().unresolved_path();
+ const std::string* files_value =
cntl->http_request().uri().GetQuery("files");
+ size_t max_files = 1000; // Set maximum file record limit
+
+ if (files_value != nullptr) {
+ try {
+ max_files = std::stoi(*files_value);
+ } catch (const std::exception& e) {
+ LOG(WARNING) << "Invalid files parameter: " << *files_value
+ << ", using default, error: " << e.what();
+ }
+ }
+
+ LOG(INFO) << "Received get_job_status request for job " << job_id
+ << ", max_files=" << max_files;
+
+ try {
+ const Job& job = _job_manager.get_job_status(job_id);
+
+ // Set response headers
+ cntl->http_response().set_content_type("application/json");
+
+ // Build JSON response
+ rapidjson::Document d;
+ d.SetObject();
+ rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+
+ d.AddMember("job_id", rapidjson::Value(job.job_id.c_str(),
allocator), allocator);
+ d.AddMember("status",
+
rapidjson::Value(get_status_string(job.status).c_str(), allocator),
+ allocator);
+
+ // Add time information
+ add_time_info(d, allocator, job);
+
+ // Add error information (if any)
+ if (!job.error_message.empty()) {
+ d.AddMember("error_message",
rapidjson::Value(job.error_message.c_str(), allocator),
+ allocator);
+ }
+
+ // Add configuration information
+ add_config_info(d, allocator, job.config);
+
+ // Add statistics information
+ add_stats_info(d, allocator, job.stats);
+
+ // Add file records (if requested)
+ if (files_value) {
+ add_file_records(d, allocator, job.file_records, max_files);
+ }
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ d.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Error getting job status: " << e.what();
+
+ // Set error status code and response
+ cntl->http_response().set_status_code(brpc::HTTP_STATUS_NOT_FOUND);
+ cntl->http_response().set_content_type("application/json");
+
+ // Build error response
+ rapidjson::Document error_doc;
+ error_doc.SetObject();
+ rapidjson::Document::AllocatorType& allocator =
error_doc.GetAllocator();
+ error_doc.AddMember("status", "error", allocator);
+ error_doc.AddMember("message", "Job not found", allocator);
+ error_doc.AddMember("exception", rapidjson::Value(e.what(),
allocator), allocator);
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ error_doc.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ }
+ }
+
+ /**
+ * List all jobs
+ *
+ * Return a list of basic information for all jobs
+ */
+ void list_jobs(google::protobuf::RpcController* cntl_base,
+ const microbench::HttpRequest* request,
microbench::HttpResponse* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard done_guard(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+ LOG(INFO) << "Received list_jobs request";
+
+ try {
+ std::vector<std::shared_ptr<Job>> jobs = _job_manager.list_jobs();
+
+ // Set response headers
+ cntl->http_response().set_content_type("application/json");
+
+ // Build JSON response
+ rapidjson::Document d;
+ d.SetObject();
+ rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+
+ rapidjson::Value jobs_array(rapidjson::kArrayType);
+ for (const auto& job : jobs) {
+ rapidjson::Value job_obj(rapidjson::kObjectType);
+ job_obj.AddMember("job_id",
rapidjson::Value(job->job_id.c_str(), allocator),
+ allocator);
+ job_obj.AddMember(
+ "status",
+
rapidjson::Value(get_status_string(job->status).c_str(), allocator),
+ allocator);
+
+ // Add creation time
+ auto create_time_t =
std::chrono::system_clock::to_time_t(job->create_time);
+ std::string create_time_str = std::ctime(&create_time_t);
+ if (!create_time_str.empty() && create_time_str.back() ==
'\n') {
+ create_time_str.pop_back(); // Remove trailing newline
character
+ }
+ job_obj.AddMember("create_time",
+ rapidjson::Value(create_time_str.c_str(),
allocator), allocator);
+
+ // Add file prefix
+ job_obj.AddMember("file_prefix",
+
rapidjson::Value(job->config.file_prefix.c_str(), allocator),
+ allocator);
+
+ jobs_array.PushBack(job_obj, allocator);
+ }
+
+ d.AddMember("jobs", jobs_array, allocator);
+ d.AddMember("total",
rapidjson::Value(static_cast<int>(jobs.size())), allocator);
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ d.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ } catch (const std::exception& e) {
+ LOG(ERROR) << "Error listing jobs: " << e.what();
+
+ // Set error status code and response
+
cntl->http_response().set_status_code(brpc::HTTP_STATUS_INTERNAL_SERVER_ERROR);
+ cntl->http_response().set_content_type("application/json");
+
+ // Build error response
+ rapidjson::Document error_doc;
+ error_doc.SetObject();
+ rapidjson::Document::AllocatorType& allocator =
error_doc.GetAllocator();
+ error_doc.AddMember("status", "error", allocator);
+ error_doc.AddMember("message", rapidjson::Value(e.what(),
allocator), allocator);
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ error_doc.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ }
+ }
+
+ /**
+ * Cancel a job
+ *
+ * Attempt to cancel the specified job (currently not implemented)
+ */
+ void cancel_job(google::protobuf::RpcController* cntl_base,
+ const microbench::HttpRequest* request,
microbench::HttpResponse* response,
+ google::protobuf::Closure* done) {
+ brpc::ClosureGuard done_guard(done);
+ brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+ std::string job_id = cntl->http_request().unresolved_path();
+ LOG(INFO) << "Received cancel_job request for job " << job_id;
+
+ // Set response headers
+ cntl->http_response().set_content_type("application/json");
+
cntl->http_response().set_status_code(brpc::HTTP_STATUS_NOT_IMPLEMENTED);
+
+ // Build response
+ rapidjson::Document d;
+ d.SetObject();
+ rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+ d.AddMember("status", "error", allocator);
+ d.AddMember("message", "Job cancellation not implemented", allocator);
+
+ // Serialize to string
+ rapidjson::StringBuffer buffer;
+ rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+ d.Accept(writer);
+
+ cntl->response_attachment().append(buffer.GetString());
+ }
+
+    /**
+     * Get help information.
+     *
+     * Appends the usage text produced by get_usage("Doris Microbench Tool") verbatim to the
+     * response attachment.
+     */
+    void get_help(google::protobuf::RpcController* cntl_base,
+                  const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                  google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        auto* controller = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received get_help request";
+        controller->response_attachment().append(get_usage("Doris Microbench Tool"));
+    }
+
+    /**
+     * Clear the file cache.
+     *
+     * Query parameters:
+     *   sync          - "true" (case-insensitive) requests a synchronous clear of all
+     *                   caches; any other value, or absence, means false.
+     *   segment_path  - optional; when present only the blocks cached under the hash of this
+     *                   path are removed (sync is not used in that case), otherwise every
+     *                   file cache is cleared.
+     *
+     * Responds with {"status":"OK"} on success, or HTTP 400 with
+     * {"status":"error","message":...} if a std::exception escapes.
+     */
+    void file_cache_clear(google::protobuf::RpcController* cntl_base,
+                          const microbench::HttpRequest* request,
+                          microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        auto* cntl = static_cast<brpc::Controller*>(cntl_base);
+        const brpc::URI& uri = cntl->http_request().uri();
+        const std::string* sync_str = uri.GetQuery("sync");
+        const std::string* segment_path = uri.GetQuery("segment_path");
+
+        LOG(INFO) << "Received file_cache_clear request, sync=" << (sync_str ? *sync_str : "")
+                  << ", segment_path=" << (segment_path ? *segment_path : "");
+
+        try {
+            const bool sync = sync_str != nullptr && doris::to_lower(*sync_str) == "true";
+
+            if (segment_path != nullptr) {
+                // Only drop the blocks that belong to this particular segment file.
+                doris::io::UInt128Wrapper hash = doris::io::BlockFileCache::hash(*segment_path);
+                doris::io::BlockFileCache* cache = FileCacheFactory::instance()->get_by_path(hash);
+                if (cache == nullptr) {
+                    LOG(WARNING) << "No cache found for path: " << *segment_path;
+                } else {
+                    cache->remove_if_cached(hash);
+                    LOG(INFO) << "Cleared cache for path: " << *segment_path;
+                }
+            } else {
+                FileCacheFactory::instance()->clear_file_caches(sync);
+                LOG(INFO) << "Cleared all file caches, sync=" << sync;
+            }
+
+            cntl->http_response().set_content_type("application/json");
+
+            rapidjson::Document ok_doc;
+            ok_doc.SetObject();
+            ok_doc.AddMember("status", "OK", ok_doc.GetAllocator());
+
+            rapidjson::StringBuffer out;
+            rapidjson::Writer<rapidjson::StringBuffer> json_writer(out);
+            ok_doc.Accept(json_writer);
+            cntl->response_attachment().append(out.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error clearing file cache: " << e.what();
+
+            auto& http_resp = cntl->http_response();
+            http_resp.set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            http_resp.set_content_type("application/json");
+
+            rapidjson::Document err_doc;
+            err_doc.SetObject();
+            auto& alloc = err_doc.GetAllocator();
+            err_doc.AddMember("status", "error", alloc);
+            err_doc.AddMember("message", rapidjson::Value(e.what(), alloc), alloc);
+
+            rapidjson::StringBuffer out;
+            rapidjson::Writer<rapidjson::StringBuffer> json_writer(out);
+            err_doc.Accept(json_writer);
+            cntl->response_attachment().append(out.GetString());
+        }
+    }
+
+    /**
+     * Reset the capacity of a file cache.
+     *
+     * Query parameters (both required):
+     *   capacity - new capacity; must parse completely as a positive integer.
+     *   path     - cache path handed to FileCacheFactory::reset_capacity().
+     *
+     * Responds with {"status":"OK"} on success. A missing or malformed parameter (or any
+     * other std::exception) yields HTTP 400 with {"status":"error","message":...}.
+     */
+    void file_cache_reset(google::protobuf::RpcController* cntl_base,
+                          const microbench::HttpRequest* request,
+                          microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received file_cache_reset request";
+
+        try {
+            const std::string* capacity_str = cntl->http_request().uri().GetQuery("capacity");
+            // GetQuery() returns nullptr when the parameter is absent; check before use.
+            if (capacity_str == nullptr) {
+                LOG(ERROR) << "Invalid capacity: null";
+                throw std::runtime_error("Invalid capacity");
+            }
+            int64_t new_capacity = 0;
+            bool fully_parsed = false;
+            try {
+                size_t consumed = 0;
+                new_capacity = std::stoll(*capacity_str, &consumed);
+                // Reject trailing garbage such as "10G", which std::stoll would truncate to 10.
+                fully_parsed = (consumed == capacity_str->size());
+            } catch (const std::logic_error&) {
+                // std::invalid_argument / std::out_of_range: reported as invalid below.
+            }
+            if (!fully_parsed || new_capacity <= 0) {
+                LOG(ERROR) << "Invalid capacity: " << *capacity_str;
+                throw std::runtime_error("Invalid capacity");
+            }
+
+            const std::string* path_str = cntl->http_request().uri().GetQuery("path");
+            if (path_str == nullptr) {
+                LOG(ERROR) << "Path is empty";
+                throw std::runtime_error("Path is empty");
+            }
+            const std::string& path = *path_str;
+            auto ret = FileCacheFactory::instance()->reset_capacity(path, new_capacity);
+            LOG(INFO) << "Reset capacity for path: " << path << ", new capacity: " << new_capacity
+                      << ", result: " << ret;
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error resetting file cache: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Release file cache.
+     *
+     * Query parameters:
+     *   base_path - optional; when given FileCacheFactory::try_release(base_path) is used,
+     *               otherwise try_release() is called with no argument.
+     *
+     * Responds with {"status":"OK","released_elements":N}, or HTTP 400 with
+     * {"status":"error","message":...} if a std::exception escapes.
+     */
+    void file_cache_release(google::protobuf::RpcController* cntl_base,
+                            const microbench::HttpRequest* request,
+                            microbench::HttpResponse* response, google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received file_cache_release request";
+
+        try {
+            const std::string* base_path_str = cntl->http_request().uri().GetQuery("base_path");
+            size_t released = 0;
+            if (base_path_str == nullptr) {
+                released = FileCacheFactory::instance()->try_release();
+            } else {
+                released = FileCacheFactory::instance()->try_release(*base_path_str);
+            }
+            LOG(INFO) << "Released file caches: " << released
+                      << " path: " << (base_path_str ? *base_path_str : "null");
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+            // rapidjson only has int/unsigned/int64_t/uint64_t constructors; size_t is not
+            // uint64_t on every platform (e.g. macOS), so cast explicitly to avoid ambiguity.
+            d.AddMember("released_elements", static_cast<uint64_t>(released), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error releasing file cache: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Update one configuration item.
+     *
+     * Query parameters:
+     *   persist       - optional; exactly "true" asks set_config() to persist the change.
+     *   <key>=<value> - the config item to set. Only one item is applied per request; if
+     *                   several are supplied, the one picked is the first yielded by the
+     *                   query iterator (NOTE(review): iteration order is not guaranteed).
+     *
+     * Responds with {"status":"OK","<key>":"<value>"} when doris::config::set_config()
+     * succeeds. A request without any config item, a failed set_config(), or any other
+     * std::exception yields HTTP 400 with {"status":"error","message":...}.
+     */
+    void update_config(google::protobuf::RpcController* cntl_base,
+                       const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                       google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received update_config request";
+
+        try {
+            brpc::URI& uri = cntl->http_request().uri();
+            const std::string* persist_str = uri.GetQuery("persist");
+            const bool need_persist = persist_str != nullptr && *persist_str == "true";
+            // Remove the control parameter so only config items remain in the query.
+            uri.RemoveQuery("persist");
+
+            if (uri.QueryCount() == 0) {
+                LOG(WARNING) << "update_config request carries no config item";
+                throw std::runtime_error("No config item specified");
+            }
+
+            // Just support updating one config per request.
+            brpc::URI::QueryIterator it = uri.QueryBegin();
+            const std::string key = it->first;
+            const std::string value = it->second;
+            auto s = doris::config::set_config(key, value, need_persist);
+            if (!s.ok()) {
+                LOG(WARNING) << "set_config " << key << "=" << value << " failed";
+                // Report the failure to the client instead of answering "OK".
+                throw std::runtime_error("set_config " + key + "=" + value +
+                                         " failed: " + s.to_string());
+            }
+            LOG(INFO) << "set_config " << key << "=" << value
+                      << " success. persist: " << need_persist;
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+            d.AddMember(rapidjson::Value(key.c_str(), allocator),
+                        rapidjson::Value(value.c_str(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error updating config: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+    /**
+     * Show configuration.
+     *
+     * Query parameters:
+     *   conf_item - optional; when set only the row whose first field equals this name is
+     *               returned.
+     *
+     * Responds with {"status":"OK","config":"<json>"}. The "config" value is a JSON-encoded
+     * string holding an array of rows, each row being the string fields reported by
+     * doris::config::get_config_info(). On a std::exception it answers HTTP 400 with
+     * {"status":"error","message":...}.
+     */
+    void show_config(google::protobuf::RpcController* cntl_base,
+                     const microbench::HttpRequest* request, microbench::HttpResponse* response,
+                     google::protobuf::Closure* done) {
+        brpc::ClosureGuard done_guard(done);
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+        LOG(INFO) << "Received show_config request";
+
+        try {
+            std::vector<std::vector<std::string>> config_info = doris::config::get_config_info();
+            const std::string* conf_item_str = cntl->http_request().uri().GetQuery("conf_item");
+            const std::string conf_item = conf_item_str ? *conf_item_str : "";
+
+            // Serialize the (optionally filtered) config table as a JSON array of arrays.
+            rapidjson::StringBuffer config_buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> config_writer(config_buffer);
+            config_writer.StartArray();
+            for (const auto& config_row : config_info) {
+                if (!conf_item.empty() && (config_row.empty() || config_row[0] != conf_item)) {
+                    continue;
+                }
+                config_writer.StartArray();
+                for (const std::string& config_field : config_row) {
+                    config_writer.String(config_field.c_str());
+                }
+                config_writer.EndArray();
+                if (!conf_item.empty()) {
+                    break; // the requested item has been found
+                }
+            }
+            config_writer.EndArray();
+
+            // Set response headers
+            cntl->http_response().set_content_type("application/json");
+
+            // Build success response; "config" stays a JSON-encoded string for compatibility.
+            rapidjson::Document d;
+            d.SetObject();
+            rapidjson::Document::AllocatorType& allocator = d.GetAllocator();
+            d.AddMember("status", "OK", allocator);
+            d.AddMember("config", rapidjson::Value(config_buffer.GetString(), allocator),
+                        allocator);
+
+            // Serialize with a fresh writer: a rapidjson Writer that has already emitted a
+            // complete root value cannot be reused for another document without Reset().
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            d.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        } catch (const std::exception& e) {
+            LOG(ERROR) << "Error showing config: " << e.what();
+
+            // Set error status code and response
+            cntl->http_response().set_status_code(brpc::HTTP_STATUS_BAD_REQUEST);
+            cntl->http_response().set_content_type("application/json");
+
+            // Build error response
+            rapidjson::Document error_doc;
+            error_doc.SetObject();
+            rapidjson::Document::AllocatorType& allocator = error_doc.GetAllocator();
+            error_doc.AddMember("status", "error", allocator);
+            error_doc.AddMember("message", rapidjson::Value(e.what(), allocator), allocator);
+
+            // Serialize to string
+            rapidjson::StringBuffer buffer;
+            rapidjson::Writer<rapidjson::StringBuffer> writer(buffer);
+            error_doc.Accept(writer);
+
+            cntl->response_attachment().append(buffer.GetString());
+        }
+    }
+
+private:
+    // Map a JobStatus to the upper-case name used in JSON responses. Any value outside
+    // PENDING/RUNNING/COMPLETED/FAILED is reported as "UNKNOWN".
+    std::string get_status_string(JobStatus status) {
+        const char* name = "UNKNOWN";
+        switch (status) {
+        case JobStatus::PENDING:
+            name = "PENDING";
+            break;
+        case JobStatus::RUNNING:
+            name = "RUNNING";
+            break;
+        case JobStatus::COMPLETED:
+            name = "COMPLETED";
+            break;
+        case JobStatus::FAILED:
+            name = "FAILED";
+            break;
+        default:
+            break;
+        }
+        return name;
+    }
+
+    // Format `tp` the way std::ctime() does ("Www Mmm dd hh:mm:ss yyyy"), minus the trailing
+    // newline. ctime_r() writes into a caller-owned buffer, whereas std::ctime() returns a
+    // pointer to shared static storage, which is unsafe if requests are handled concurrently.
+    // Returns an empty string if the time cannot be formatted.
+    static std::string format_ctime(std::chrono::system_clock::time_point tp) {
+        const std::time_t t = std::chrono::system_clock::to_time_t(tp);
+        char buf[64] = {}; // ctime_r() requires at least 26 bytes
+        if (ctime_r(&t, buf) == nullptr) {
+            return "";
+        }
+        std::string formatted(buf);
+        if (!formatted.empty() && formatted.back() == '\n') {
+            formatted.pop_back();
+        }
+        return formatted;
+    }
+
+    // Add time information to a JSON response: always "create_time"; "start_time" once the
+    // job has left PENDING; "end_time" and "duration_seconds" once it is COMPLETED or FAILED.
+    void add_time_info(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                       const Job& job) {
+        const std::string create_time_str = format_ctime(job.create_time);
+        doc.AddMember("create_time", rapidjson::Value(create_time_str.c_str(), allocator),
+                      allocator);
+
+        if (job.status != JobStatus::PENDING) {
+            const std::string start_time_str = format_ctime(job.start_time);
+            doc.AddMember("start_time", rapidjson::Value(start_time_str.c_str(), allocator),
+                          allocator);
+        }
+
+        if (job.status == JobStatus::COMPLETED || job.status == JobStatus::FAILED) {
+            const std::string end_time_str = format_ctime(job.end_time);
+            doc.AddMember("end_time", rapidjson::Value(end_time_str.c_str(), allocator), allocator);
+
+            // Run time between start and end, truncated to whole seconds.
+            auto duration =
+                    std::chrono::duration_cast<std::chrono::seconds>(job.end_time - job.start_time)
+                            .count();
+            doc.AddMember("duration_seconds", duration, allocator);
+        }
+    }
+
+    // Serialize the job's configuration into a nested "config" object on `doc`. The
+    // "read_offset" and "read_length" [left, right] pairs are only emitted when read_iops > 0.
+    void add_config_info(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                         const JobConfig& config) {
+        rapidjson::Value cfg(rapidjson::kObjectType);
+
+        cfg.AddMember("size_bytes_perfile", config.size_bytes_perfile, allocator);
+        cfg.AddMember("write_iops", config.write_iops, allocator);
+        cfg.AddMember("read_iops", config.read_iops, allocator);
+        cfg.AddMember("num_threads", config.num_threads, allocator);
+        cfg.AddMember("num_files", config.num_files, allocator);
+        cfg.AddMember("file_prefix", rapidjson::Value(config.file_prefix.c_str(), allocator),
+                      allocator);
+        cfg.AddMember("cache_type", rapidjson::Value(config.cache_type.c_str(), allocator),
+                      allocator);
+        cfg.AddMember("expiration", config.expiration, allocator);
+        cfg.AddMember("repeat", config.repeat, allocator);
+        cfg.AddMember("write_batch_size", config.write_batch_size, allocator);
+        cfg.AddMember("write_file_cache", config.write_file_cache, allocator);
+        cfg.AddMember("bvar_enable", config.bvar_enable, allocator);
+
+        if (config.read_iops > 0) {
+            // Build a two-element JSON array [lo, hi].
+            auto make_range = [&allocator](auto lo, auto hi) {
+                rapidjson::Value range(rapidjson::kArrayType);
+                range.PushBack(lo, allocator);
+                range.PushBack(hi, allocator);
+                return range;
+            };
+            rapidjson::Value offsets =
+                    make_range(config.read_offset_left, config.read_offset_right);
+            cfg.AddMember("read_offset", offsets, allocator);
+            rapidjson::Value lengths =
+                    make_range(config.read_length_left, config.read_length_right);
+            cfg.AddMember("read_length", lengths, allocator);
+        }
+
+        doc.AddMember("config", cfg, allocator);
+    }
+
+    // Serialize job statistics into a nested "statistics" object on `doc`: the two
+    // pre-formatted total time strings, followed by the file cache counters and timers,
+    // each exported as an unsigned 64-bit integer.
+    void add_stats_info(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                        const Job::Statistics& stats) {
+        rapidjson::Value out(rapidjson::kObjectType);
+
+        out.AddMember("total_write_time",
+                      rapidjson::Value(stats.total_write_time.c_str(), allocator), allocator);
+        out.AddMember("total_read_time", rapidjson::Value(stats.total_read_time.c_str(), allocator),
+                      allocator);
+
+        // Counters mirrored from struct FileCacheStatistics.
+        auto put_u64 = [&out, &allocator](rapidjson::Value::StringRefType key, uint64_t v) {
+            out.AddMember(key, v, allocator);
+        };
+        put_u64("num_local_io_total", static_cast<uint64_t>(stats.num_local_io_total));
+        put_u64("num_remote_io_total", static_cast<uint64_t>(stats.num_remote_io_total));
+        put_u64("num_inverted_index_remote_io_total",
+                static_cast<uint64_t>(stats.num_inverted_index_remote_io_total));
+        put_u64("local_io_timer", static_cast<uint64_t>(stats.local_io_timer));
+        put_u64("bytes_read_from_local", static_cast<uint64_t>(stats.bytes_read_from_local));
+        put_u64("bytes_read_from_remote", static_cast<uint64_t>(stats.bytes_read_from_remote));
+        put_u64("remote_io_timer", static_cast<uint64_t>(stats.remote_io_timer));
+        put_u64("write_cache_io_timer", static_cast<uint64_t>(stats.write_cache_io_timer));
+        put_u64("bytes_write_into_cache", static_cast<uint64_t>(stats.bytes_write_into_cache));
+        put_u64("num_skip_cache_io_total", static_cast<uint64_t>(stats.num_skip_cache_io_total));
+        put_u64("read_cache_file_directly_timer",
+                static_cast<uint64_t>(stats.read_cache_file_directly_timer));
+        put_u64("cache_get_or_set_timer", static_cast<uint64_t>(stats.cache_get_or_set_timer));
+        put_u64("lock_wait_timer", static_cast<uint64_t>(stats.lock_wait_timer));
+        put_u64("get_timer", static_cast<uint64_t>(stats.get_timer));
+        put_u64("set_timer", static_cast<uint64_t>(stats.set_timer));
+
+        doc.AddMember("statistics", out, allocator);
+    }
+
+    // Append at most `max_files` entries of `file_records` to `doc` as the "file_records"
+    // array, plus "file_records_count" (entries emitted) and "file_records_total" (entries
+    // available).
+    void add_file_records(rapidjson::Document& doc, rapidjson::Document::AllocatorType& allocator,
+                          const std::vector<FileInfo>& file_records, size_t max_files) {
+        rapidjson::Value records(rapidjson::kArrayType);
+        size_t emitted = 0;
+        for (; emitted < file_records.size() && emitted < max_files; ++emitted) {
+            const FileInfo& info = file_records[emitted];
+            rapidjson::Value entry(rapidjson::kObjectType);
+            entry.AddMember("filename", rapidjson::Value(info.filename.c_str(), allocator),
+                            allocator);
+            entry.AddMember("data_size", static_cast<uint64_t>(info.data_size), allocator);
+            entry.AddMember("job_id", rapidjson::Value(info.job_id.c_str(), allocator), allocator);
+            records.PushBack(entry, allocator);
+        }
+
+        doc.AddMember("file_records", records, allocator);
+        doc.AddMember("file_records_count", static_cast<uint64_t>(emitted), allocator);
+        doc.AddMember("file_records_total", static_cast<uint64_t>(file_records.size()), allocator);
+    }
+
+ JobManager& _job_manager;
+};
+} // namespace microbenchService
+
+// HTTP server handling.
+//
+// Hosts MicrobenchServiceImpl on a brpc::Server listening on FLAGS_port. start() blocks in
+// RunUntilAskedToQuit() until the server is asked to quit. The brpc::Server is owned through
+// a std::unique_ptr and the class is non-copyable, so the server can be neither leaked nor
+// deleted twice.
+class HttpServer {
+public:
+    explicit HttpServer(JobManager& job_manager) : _job_manager(job_manager) {}
+
+    HttpServer(const HttpServer&) = delete;
+    HttpServer& operator=(const HttpServer&) = delete;
+
+    // Create the server, register the service and serve until asked to quit. A failure to
+    // register the service or to start listening is logged and causes an early return.
+    void start() {
+        // Assigning a fresh server also destroys any server left over from an earlier call.
+        _server = std::make_unique<brpc::Server>();
+        // Registered with SERVER_DOESNT_OWN_SERVICE: the server only borrows this
+        // stack-allocated service, so it must be unregistered before this function returns.
+        microbenchService::MicrobenchServiceImpl http_svc(_job_manager);
+
+        LOG(INFO) << "Starting HTTP server on port " << FLAGS_port;
+
+        if (_server->AddService(&http_svc, brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+            LOG(ERROR) << "Failed to add http service";
+            return;
+        }
+
+        brpc::ServerOptions options;
+        if (_server->Start(FLAGS_port, &options) != 0) {
+            LOG(ERROR) << "Failed to start HttpServer";
+            // Unregister the stack-allocated service before it goes out of scope.
+            _server->ClearServices();
+            return;
+        }
+
+        LOG(INFO) << "HTTP server started successfully";
+        _server->RunUntilAskedToQuit(); // Wait for signals
+        _server->ClearServices();
+
+        LOG(INFO) << "HTTP server stopped";
+    }
+
+    ~HttpServer() {
+        if (_server) {
+            LOG(INFO) << "Cleaning up HTTP server in destructor";
+        }
+        // _server itself is released by std::unique_ptr after this body runs.
+    }
+
+private:
+    JobManager& _job_manager;
+    std::unique_ptr<brpc::Server> _server;
+};
+
+// Initialize the parts of the global ExecEnv the microbench relies on:
+//   - the S3 file upload thread pool (256 to 512 threads);
+//   - a new FileCacheFactory, initialised via init_file_cache_factory() with an empty
+//     cache_paths vector (NOTE(review): presumably the paths come from
+//     config::file_cache_path inside that call; confirm);
+//   - the FD cache used for opened file cache files.
+void init_exec_env() {
+    doris::ExecEnv* env = doris::ExecEnv::GetInstance();
+
+    doris::ThreadPoolBuilder upload_pool_builder("MicrobenchS3FileUploadThreadPool");
+    upload_pool_builder.set_min_threads(256).set_max_threads(512);
+    // The Status returned by build() is discarded.
+    static_cast<void>(upload_pool_builder.build(&(env->_s3_file_upload_thread_pool)));
+
+    env->_file_cache_factory = new FileCacheFactory();
+    std::vector<doris::CachePath> no_cache_paths;
+    env->init_file_cache_factory(no_cache_paths);
+    env->_file_cache_open_fd_cache = std::make_unique<doris::io::FDCache>();
+}
+
+// Entry point of the file cache microbench.
+//
+// - Sets up glog, with log files written to ./logs.
+// - Loads $DORIS_HOME/conf/be.conf and <custom_config_dir>/be_custom.conf. DORIS_HOME
+//   defaults to the current directory when unset.
+// - Initialises the ExecEnv pieces the benchmark needs.
+// - Serves the HTTP API on FLAGS_port until the server is asked to quit.
+// - Runs a heartbeat thread that logs a line every 5 seconds until shutdown.
+//
+// Returns -1 on configuration errors, 0 otherwise.
+int main(int argc, char* argv[]) {
+    google::ParseCommandLineFlags(&argc, &argv, true);
+    FLAGS_minloglevel = google::GLOG_INFO;
+    FLAGS_log_dir = "./logs";
+    FLAGS_logbufsecs = 0; // Disable buffering, write immediately
+
+    // The log directory must exist before InitGoogleLogging(); remember whether it was
+    // created so the outcome is reported once logging actually goes to the log files.
+    std::filesystem::path log_dir(FLAGS_log_dir);
+    const bool created_log_dir = !std::filesystem::exists(log_dir);
+    if (created_log_dir) {
+        std::filesystem::create_directories(log_dir);
+    }
+    google::InitGoogleLogging(argv[0]);
+    if (created_log_dir) {
+        LOG(INFO) << "Log directory created successfully: " << log_dir.string();
+    } else {
+        LOG(INFO) << "Log directory already exists: " << log_dir.string();
+    }
+
+    // Default DORIS_HOME to the working directory; an existing value is kept (overwrite = 0).
+    if (-1 == setenv("DORIS_HOME", ".", 0)) {
+        LOG(WARNING) << "set DORIS_HOME error";
+    }
+    const char* doris_home = getenv("DORIS_HOME");
+    if (doris_home == nullptr) {
+        // Streaming or building a std::string from a null pointer is undefined behaviour.
+        LOG(ERROR) << "DORIS_HOME environment variable not set";
+        return -1;
+    }
+    LOG(INFO) << "env=" << doris_home;
+    std::string conffile = std::string(doris_home) + "/conf/be.conf";
+    if (!doris::config::init(conffile.c_str(), true, true, true)) {
+        LOG(ERROR) << "Error reading config file";
+        return -1;
+    }
+    std::string custom_conffile = doris::config::custom_config_dir + "/be_custom.conf";
+    if (!doris::config::init(custom_conffile.c_str(), true, false, false)) {
+        LOG(ERROR) << "Error reading custom config file";
+        return -1;
+    }
+
+    // Never write the secret key to the log; only report whether one is configured.
+    LOG(INFO) << "Obj config. ak=" << doris::config::test_s3_ak
+              << " sk=" << (doris::config::test_s3_sk.empty() ? "<empty>" : "******")
+              << " region=" << doris::config::test_s3_region
+              << " endpoint=" << doris::config::test_s3_endpoint
+              << " bucket=" << doris::config::test_s3_bucket;
+    LOG(INFO) << "File cache config. enable_file_cache=" << doris::config::enable_file_cache
+              << " file_cache_path=" << doris::config::file_cache_path
+              << " file_cache_each_block_size=" << doris::config::file_cache_each_block_size
+              << " clear_file_cache=" << doris::config::clear_file_cache
+              << " enable_file_cache_query_limit=" << doris::config::enable_file_cache_query_limit
+              << " file_cache_enter_disk_resource_limit_mode_percent="
+              << doris::config::file_cache_enter_disk_resource_limit_mode_percent
+              << " file_cache_exit_disk_resource_limit_mode_percent="
+              << doris::config::file_cache_exit_disk_resource_limit_mode_percent
+              << " enable_read_cache_file_directly="
+              << doris::config::enable_read_cache_file_directly
+              << " file_cache_enable_evict_from_other_queue_by_size="
+              << doris::config::file_cache_enable_evict_from_other_queue_by_size
+              << " file_cache_error_log_limit_bytes="
+              << doris::config::file_cache_error_log_limit_bytes
+              << " cache_lock_wait_long_tail_threshold_us="
+              << doris::config::cache_lock_wait_long_tail_threshold_us
+              << " cache_lock_held_long_tail_threshold_us="
+              << doris::config::cache_lock_held_long_tail_threshold_us
+              << " file_cache_remove_block_qps_limit="
+              << doris::config::file_cache_remove_block_qps_limit
+              << " enable_evict_file_cache_in_advance="
+              << doris::config::enable_evict_file_cache_in_advance
+              << " file_cache_enter_need_evict_cache_in_advance_percent="
+              << doris::config::file_cache_enter_need_evict_cache_in_advance_percent
+              << " file_cache_exit_need_evict_cache_in_advance_percent="
+              << doris::config::file_cache_exit_need_evict_cache_in_advance_percent
+              << " file_cache_evict_in_advance_interval_ms="
+              << doris::config::file_cache_evict_in_advance_interval_ms
+              << " file_cache_evict_in_advance_batch_bytes="
+              << doris::config::file_cache_evict_in_advance_batch_bytes;
+    LOG(INFO) << "S3 writer config. s3_file_writer_log_interval_second="
+              << doris::config::s3_file_writer_log_interval_second
+              << " s3_write_buffer_size=" << doris::config::s3_write_buffer_size
+              << " enable_flush_file_cache_async=" << doris::config::enable_flush_file_cache_async;
+
+    init_exec_env();
+    JobManager job_manager;
+
+    // Heartbeat thread: logs every 5 s until asked to stop. The flag is guarded by the mutex
+    // and checked as the wait predicate, so shutdown wakes the thread immediately and no
+    // notification can be missed.
+    std::mutex heartbeat_mutex;
+    std::condition_variable heartbeat_cv;
+    bool heartbeat_running = true; // guarded by heartbeat_mutex
+    std::thread heartbeat_thread([&]() {
+        std::unique_lock<std::mutex> lck {heartbeat_mutex};
+        while (!heartbeat_cv.wait_for(lck, std::chrono::milliseconds(5000),
+                                      [&] { return !heartbeat_running; })) {
+            LOG(INFO) << "Periodically log for file cache microbench";
+        }
+    });
+
+    try {
+        HttpServer http_server(job_manager);
+        http_server.start();
+    } catch (const std::exception& e) {
+        LOG(ERROR) << "Error in HTTP server: " << e.what();
+    }
+
+    {
+        std::lock_guard<std::mutex> lck {heartbeat_mutex};
+        heartbeat_running = false;
+    }
+    heartbeat_cv.notify_all();
+    heartbeat_thread.join();
+
+    LOG(INFO) << "Program exiting normally";
+    return 0;
+}
+#endif
\ No newline at end of file
diff --git a/be/src/io/tools/proto/Makefile b/be/src/io/tools/proto/Makefile
new file mode 100644
index 00000000000..affed0ef22f
--- /dev/null
+++ b/be/src/io/tools/proto/Makefile
@@ -0,0 +1,38 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# This file compiles all protobuf files in this directory into C++ sources
+# (<name>.pb.cc / <name>.pb.h) under ${BUILD_DIR}, using the protoc found in
+# ${DORIS_THIRDPARTY}/installed/bin.
+
+BUILD_DIR = ${CURDIR}/../build/proto
+PROTOC = ${DORIS_THIRDPARTY}/installed/bin/protoc
+
+SOURCES = $(wildcard *.proto)
+OBJECTS = $(patsubst %.proto, ${BUILD_DIR}/%.pb.cc, ${SOURCES})
+HEADERS = $(patsubst %.proto, ${BUILD_DIR}/%.pb.h, ${SOURCES})
+
+all: prepare ${OBJECTS} ${HEADERS}
+.PHONY: all prepare
+
+prepare: ${BUILD_DIR}
+
+${BUILD_DIR}:
+	mkdir -p $@
+
+# The output directory is an order-only prerequisite: it is guaranteed to exist before
+# protoc runs (even under `make -j`), but its timestamp never forces regeneration.
+# A pattern rule with several targets builds all of them with one recipe invocation.
+${BUILD_DIR}/%.pb.h ${BUILD_DIR}/%.pb.cc: %.proto | ${BUILD_DIR}
+	${PROTOC} --proto_path=. --cpp_out=${BUILD_DIR} $<
+
+clean:
+	rm -rf ${BUILD_DIR}
+.PHONY: clean
\ No newline at end of file
diff --git a/be/src/io/tools/proto/microbench.proto
b/be/src/io/tools/proto/microbench.proto
new file mode 100644
index 00000000000..ecd69b4096d
--- /dev/null
+++ b/be/src/io/tools/proto/microbench.proto
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+syntax = "proto2";
+
+package microbench;
+
+// Generate abstract C++ service classes so brpc can host MicrobenchService.
+option cc_generic_services = true;
+
+// Placeholder messages: every RPC is exposed over brpc's HTTP interface, reads its
+// parameters from the URL (path and query string) and returns its payload in the
+// HTTP response attachment rather than in these messages.
+message HttpRequest {}
+message HttpResponse {}
+
+// HTTP endpoints of the file cache microbench, reachable as /MicrobenchService/<rpc>.
+service MicrobenchService {
+    rpc get_job_status(HttpRequest) returns (HttpResponse);
+    rpc submit_job(HttpRequest) returns (HttpResponse);
+    rpc list_jobs(HttpRequest) returns (HttpResponse);
+    rpc cancel_job(HttpRequest) returns (HttpResponse);
+    rpc get_help(HttpRequest) returns (HttpResponse);
+    rpc file_cache_clear(HttpRequest) returns (HttpResponse);
+    rpc file_cache_reset(HttpRequest) returns (HttpResponse);
+    rpc file_cache_release(HttpRequest) returns (HttpResponse);
+    rpc update_config(HttpRequest) returns (HttpResponse);
+    rpc show_config(HttpRequest) returns (HttpResponse);
+}
diff --git a/be/src/io/tools/readme.md b/be/src/io/tools/readme.md
new file mode 100644
index 00000000000..a5804958da0
--- /dev/null
+++ b/be/src/io/tools/readme.md
@@ -0,0 +1,133 @@
+# File Cache Microbenchmark
+
+## Compilation
+
+To compile the project, run the following command:
+
+```bash
+./build.sh --clean --file-cache-microbench --be
+```
+
+This will generate the `file_cache_microbench` executable in the
`apache_doris/output/be/lib` directory.
+
+## Usage
+
+1. Create a deployment directory:
+ ```bash
+ mkdir {deploy_dir}
+ ```
+
+2. Create a configuration directory:
+ ```bash
+ mkdir {deploy_dir}/conf
+ ```
+
+3. Copy the executable to the deployment directory:
+ ```bash
+ cp -r apache_doris/output/be/lib/file_cache_microbench {deploy_dir}
+ ```
+
+4. Copy the configuration file to the configuration directory:
+ ```bash
+ cp -r apache_doris/output/be/conf/be.conf {deploy_dir}/conf
+ ```
+
+5. Edit the configuration file `{deploy_dir}/conf/be.conf` and add the
following configuration information:
+ ```ini
+ enable_file_cache=true
+ file_cache_path = [ {"path": "/mnt/disk2/file_cache",
"total_size":53687091200, "query_limit": 10737418240}]
+ test_s3_resource = "resource"
+ test_s3_ak = "ak"
+ test_s3_sk = "sk"
+ test_s3_endpoint = "endpoint"
+ test_s3_region = "region"
+ test_s3_bucket = "bucket"
+ test_s3_prefix = "prefix"
+ ```
+
+6. Change to the deployment directory:
+ ```bash
+ cd {deploy_dir}
+ ```
+
+7. Run the microbenchmark:
+ ```bash
+ ./file_cache_microbench --port={test_port}
+ ```
+
+8. Inspect the bvar metrics exposed by the server:
+   ```bash
+   curl http://{ip}:{test_port}/vars/
+   ```
+
+9. Check the logs in `{deploy_dir}/logs/` (the tool writes its log files to `./logs` relative to the working directory).
+
+## API
+
+### get_help
+```
+curl "http://localhost:{port}/MicrobenchService/get_help"
+```
+
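+#### Endpoints:
+All endpoints are served under the `/MicrobenchService/` prefix, e.g.
+`http://localhost:{port}/MicrobenchService/list_jobs`.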
+- **GET /get_job_status/<job_id>**
+ - Retrieve the status of a submitted job.
+ - Parameters:
+ - `job_id`: The ID of the job to retrieve status for.
+ - `files` (optional): If provided, returns the associated file records for
the job.
+ - Example: `/get_job_status/job_id?files=10`
+
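+- **GET /list_jobs**
+  - List all submitted jobs and their statuses.
+
+- **GET /cancel_job/<job_id>**
+  - Job cancellation is not implemented yet; this endpoint always returns HTTP 501.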
+
+- **GET /get_help**
+ - Get this help information.
+
+- **GET /file_cache_clear**
+  - Clear the file cache with the following query parameters:
+    ```json
+    {
+      "sync": <true|false>,      // Optional; "true" (case-insensitive) clears synchronously, default false
+      "segment_path": "<path>"   // Optional path of the segment to clear from the cache
+    }
+    ```
+  If `segment_path` is not provided, all caches are cleared, synchronously or
+  asynchronously according to `sync`. If it is provided, only the cached blocks of
+  that segment are removed and `sync` has no effect.
+
+- **GET /file_cache_reset**
+  - Reset the capacity of a file cache with the following query parameters (both required):
+    ```json
+    {
+      "capacity": <new_capacity>,   // New capacity for the cache path; must be a positive integer
+      "path": "<path>"              // File cache path to resize (one of the paths in `file_cache_path`)
+    }
+    ```
+
+- **GET /file_cache_release**
+ - Release the file cache with the following query parameters:
+ ```json
+ {
+      "base_path": "<base_path>"   // Optional; release only caches under this path (all caches if omitted)
+ }
+ ```
+
+- **GET /update_config**
+  - Update a single configuration item via query parameters: pass the item as
+    `<config_name>=<value>`, plus an optional `persist=true` to also persist the change.
+    Only one configuration item is applied per request; the request fails with HTTP 400
+    if no item is given or the update is rejected.
+  - Example: `/update_config?<config_name>=<value>&persist=true`
+
+- **GET /show_config**
+  - Retrieve the current configuration settings.
+  - Optional query parameter `conf_item=<config_name>` restricts the output to that single item.
+
+### Notes:
+- Ensure that the S3 settings (`test_s3_*`) are configured correctly in `conf/be.conf`.
+- The program will create and read files in the specified S3 bucket.
+- Monitor the logs for detailed execution information and errors.
+
+### Version Information:
+The version information is included in the `get_help` response.
+
diff --git a/build.sh b/build.sh
index 24ceed2baea..870311c109b 100755
--- a/build.sh
+++ b/build.sh
@@ -43,20 +43,21 @@ usage() {
echo "
Usage: $0 <options>
Optional options:
- [no option] build all components
- --fe build Frontend and Spark DPP application. Default
ON.
- --be build Backend. Default ON.
- --meta-tool build Backend meta tool. Default OFF.
- --cloud build Cloud. Default OFF.
- --index-tool build Backend inverted index tool. Default OFF.
- --broker build Broker. Default ON.
- --spark-dpp build Spark DPP application. Default ON.
- --hive-udf build Hive UDF library for Spark Load. Default ON.
- --be-java-extensions build Backend java extensions. Default ON.
- --be-extension-ignore build be-java-extensions package, choose which
modules to ignore. Multiple modules separated by commas.
- --clean clean and build target
- --output specify the output directory
- -j build Backend parallel
+ [no option] build all components
+ --fe build Frontend and Spark DPP application.
Default ON.
+ --be build Backend. Default ON.
+ --meta-tool build Backend meta tool. Default OFF.
+ --file-cache-microbench build Backend file cache microbench tool.
Default OFF.
+ --cloud build Cloud. Default OFF.
+ --index-tool build Backend inverted index tool. Default OFF.
+ --broker build Broker. Default ON.
+ --spark-dpp build Spark DPP application. Default ON.
+ --hive-udf build Hive UDF library for Spark Load. Default
ON.
+ --be-java-extensions build Backend java extensions. Default ON.
+ --be-extension-ignore build be-java-extensions package, choose which
modules to ignore. Multiple modules separated by commas.
+ --clean clean and build target
+ --output specify the output directory
+ -j build Backend parallel
Environment variables:
USE_AVX2 If the CPU does not support AVX2 instruction
set, please set USE_AVX2=0. Default is ON.
@@ -68,6 +69,7 @@ Usage: $0 <options>
$0 build all
$0 --be build Backend
$0 --meta-tool build Backend meta tool
+ $0 --file-cache-microbench build Backend file cache
microbench tool
$0 --cloud build Cloud
$0 --index-tool build Backend inverted index tool
$0 --fe --clean clean and build Frontend and Spark
Dpp application
@@ -128,6 +130,7 @@ if ! OPTS="$(getopt \
-l 'cloud' \
-l 'broker' \
-l 'meta-tool' \
+ -l 'file-cache-microbench' \
-l 'index-tool' \
-l 'spark-dpp' \
-l 'hive-udf' \
@@ -150,6 +153,7 @@ BUILD_BE=0
BUILD_CLOUD=0
BUILD_BROKER=0
BUILD_META_TOOL='OFF'
+BUILD_FILE_CACHE_MICROBENCH_TOOL='OFF'
BUILD_INDEX_TOOL='OFF'
BUILD_SPARK_DPP=0
BUILD_BE_JAVA_EXTENSIONS=0
@@ -169,6 +173,7 @@ if [[ "$#" == 1 ]]; then
BUILD_BROKER=1
BUILD_META_TOOL='OFF'
+ BUILD_FILE_CACHE_MICROBENCH_TOOL='OFF'
BUILD_INDEX_TOOL='OFF'
BUILD_SPARK_DPP=1
BUILD_HIVE_UDF=1
@@ -201,6 +206,10 @@ else
BUILD_META_TOOL='ON'
shift
;;
+ --file-cache-microbench)
+ BUILD_FILE_CACHE_MICROBENCH_TOOL='ON'
+ shift
+ ;;
--index-tool)
BUILD_INDEX_TOOL='ON'
shift
@@ -263,6 +272,7 @@ else
BUILD_CLOUD=1
BUILD_BROKER=1
BUILD_META_TOOL='ON'
+ BUILD_FILE_CACHE_MICROBENCH_TOOL='ON'
BUILD_INDEX_TOOL='ON'
BUILD_SPARK_DPP=1
BUILD_HIVE_UDF=1
@@ -470,32 +480,33 @@ if [[ "${BUILD_BE_JAVA_EXTENSIONS}" -eq 1 && "$(uname
-s)" == 'Darwin' ]]; then
fi
echo "Get params:
- BUILD_FE -- ${BUILD_FE}
- BUILD_BE -- ${BUILD_BE}
- BUILD_CLOUD -- ${BUILD_CLOUD}
- BUILD_BROKER -- ${BUILD_BROKER}
- BUILD_META_TOOL -- ${BUILD_META_TOOL}
- BUILD_INDEX_TOOL -- ${BUILD_INDEX_TOOL}
- BUILD_SPARK_DPP -- ${BUILD_SPARK_DPP}
- BUILD_BE_JAVA_EXTENSIONS -- ${BUILD_BE_JAVA_EXTENSIONS}
- BUILD_HIVE_UDF -- ${BUILD_HIVE_UDF}
- PARALLEL -- ${PARALLEL}
- CLEAN -- ${CLEAN}
- WITH_MYSQL -- ${WITH_MYSQL}
- GLIBC_COMPATIBILITY -- ${GLIBC_COMPATIBILITY}
- USE_AVX2 -- ${USE_AVX2}
- USE_LIBCPP -- ${USE_LIBCPP}
- USE_DWARF -- ${USE_DWARF}
- USE_UNWIND -- ${USE_UNWIND}
- STRIP_DEBUG_INFO -- ${STRIP_DEBUG_INFO}
- USE_MEM_TRACKER -- ${USE_MEM_TRACKER}
- USE_JEMALLOC -- ${USE_JEMALLOC}
- USE_BTHREAD_SCANNER -- ${USE_BTHREAD_SCANNER}
- ENABLE_INJECTION_POINT -- ${ENABLE_INJECTION_POINT}
- ENABLE_CACHE_LOCK_DEBUG -- ${ENABLE_CACHE_LOCK_DEBUG}
- DENABLE_CLANG_COVERAGE -- ${DENABLE_CLANG_COVERAGE}
- DISPLAY_BUILD_TIME -- ${DISPLAY_BUILD_TIME}
- ENABLE_PCH -- ${ENABLE_PCH}
+ BUILD_FE -- ${BUILD_FE}
+ BUILD_BE -- ${BUILD_BE}
+ BUILD_CLOUD -- ${BUILD_CLOUD}
+ BUILD_BROKER -- ${BUILD_BROKER}
+ BUILD_META_TOOL -- ${BUILD_META_TOOL}
+ BUILD_FILE_CACHE_MICROBENCH_TOOL -- ${BUILD_FILE_CACHE_MICROBENCH_TOOL}
+ BUILD_INDEX_TOOL -- ${BUILD_INDEX_TOOL}
+ BUILD_SPARK_DPP -- ${BUILD_SPARK_DPP}
+ BUILD_BE_JAVA_EXTENSIONS -- ${BUILD_BE_JAVA_EXTENSIONS}
+ BUILD_HIVE_UDF -- ${BUILD_HIVE_UDF}
+ PARALLEL -- ${PARALLEL}
+ CLEAN -- ${CLEAN}
+ WITH_MYSQL -- ${WITH_MYSQL}
+ GLIBC_COMPATIBILITY -- ${GLIBC_COMPATIBILITY}
+ USE_AVX2 -- ${USE_AVX2}
+ USE_LIBCPP -- ${USE_LIBCPP}
+ USE_DWARF -- ${USE_DWARF}
+ USE_UNWIND -- ${USE_UNWIND}
+ STRIP_DEBUG_INFO -- ${STRIP_DEBUG_INFO}
+ USE_MEM_TRACKER -- ${USE_MEM_TRACKER}
+ USE_JEMALLOC -- ${USE_JEMALLOC}
+ USE_BTHREAD_SCANNER -- ${USE_BTHREAD_SCANNER}
+ ENABLE_INJECTION_POINT -- ${ENABLE_INJECTION_POINT}
+ ENABLE_CACHE_LOCK_DEBUG -- ${ENABLE_CACHE_LOCK_DEBUG}
+ DENABLE_CLANG_COVERAGE -- ${DENABLE_CLANG_COVERAGE}
+ DISPLAY_BUILD_TIME -- ${DISPLAY_BUILD_TIME}
+ ENABLE_PCH -- ${ENABLE_PCH}
"
# Clean and build generated code
@@ -587,6 +598,7 @@ if [[ "${BUILD_BE}" -eq 1 ]]; then
-DWITH_MYSQL="${WITH_MYSQL}" \
-DUSE_LIBCPP="${USE_LIBCPP}" \
-DBUILD_META_TOOL="${BUILD_META_TOOL}" \
+
-DBUILD_FILE_CACHE_MICROBENCH_TOOL="${BUILD_FILE_CACHE_MICROBENCH_TOOL}" \
-DBUILD_INDEX_TOOL="${BUILD_INDEX_TOOL}" \
-DSTRIP_DEBUG_INFO="${STRIP_DEBUG_INFO}" \
-DUSE_DWARF="${USE_DWARF}" \
@@ -799,6 +811,10 @@ EOF
cp -r -p "${DORIS_HOME}/be/output/lib/meta_tool"
"${DORIS_OUTPUT}/be/lib"/
fi
+ if [[ "${BUILD_FILE_CACHE_MICROBENCH_TOOL}" = "ON" ]]; then
+ cp -r -p "${DORIS_HOME}/be/output/lib/file_cache_microbench"
"${DORIS_OUTPUT}/be/lib"/
+ fi
+
if [[ "${BUILD_INDEX_TOOL}" = "ON" ]]; then
cp -r -p "${DORIS_HOME}/be/output/lib/index_tool"
"${DORIS_OUTPUT}/be/lib"/
fi
diff --git a/run-be-ut.sh b/run-be-ut.sh
index 5f73f6f0ee6..78b0dc4fcce 100755
--- a/run-be-ut.sh
+++ b/run-be-ut.sh
@@ -240,6 +240,7 @@ cd "${CMAKE_BUILD_DIR}"
-DUSE_LIBCPP="${USE_LIBCPP}" \
-DBUILD_META_TOOL=OFF \
-DBUILD_BENCHMARK_TOOL="${BUILD_BENCHMARK_TOOL}" \
+ -DBUILD_FILE_CACHE_MICROBENCH_TOOL=OFF \
-DWITH_MYSQL=ON \
-DUSE_DWARF="${USE_DWARF}" \
-DUSE_UNWIND="${USE_UNWIND}" \
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]