This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f0def4ec0aa [feature](Cloud) Try to do memory limit control for hdfs 
write (#34354) (#34787)
f0def4ec0aa is described below

commit f0def4ec0aa4b188c5071b00e8080601d1e78b24
Author: AlexYue <yj976240...@gmail.com>
AuthorDate: Mon May 13 20:32:27 2024 +0800

    [feature](Cloud) Try to do memory limit control for hdfs write (#34354) 
(#34787)
---
 be/src/common/config.cpp          |   9 ++-
 be/src/common/config.h            |   7 ++
 be/src/io/fs/hdfs_file_writer.cpp | 138 +++++++++++++++++++++++++++++++++++++-
 be/src/io/fs/hdfs_file_writer.h   |   3 +
 be/src/io/hdfs_util.cpp           |   1 +
 be/src/io/hdfs_util.h             |   1 +
 be/src/util/jni-util.cpp          |  50 ++++++++++++++
 be/src/util/jni-util.h            |   3 +
 8 files changed, 208 insertions(+), 4 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 3dcce044deb..c449a7e0176 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1028,7 +1028,7 @@ DEFINE_mInt64(s3_write_buffer_size, "5242880");
 // The timeout config for S3 buffer allocation
 DEFINE_mInt32(s3_writer_buffer_allocation_timeout, "300");
 DEFINE_mInt64(file_cache_max_file_reader_cache_size, "1000000");
-DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "4"); // 4MB
+DEFINE_mInt64(hdfs_write_batch_buffer_size_mb, "1"); // 1MB
 
 //disable shrink memory by default
 DEFINE_mBool(enable_shrink_memory, "false");
@@ -1217,6 +1217,13 @@ DEFINE_mBool(enable_injection_point, "false");
 
 DEFINE_mBool(ignore_schema_change_check, "false");
 
+// The maximum jvm heap usage ratio for hdfs write workload
+DEFINE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio, "0.5");
+// The sleep duration in milliseconds when hdfs write exceeds the maximum memory usage
+DEFINE_mInt64(hdfs_jni_write_sleep_milliseconds, "300");
+// The maximum number of retry attempts when an hdfs write fails
+DEFINE_mInt64(hdfs_jni_write_max_retry_time, "3");
+
 // clang-format off
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index d9c8e9c231c..029c2fc0227 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1295,6 +1295,13 @@ DECLARE_mBool(enable_injection_point);
 
 DECLARE_mBool(ignore_schema_change_check);
 
+// The maximum jvm heap usage ratio for hdfs write workload
+DECLARE_mDouble(max_hdfs_wirter_jni_heap_usage_ratio);
+// The sleep duration in milliseconds when hdfs write exceeds the maximum memory usage
+DECLARE_mInt64(hdfs_jni_write_sleep_milliseconds);
+// The maximum number of retry attempts when an hdfs write fails
+DECLARE_mInt64(hdfs_jni_write_max_retry_time);
+
 #ifdef BE_TEST
 // test s3
 DECLARE_String(test_s3_resource);
diff --git a/be/src/io/fs/hdfs_file_writer.cpp 
b/be/src/io/fs/hdfs_file_writer.cpp
index 344609d382e..7af5208592f 100644
--- a/be/src/io/fs/hdfs_file_writer.cpp
+++ b/be/src/io/fs/hdfs_file_writer.cpp
@@ -20,11 +20,15 @@
 #include <fcntl.h>
 #include <fmt/core.h>
 
+#include <chrono>
 #include <filesystem>
 #include <ostream>
+#include <random>
 #include <string>
+#include <thread>
 #include <utility>
 
+#include "common/config.h"
 #include "common/logging.h"
 #include "common/status.h"
 #include "common/sync_point.h"
@@ -36,6 +40,7 @@
 #include "io/hdfs_util.h"
 #include "service/backend_options.h"
 #include "util/bvar_helper.h"
+#include "util/jni-util.h"
 
 namespace doris::io {
 
@@ -45,6 +50,75 @@ bvar::Adder<uint64_t> 
hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
 static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+inline std::default_random_engine make_random_engine() {
+    return std::default_random_engine(
+            
static_cast<uint32_t>(std::chrono::steady_clock::now().time_since_epoch().count()));
+}
+
+// In practice, we've found that if the import frequency to HDFS is too high,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// Therefore, we need a way to monitor how much JVM memory is currently in use.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile,
+// which ensures that the client's buffer is sent to the data node and acknowledged before returning to the caller.
+// HdfsWriteMemUsageRecorder reduces the recorded memory usage at that time.
+// If the current usage exceeds the maximum set by the user, the memory acquisition returns failure.
+// The caller can then sleep to wait for memory to be freed.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder() = default;
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   
config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size, int try_time) {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+        return Status::OK();
+#else
+        auto unit = config::hdfs_jni_write_sleep_milliseconds;
+        std::default_random_engine rng = make_random_engine();
+        std::uniform_int_distribution<uint32_t> u(unit, 2 * unit);
+        std::uniform_int_distribution<uint32_t> u2(2 * unit, 4 * unit);
+        auto duration_ms =
+                try_time < (config::hdfs_jni_write_max_retry_time / 2) ? 
u(rng) : u2(rng);
+        std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, std::chrono::milliseconds(duration_ms),
+                    [&]() { return cur_memory_comsuption + memory_size <= 
max_usage(); });
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            lck.unlock();
+            return Status::InternalError<false>(
+                    "Run out of Jni jvm heap space, current limit size is {}, 
max heap size is {}, "
+                    "ratio is {}",
+                    max_usage(), max_jvm_heap_size(), 
config::max_hdfs_wirter_jni_heap_usage_ratio);
+        }
+        cur_memory_comsuption += memory_size;
+        return Status::OK();
+#endif
+    }
+
+    void release_memory(size_t memory_size) {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+#else
+        std::unique_lock lck {cur_memory_latch};
+        size_t origin_size = cur_memory_comsuption;
+        cur_memory_comsuption -= memory_size;
+        if (cur_memory_comsuption < max_usage() && origin_size > max_usage()) {
+            cv.notify_all();
+        }
+#endif
+    }
+
+private:
+    size_t max_jvm_heap_size() const { return 
JniUtil::get_max_jni_heap_memory_size(); }
+    [[maybe_unused]] std::size_t cur_memory_comsuption {0};
+    std::mutex cur_memory_latch;
+    std::condition_variable cv;
+};
+
+static HdfsWriteMemUsageRecorder g_hdfs_write_rate_limiter;
 
 HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> 
handler, hdfsFile hdfs_file,
                                std::string fs_name, const FileWriterOptions* 
opts)
@@ -69,11 +143,56 @@ HdfsFileWriter::~HdfsFileWriter() {
     if (_hdfs_file) {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+        _flush_and_reset_approximate_jni_buffer_size();
     }
 
     hdfs_file_being_written << -1;
 }
 
+void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() {
+    g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size);
+    _approximate_jni_buffer_size = 0;
+}
+
+Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
+#ifdef USE_LIBHDFS3
+    return Status::OK();
+#else
+    size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
+    int try_time = 0;
+    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size, 
try_time); !st.ok()) {
+        if (_approximate_jni_buffer_size > 0) {
+            int ret;
+            {
+                SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_hflush_latency);
+                ret = 
SYNC_POINT_HOOK_RETURN_VALUE(hdfsHFlush(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                                   
"HdfsFileWriter::close::hdfsHFlush");
+            }
+            _flush_and_reset_approximate_jni_buffer_size();
+            if (ret != 0) {
+                return Status::InternalError(
+                        "Write hdfs file failed. (BE: {}) namenode:{}, 
path:{}, err: {}, "
+                        "file_size={}",
+                        BackendOptions::get_localhost(), _fs_name, 
_path.native(), hdfs_error(),
+                        bytes_appended());
+            }
+        }
+        // Other hdfs writers might have occupied too much memory, so we need to sleep for a while
+        // to wait for them to release their memory
+        for (; try_time < config::hdfs_jni_write_max_retry_time; try_time++) {
+            if (g_hdfs_write_rate_limiter.acquire_memory(actual_size, 
try_time).ok()) {
+                _approximate_jni_buffer_size += actual_size;
+                return Status::OK();
+            }
+        }
+        return st;
+    }
+
+    _approximate_jni_buffer_size += actual_size;
+    return Status::OK();
+#endif
+}
+
 Status HdfsFileWriter::close() {
     if (_closed) {
         return Status::OK();
@@ -89,7 +208,9 @@ Status HdfsFileWriter::close() {
 #ifdef USE_LIBHDFS3
             ret = hdfsSync(_hdfs_handler->hdfs_fs, _hdfs_file);
 #else
-            ret = hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file);
+            ret = 
SYNC_POINT_HOOK_RETURN_VALUE(hdfsHSync(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                               
"HdfsFileWriter::close::hdfsHSync");
+            _flush_and_reset_approximate_jni_buffer_size();
 #endif
         }
 
@@ -104,7 +225,9 @@ Status HdfsFileWriter::close() {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         // The underlying implementation will invoke `hdfsHFlush` to flush 
buffered data and wait for
         // the HDFS response, but won't guarantee the synchronization of data 
to HDFS.
-        ret = hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+        ret = 
SYNC_POINT_HOOK_RETURN_VALUE(hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file),
+                                           
"HdfsFileWriter::close::hdfsCloseFile");
+        _flush_and_reset_approximate_jni_buffer_size();
     }
     _hdfs_file = nullptr;
     if (ret != 0) {
@@ -173,6 +296,7 @@ void HdfsFileWriter::_write_into_local_file_cache() {
 }
 
 Status HdfsFileWriter::append_hdfs_file(std::string_view content) {
+    RETURN_IF_ERROR(_acquire_jni_memory(content.size()));
     while (!content.empty()) {
         int64_t written_bytes;
         {
@@ -250,7 +374,14 @@ Status HdfsFileWriter::finalize() {
     }
 
     // Flush buffered data to HDFS without waiting for HDFS response
-    int ret = hdfsFlush(_hdfs_handler->hdfs_fs, _hdfs_file);
+    int ret;
+    {
+        SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_flush_latency);
+        ret = SYNC_POINT_HOOK_RETURN_VALUE(hdfsFlush(_hdfs_handler->hdfs_fs, 
_hdfs_file),
+                                           
"HdfsFileWriter::finalize::hdfsFlush");
+    }
+    TEST_INJECTION_POINT_RETURN_WITH_VALUE("HdfsFileWriter::hdfsFlush",
+                                           Status::InternalError("failed to 
flush hdfs file"));
     if (ret != 0) {
         return Status::InternalError(
                 "failed to flush hdfs file. fs_name={} path={} : {}, 
file_size={}", _fs_name,
@@ -280,6 +411,7 @@ Result<FileWriterPtr> HdfsFileWriter::create(Path 
full_path, std::shared_ptr<Hdf
     }
 #endif
     // open file
+
     hdfsFile hdfs_file = nullptr;
     {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_open_latency);
diff --git a/be/src/io/fs/hdfs_file_writer.h b/be/src/io/fs/hdfs_file_writer.h
index 234835e083f..4ed30c5a856 100644
--- a/be/src/io/fs/hdfs_file_writer.h
+++ b/be/src/io/fs/hdfs_file_writer.h
@@ -62,6 +62,8 @@ private:
     Status append_hdfs_file(std::string_view content);
     void _write_into_local_file_cache();
     Status _append(std::string_view content);
+    void _flush_and_reset_approximate_jni_buffer_size();
+    Status _acquire_jni_memory(size_t size);
 
     Path _path;
     std::shared_ptr<HdfsHandler> _hdfs_handler = nullptr;
@@ -87,6 +89,7 @@ private:
         std::string _batch_buffer;
     };
     BatchBuffer _batch_buffer;
+    size_t _approximate_jni_buffer_size = 0;
 };
 
 } // namespace io
diff --git a/be/src/io/hdfs_util.cpp b/be/src/io/hdfs_util.cpp
index a9563a82139..02b6fadb310 100644
--- a/be/src/io/hdfs_util.cpp
+++ b/be/src/io/hdfs_util.cpp
@@ -75,6 +75,7 @@ bvar::LatencyRecorder 
hdfs_create_dir_latency("hdfs_create_dir");
 bvar::LatencyRecorder hdfs_open_latency("hdfs_open");
 bvar::LatencyRecorder hdfs_close_latency("hdfs_close");
 bvar::LatencyRecorder hdfs_flush_latency("hdfs_flush");
+bvar::LatencyRecorder hdfs_hflush_latency("hdfs_hflush");
 bvar::LatencyRecorder hdfs_hsync_latency("hdfs_hsync");
 }; // namespace hdfs_bvar
 
diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h
index c8d25d1f30a..7381ae7104c 100644
--- a/be/src/io/hdfs_util.h
+++ b/be/src/io/hdfs_util.h
@@ -46,6 +46,7 @@ extern bvar::LatencyRecorder hdfs_create_dir_latency;
 extern bvar::LatencyRecorder hdfs_open_latency;
 extern bvar::LatencyRecorder hdfs_close_latency;
 extern bvar::LatencyRecorder hdfs_flush_latency;
+extern bvar::LatencyRecorder hdfs_hflush_latency;
 extern bvar::LatencyRecorder hdfs_hsync_latency;
 }; // namespace hdfs_bvar
 
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 6e66f97ddae..4ef0ffc7acb 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -26,6 +26,7 @@
 #include <cstdlib>
 #include <filesystem>
 #include <iterator>
+#include <limits>
 #include <memory>
 #include <mutex>
 #include <sstream>
@@ -153,6 +154,7 @@ const std::string GetDorisJNIClasspathOption() {
         if (JNI_OK != res) {
             DCHECK(false) << "Failed to create JVM, code= " << res;
         }
+
     } else {
         CHECK_EQ(rv, 0) << "Could not find any created Java VM";
         CHECK_EQ(num_vms, 1) << "No VMs returned";
@@ -173,6 +175,7 @@ jmethodID JniUtil::get_jvm_threads_id_ = nullptr;
 jmethodID JniUtil::get_jmx_json_ = nullptr;
 jobject JniUtil::jni_scanner_loader_obj_ = nullptr;
 jmethodID JniUtil::jni_scanner_loader_method_ = nullptr;
+jlong JniUtil::max_jvm_heap_memory_size_ = 0;
 
 Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* 
out) {
     DCHECK(jstr != nullptr);
@@ -204,6 +207,50 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) 
{
     return Status::OK();
 }
 
+void JniUtil::parse_max_heap_memory_size_from_jvm(JNIEnv* env) {
+    // The start_be.sh would set JAVA_OPTS
+    std::string java_opts = getenv("JAVA_OPTS") ? getenv("JAVA_OPTS") : "";
+    std::istringstream iss(java_opts);
+    std::string opt;
+    while (iss >> opt) {
+        if (opt.find("-Xmx") == 0) {
+            std::string xmxValue = opt.substr(4);
+            char unit = xmxValue.back();
+            xmxValue.pop_back();
+            long long value = std::stoll(xmxValue);
+            switch (unit) {
+            case 'g':
+            case 'G':
+                max_jvm_heap_memory_size_ = value * 1024 * 1024 * 1024;
+                break;
+            case 'm':
+            case 'M':
+                max_jvm_heap_memory_size_ = value * 1024 * 1024;
+                break;
+            case 'k':
+            case 'K':
+                max_jvm_heap_memory_size_ = value * 1024;
+                break;
+            default:
+                max_jvm_heap_memory_size_ = value;
+                break;
+            }
+        }
+    }
+    if (0 == max_jvm_heap_memory_size_) {
+        LOG(FATAL) << "the max_jvm_heap_memory_size_ is " << 
max_jvm_heap_memory_size_;
+    }
+    LOG(INFO) << "the max_jvm_heap_memory_size_ is " << 
max_jvm_heap_memory_size_;
+}
+
+size_t JniUtil::get_max_jni_heap_memory_size() {
+#if defined(USE_LIBHDFS3) || defined(BE_TEST)
+    return std::numeric_limits<size_t>::max();
+#else
+    return max_jvm_heap_memory_size_;
+#endif
+}
+
 Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     DCHECK(!tls_env_) << "Call GetJNIEnv() fast path";
 
@@ -220,6 +267,9 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) {
     // the hadoop libhdfs will do all the stuff
     SetEnvIfNecessary();
     tls_env_ = getJNIEnv();
+    if (nullptr != tls_env_) {
+        parse_max_heap_memory_size_from_jvm(tls_env_);
+    }
 #endif
     *env = tls_env_;
     return Status::OK();
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index ca305c32bf1..221e27a9bab 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -84,8 +84,10 @@ public:
     static Status get_jni_scanner_class(JNIEnv* env, const char* classname, 
jclass* loaded_class);
     static jobject convert_to_java_map(JNIEnv* env, const 
std::map<std::string, std::string>& map);
     static std::map<std::string, std::string> convert_to_cpp_map(JNIEnv* env, 
jobject map);
+    static size_t get_max_jni_heap_memory_size();
 
 private:
+    static void parse_max_heap_memory_size_from_jvm(JNIEnv* env);
     static Status GetJNIEnvSlowPath(JNIEnv** env);
     static Status init_jni_scanner_loader(JNIEnv* env);
 
@@ -103,6 +105,7 @@ private:
     static jmethodID jni_scanner_loader_method_;
     // Thread-local cache of the JNIEnv for this thread.
     static __thread JNIEnv* tls_env_;
+    static jlong max_jvm_heap_memory_size_;
 };
 
 /// Helper class for lifetime management of chars from JNI, releasing JNI 
chars when


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to