This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f0def4ec0aa [feature](Cloud) Try to do memory limit control for hdfs write (#34354) (#34787)
f0def4ec0aa is described below

commit f0def4ec0aa4b188c5071b00e8080601d1e78b24
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Mon May 13 20:32:27 2024 +0800

    [feature](Cloud) Try to do memory limit control for hdfs write (#34354) (#34787)
---
 be/src/common/config.cpp          |   9 ++-
 be/src/common/config.h            |   7 ++
 be/src/io/fs/hdfs_file_writer.cpp | 138 +++++++++++++++++++++++++++++++++++++-
 be/src/io/fs/hdfs_file_writer.h   |   3 +
 be/src/io/hdfs_util.cpp           |   1 +
 be/src/io/hdfs_util.h             |   1 +
 be/src/util/jni-util.cpp          |  50 ++++++++++++++
 be/src/util/jni-util.h            |   3 +
 8 files changed, 208 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3dcce044deb..c449a7e0176 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1028,7 +1028,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880");
 // The timeout config for S3 buffer allocation
 DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
-DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
+DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "1"); // 1MB
 
 //disable shrink memory by default
 DEFINE_mBool(enable_shrink_memory, "false");
@@ -1217,6 +1217,13 @@ DEFINE_mBool(enable_injection_point, "false");
 
 DEFINE_mBool(ignore_schema_change_check, "false");
 
+// The maximum jvm heap usage ratio for hdfs write workload
+DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
+// The sleep milliseconds duration when hdfs write exceeds the maximum usage
+DEFINE_mInt64(hdfs_jni_write_sleep_milliseconds, "300");
+// The max retry times when hdfs write failed
+DEFINE_mInt64(hdfs_jni_write_max_retry_time, "3");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d9c8e9c231c..029c2fc0227 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1295,6 +1295,13 @@ DECLARE_mBool(enable_injection_point);
 
 DECLARE_mBool(ignore_schema_change_check);
 
+// The maximum jvm heap usage ratio for hdfs write workload
+DECLARE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio);
+// The sleep milliseconds duration when hdfs write exceeds the maximum usage
+DECLARE_mInt64(hdfs_jni_write_sleep_milliseconds);
+// The max retry times when hdfs write failed
+DECLARE_mInt64(hdfs_jni_write_max_retry_time);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
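Taken together, the three new knobs bound how much of the embedded JVM heap the HDFS writers may pin and how long a writer backs off once that budget is exhausted. The standalone sketch below is illustrative only (the variable names are mine, not from the patch) and shows the intended arithmetic with the default values:

    #include <cstdint>
    #include <iostream>

    int main() {
        // Assume the BE was started with -Xmx8g; the patch derives this from JAVA_OPTS.
        const int64_t max_jvm_heap = 8LL * 1024 * 1024 * 1024;
        const double usage_ratio = 0.5;  // max_hdfs_wirter_jni_heap_usage_ratio
        const int64_t sleep_ms = 300;    // hdfs_jni_write_sleep_milliseconds
        const int64_t max_retry = 3;     // hdfs_jni_write_max_retry_time

        // Budget that all HDFS writers together may charge against the JVM heap.
        const int64_t budget = static_cast<int64_t>(max_jvm_heap * usage_ratio);
        std::cout << "JNI write budget: " << budget << " bytes\n"; // 4294967296 (4 GiB)

        // Each failed acquire waits a randomized interval before the next try:
        // roughly sleep..2*sleep ms for the early retries, 2*sleep..4*sleep ms afterwards.
        for (int64_t try_time = 0; try_time < max_retry; ++try_time) {
            const bool early = try_time < max_retry / 2;
            std::cout << "retry " << try_time << ": wait between "
                      << (early ? sleep_ms : 2 * sleep_ms) << " and "
                      << (early ? 2 * sleep_ms : 4 * sleep_ms) << " ms\n";
        }
    }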
diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp
index 344609d382e..7af5208592f 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -20,11 +20,15 @@
 #include <fcntl.h>
 #include <fmt/core.h>
 
+#include <chrono>
 #include <filesystem>
 #include <ostream>
+#include <random>
 #include <string>
+#include <thread>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/sync_point.h"
@@ -36,6 +40,7 @@
 #include "io/hdfs_util.h"
 #include "service/backend_options.h"
 #include "util/bvar_helper.h"
+#include "util/jni-util.h"
 
 namespace doris::io {
 
@@ -45,6 +50,75 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
 static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+inline std::default_random_engine make_random_engine() {
+    return std::default_random_engine(
+            static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+}
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder() = default;
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size, int try_time) {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+        return Status::OK();
+#else
+        auto unit = config::hdfs_jni_write_sleep_milliseconds;
+        std::default_random_engine rng = make_random_engine();
+        std::uniform_int_distribution<uint32_t> u(unit, 2 * unit);
+        std::uniform_int_distribution<uint32_t> u2(2 * unit, 4 * unit);
+        auto duration_ms =
+                try_time < (config::hdfs_jni_write_max_retry_time / 2) ? u(rng) : u2(rng);
+        std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, std::chrono::milliseconds(duration_ms),
+                    [&]() { return cur_memory_comsuption + memory_size <= max_usage(); });
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            lck.unlock();
+            return Status::InternalError<false>(
+                    "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, "
+                    "ratio is {}",
+                    max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio);
+        }
+        cur_memory_comsuption += memory_size;
+        return Status::OK();
+#endif
+    }
+
+    void release_memory(size_t memory_size) {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+#else
+        std::unique_lock lck {cur_memory_latch};
+        size_t origin_size = cur_memory_comsuption;
+        cur_memory_comsuption -= memory_size;
+        if (cur_memory_comsuption < max_usage() && origin_size > max_usage()) {
+            cv.notify_all();
+        }
+#endif
+    }
+
+private:
+    size_t max_jvm_heap_size() const { return JniUtil::get_max_jni_heap_memory_size(); }
+    [[maybe_unused]] std::size_t cur_memory_comsuption {0};
+    std::mutex cur_memory_latch;
+    std::condition_variable cv;
+};
+
+static HdfsWriteMemUsageRecorder g_hdfs_write_rate_limiter;
 
 HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, hdfsFile hdfs_file,
                                std::string fs_name, const FileWriterOptions* opts)
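The comment block above sketches the protocol: a writer asks the recorder for budget before handing bytes to libhdfs over JNI, and only gets that budget back once a blocking hdfsHFlush/hdfsHSync/hdfsCloseFile has pushed the client-side buffers out. A condensed, hypothetical model of the wait/notify semantics (not the committed class, just an illustration):

    #include <chrono>
    #include <condition_variable>
    #include <cstddef>
    #include <iostream>
    #include <mutex>

    // Simplified stand-in for HdfsWriteMemUsageRecorder: a bounded usage counter
    // where release() only wakes waiters when usage drops back under the limit.
    class UsageRecorder {
    public:
        explicit UsageRecorder(size_t limit) : _limit(limit) {}

        bool try_acquire(size_t bytes, std::chrono::milliseconds backoff) {
            std::unique_lock lck(_mtx);
            // Wait until the request fits or the backoff window expires; on timeout
            // the caller is expected to flush and retry, as _acquire_jni_memory does.
            if (!_cv.wait_for(lck, backoff, [&] { return _used + bytes <= _limit; })) {
                return false;
            }
            _used += bytes;
            return true;
        }

        void release(size_t bytes) {
            std::lock_guard lck(_mtx);
            size_t before = _used;
            _used -= bytes;
            if (_used < _limit && before > _limit) {
                _cv.notify_all(); // notify only when usage crosses back under the limit
            }
        }

    private:
        size_t _limit;
        size_t _used = 0;
        std::mutex _mtx;
        std::condition_variable _cv;
    };

    int main() {
        UsageRecorder recorder(128 * 1024); // pretend the JNI budget is only 128 KB
        for (int i = 0; i < 3; ++i) {
            // Charge one 64 KB client packet per append, like CLIENT_WRITE_PACKET_SIZE above.
            bool ok = recorder.try_acquire(64 * 1024, std::chrono::milliseconds(10));
            std::cout << "append " << i
                      << (ok ? ": acquired 64 KB\n" : ": over budget, flush and retry\n");
        }
        recorder.release(2 * 64 * 1024); // an hsync/close returns the whole outstanding estimate
    }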
@@ -69,11 +143,56 @@ HdfsFileWriter::~HdfsFileWriter() {
     if (_hdfs_file) {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+        _flush_and_reset_approximate_jni_buffer_size();
     }
     hdfs_file_being_written << -1;
 }
 
+void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() {
+    g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size);
+    _approximate_jni_buffer_size = 0;
+}
+
+Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
+#ifdef USE_LIBHDFS3
+    return Status::OK();
+#else
+    size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
+    int try_time = 0;
+    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time); !st.ok()) {
+        if (_approximate_jni_buffer_size > 0) {
+            int ret;
+            {
+                SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hflush_latency);
+                ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                                   "HdfsFileWriter::close::hdfsHFlush");
+            }
+            _flush_and_reset_approximate_jni_buffer_size();
+            if (ret != 0) {
+                return Status::InternalError(
+                        "Write hdfs file failed. (BE: {}) namenode:{}, path:{}, err: {}, "
+                        "file_size={}",
+                        BackendOptions::get_localhost(), _fs_name, _path.native(), hdfs_error(),
+                        bytes_appended());
+            }
+        }
+        // Other hdfs writers might have occupied too much memory, we need to sleep for a while to wait for them
+        // releasing their memory
+        for (; try_time < config::hdfs_jni_write_max_retry_time; try_time++) {
+            if (g_hdfs_write_rate_limiter.acquire_memory(actual_size, try_time).ok()) {
+                _approximate_jni_buffer_size += actual_size;
+                return Status::OK();
+            }
+        }
+        return st;
+    }
+
+    _approximate_jni_buffer_size += actual_size;
+    return Status::OK();
+#endif
+}
+
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
@@ -89,7 +208,9 @@ Status HdfsFileWriter::close() {
 #ifdef USE_LIBHDFS3
         ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #else
-        ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                           "HdfsFileWriter::close::hdfsHSync");
+        _flush_and_reset_approximate_jni_buffer_size();
 #endif
     }
 
@@ -104,7 +225,9 @@ Status HdfsFileWriter::close() {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         // The underlying implementation will invoke `hdfsHFlush` to flush buffered data and wait for
         // the HDFS response, but won't guarantee the synchronization of data to HDFS.
-        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                           "HdfsFileWriter::close::hdfsCloseFile");
+        _flush_and_reset_approximate_jni_buffer_size();
     }
     _hdfs_file = nullptr;
     if (ret != 0) {
@@ -173,6 +296,7 @@ void HdfsFileWriter::_write_into_local_file_cache() {
 }
 
 Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
+    RETURN_IF_ERROR(_acquire_jni_memory(content.size()));
    while (!content.empty()) {
         int64_t written_bytes;
         {
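One consequence of the hunks above is that every append charges the limiter with at least one 64 KB client packet, and nothing is handed back until hdfsHFlush/hdfsHSync/hdfsCloseFile succeeds. A small worked example of that accounting (plain arithmetic, no HDFS involved, variable names mirror the patch):

    #include <algorithm>
    #include <cstddef>
    #include <iostream>

    int main() {
        constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // same constant as in the patch
        size_t approximate_jni_buffer_size = 0;                // mirrors _approximate_jni_buffer_size

        // Ten 8 KB appends: each one is charged as a full 64 KB packet, so the writer is
        // assumed to pin 640 KB of JVM heap until the next flush/sync/close.
        for (int i = 0; i < 10; ++i) {
            const size_t append_size = 8 * 1024;
            approximate_jni_buffer_size += std::max(CLIENT_WRITE_PACKET_SIZE, append_size);
        }
        std::cout << "pinned before sync: " << approximate_jni_buffer_size << " bytes\n"; // 655360

        // Once hdfsHFlush/hdfsHSync/hdfsCloseFile succeeds the whole estimate is released
        // at once, which is what _flush_and_reset_approximate_jni_buffer_size() does.
        approximate_jni_buffer_size = 0;
        std::cout << "pinned after sync: " << approximate_jni_buffer_size << " bytes\n";
    }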
@@ -250,7 +374,14 @@ Status HdfsFileWriter::finalize() {
     }
 
     // Flush buffered data to HDFS without waiting for HDFS response
-    int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    int ret;
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                           "HdfsFileWriter::finalize::hdfsFlush");
+    }
+    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush",
+                                           Status::InternalError("failed to flush hdfs file"));
     if (ret != 0) {
         return Status::InternalError(
                 "failed to flush hdfs file. fs_name={} path={} : {}, file_size={}", _fs_name,
@@ -280,6 +411,7 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path full_path, std::shared_ptr<Hdf
     }
 #endif
     // open file
+    hdfsFile hdfs_file = nullptr;
     {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency);
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 234835e083f..4ed30c5a856 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -62,6 +62,8 @@ private:
     Status append_hdfs_file(std::string_view content);
     void _write_into_local_file_cache();
     Status _append(std::string_view content);
+    void _flush_and_reset_approximate_jni_buffer_size();
+    Status _acquire_jni_memory(size_t size);
 
     Path _path;
     std::shared_ptr<HdfsHandler> _hdfs_handler = nullptr;
@@ -87,6 +89,7 @@ private:
         std::string _batch_buffer;
     };
     BatchBuffer _batch_buffer;
+    size_t _approximate_jni_buffer_size = 0;
 };
 
 } // namespace io
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index a9563a82139..02b6fadb310 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -75,6 +75,7 @@ bvar::LatencyRecorder hdfs_create_dir_latency("hdfs_create_dir");
 bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
 bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
 bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
+bvar::LatencyRecorder hdfs_hflush_latency("hdfs_hflush");
 bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
 }; // namespace hdfs_bvar
 
diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h
index c8d25d1f30a..7381ae7104c 100644
--- a/be/src/io/hdfs_util.h
+++ b/be/src/io/hdfs_util.h
@@ -46,6 +46,7 @@ extern bvar::LatencyRecorder hdfs_create_dir_latency;
 extern bvar::LatencyRecorder hdfs_open_latency;
 extern bvar::LatencyRecorder hdfs_close_latency;
 extern bvar::LatencyRecorder hdfs_flush_latency;
+extern bvar::LatencyRecorder hdfs_hflush_latency;
 extern bvar::LatencyRecorder hdfs_hsync_latency;
 }; // namespace hdfs_bvar
 
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 6e66f97ddae..4ef0ffc7acb 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -26,6 +26,7 @@
 #include <cstdlib>
 #include <filesystem>
 #include <iterator>
+#include <limits>
 #include <memory>
 #include <mutex>
 #include <sstream>
@@ -153,6 +154,7 @@ const std::string GetDorisJNIClasspathOption() {
         if (JNI_OK != res) {
             DCHECK(false) << "Failed to create JVM, code= " << res;
         }
+
     } else {
         CHECK_EQ(rv, 0) << "Could not find any created Java VM";
         CHECK_EQ(num_vms, 1) << "No VMs returned";
@@ -173,6 +175,7 @@ jmethodID JniUtil::get_jvm_threads_id_ = nullptr;
 jmethodID JniUtil::get_jmx_json_ = nullptr;
 jobject JniUtil::jni_scanner_loader_obj_ = nullptr;
 jmethodID JniUtil::jni_scanner_loader_method_ = nullptr;
+jlong JniUtil::max_jvm_heap_memory_size_ = 0;
 
 Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) {
     DCHECK(jstr != nullptr);
@@ -204,6 +207,50 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) {
     return Status::OK();
 }
 
+void JniUtil::parse_max_heap_memory_size_from_jvm(JNIEnv* env) {
+    // The start_be.sh would set JAVA_OPTS
+    std::string java_opts = getenv("JAVA_OPTS") ? getenv("JAVA_OPTS") : "";
+    std::istringstream iss(java_opts);
+    std::string opt;
+    while (iss >> opt) {
+        if (opt.find("-Xmx") == 0) {
+            std::string xmxValue = opt.substr(4);
+            char unit = xmxValue.back();
+            xmxValue.pop_back();
+            long long value = std::stoll(xmxValue);
+            switch (unit) {
+            case 'g':
+            case 'G':
+                max_jvm_heap_memory_size_ = value * 1024 * 1024 * 1024;
+                break;
+            case 'm':
+            case 'M':
+                max_jvm_heap_memory_size_ = value * 1024 * 1024;
+                break;
+            case 'k':
+            case 'K':
+                max_jvm_heap_memory_size_ = value * 1024;
+                break;
+            default:
+                max_jvm_heap_memory_size_ = value;
+                break;
+            }
+        }
+    }
+    if (0 == max_jvm_heap_memory_size_) {
+        LOG(FATAL) << "the max_jvm_heap_memory_size_ is " << max_jvm_heap_memory_size_;
+    }
+    LOG(INFO) << "the max_jvm_heap_memory_size_ is " << max_jvm_heap_memory_size_;
+}
+
+size_t JniUtil::get_max_jni_heap_memory_size() {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+    return std::numeric_limits<size_t>::max();
+#else
+    return max_jvm_heap_memory_size_;
+#endif
+}
+
 Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     DCHECK(!tls_env_) << "Call GetJNIEnv() fast path";
 
@@ -220,6 +267,9 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
         // the hadoop libhdfs will do all the stuff
         SetEnvIfNecessary();
         tls_env_ = getJNIEnv();
+        if (nullptr != tls_env_) {
+            parse_max_heap_memory_size_from_jvm(tls_env_);
+        }
 #endif
     *env = tls_env_;
     return Status::OK();
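The limiter needs the JVM heap ceiling, which the new JniUtil::parse_max_heap_memory_size_from_jvm() above reads from the -Xmx flag in JAVA_OPTS (exported by start_be.sh). A standalone sketch of the same scaling rule, handy for checking what a given JAVA_OPTS resolves to; the helper name is hypothetical, not part of the patch, and it assumes the usual -Xmx<N>g/m/k spelling:

    #include <cstdint>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Mirrors the parsing in the hunk above: scan JAVA_OPTS token by token,
    // take the -Xmx value, and scale it to bytes by its unit suffix.
    int64_t parse_xmx_bytes(const std::string& java_opts) {
        std::istringstream iss(java_opts);
        std::string opt;
        int64_t bytes = 0;
        while (iss >> opt) {
            if (opt.find("-Xmx") != 0) {
                continue;
            }
            std::string value = opt.substr(4);
            char unit = value.back();
            value.pop_back();
            int64_t n = std::stoll(value);
            switch (unit) {
            case 'g': case 'G': bytes = n * 1024 * 1024 * 1024; break;
            case 'm': case 'M': bytes = n * 1024 * 1024; break;
            case 'k': case 'K': bytes = n * 1024; break;
            default:            bytes = n; break; // no recognized unit: keep the parsed value, as the patch does
            }
        }
        return bytes;
    }

    int main() {
        std::cout << parse_xmx_bytes("-Xms2g -Xmx8g -DlogPath=log/jni.log") << "\n"; // 8589934592
        std::cout << parse_xmx_bytes("-Xmx1024m") << "\n";                           // 1073741824
    }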
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index ca305c32bf1..221e27a9bab 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -84,8 +84,10 @@ public:
     static Status get_jni_scanner_class(JNIEnv* env, const char* classname, jclass* loaded_class);
     static jobject convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map);
     static std::map<std::string, std::string> convert_to_cpp_map(JNIEnv* env, jobject map);
+    static size_t get_max_jni_heap_memory_size();
 
 private:
+    static void parse_max_heap_memory_size_from_jvm(JNIEnv* env);
     static Status GetJNIEnvSlowPath(JNIEnv** env);
     static Status init_jni_scanner_loader(JNIEnv* env);
 
@@ -103,6 +105,7 @@ private:
     static jmethodID jni_scanner_loader_method_;
     // Thread-local cache of the JNIEnv for this thread.
     static __thread JNIEnv* tls_env_;
+    static jlong max_jvm_heap_memory_size_;
 };
 
 /// Helper class for lifetime management of chars from JNI, releasing JNI chars when

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org