platoneko commented on code in PR #34354:
URL: https://github.com/apache/doris/pull/34354#discussion_r1597342914


##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -45,6 +49,68 @@ bvar::Adder<uint64_t> 
hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
 static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is 
currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during 
hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and 
returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem 
acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder() = default;
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   
config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#elif BE_TEST
+        return Status::OK();
+#else
+
+        std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, 
std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= 
max_usage(); });
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            return Status::InternalError(
+                    "Run out of Jni jvm heap space, current limit size is {}, 
max heap size is {}, "
+                    "ratio is {}",
+                    max_usage(), max_jvm_heap_size(), 
config::max_hdfs_wirter_jni_heap_usage_ratio);
+        }
+        cur_memory_comsuption += memory_size;
+        return Status::OK();
+#endif
+    }
+
+    void release_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return;
+#elif BE_TEST
+#else
+        std::unique_lock lck {cur_memory_latch};
+        size_t origin_size = cur_memory_comsuption;
+        cur_memory_comsuption -= memory_size;
+        if (cur_memory_comsuption <= max_usage() && origin_size > max_usage()) 
{
+            cv.notify_one();

Review Comment:
   Use `cv.notify_all()` instead of `notify_one()` here — releasing memory may free enough headroom for several blocked writers at once, and `notify_one` could leave the rest waiting until their timeout expires.



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -46,6 +49,52 @@ bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_writt
 
 static constexpr size_t MB = 1024 * 1024;
 
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is 
currently being used.
+// The HdfsWriteRateLimit class increments a recorded value during hdfsWrite 
when writing to HDFS.
+// When hdfsCloseFile is called, all related memory in the JVM will be 
invalidated, so the recorded value can be decreased at that time.
+// If the current usage exceeds the maximum set by the user, the current write 
will sleep.
+// If the number of sleeps exceeds the number specified by the user, then the 
current write is considered to have failed
+class HdfsWriteRateLimit {
+public:
+    HdfsWriteRateLimit()
+            : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()),
+              cur_memory_comsuption(0) {}
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size * 
config::max_hdfs_jni_heap_usage_ratio);
+    }
+    Status do_rate_limit(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#endif
+        for (int retry_time = config::hsfs_jni_write_max_retry_time; 
retry_time != 0;

Review Comment:
   
   ```suggestion
           for (int retry_time = config::hdfs_jni_write_max_retry_time; 
retry_time > 0;
   ```



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -44,7 +47,85 @@ bvar::Adder<uint64_t> 
hdfs_bytes_written_total("hdfs_file_writer_bytes_written")
 bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
+class HdfsMaxConnectionLimiter {
+public:
+    HdfsMaxConnectionLimiter() = default;
+    ~HdfsMaxConnectionLimiter() = default;
+
+    void add_inflight_writer() {
+        std::unique_lock lck {_latch};
+        _cv.wait(lck, [&]() {
+            return _cur_inflight_writer < 
config::max_inflight_hdfs_write_connection;
+        });
+        _cur_inflight_writer++;
+    }
+
+    void reduce_inflight_writer() {
+        std::unique_lock lck {_latch};
+        _cur_inflight_writer--;
+        _cv.notify_one();
+    }
+
+private:
+    std::mutex _latch;
+    std::condition_variable _cv;
+    int64_t _cur_inflight_writer {0};
+};
+
 static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is 
currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during 
hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and 
returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem 
acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder()
+            : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()), 
cur_memory_comsuption(0) {
+        LOG_INFO("the max jvm heap size is {}", max_jvm_heap_size);
+    }
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size *
+                                   
config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#elif BE_TEST
+        return Status::OK();
+#else
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            return Status::InternalError("Run out of Jni jvm heap space, 
current limit size is {}",

Review Comment:
   
   ```suggestion
               return Status::InternalError<false>("Run out of Jni jvm heap 
space, current limit size is {}",
   ```



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -71,11 +137,52 @@ HdfsFileWriter::~HdfsFileWriter() {
     if (_hdfs_file) {
         SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency);
         hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file);
+        _flush_and_reset_approximate_jni_buffer_size();
     }
 
     hdfs_file_being_written << -1;
 }
 
+void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() {
+    g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size);
+    _approximate_jni_buffer_size = 0;
+}
+
+Status HdfsFileWriter::_acquire_jni_memory(size_t size) {
+#ifdef USE_LIBHDFS3
+    return Status::OK();
+#else
+    size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size);
+    if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size); 
!st.ok()) {
+        int ret;

Review Comment:
   在 HFlush 前判断下 _approximate_jni_buffer_size > 0 (Check that `_approximate_jni_buffer_size > 0` before calling HFlush, so we don't issue an unnecessary flush when nothing is buffered.)



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -46,6 +49,52 @@ bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_writt
 
 static constexpr size_t MB = 1024 * 1024;
 
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is 
currently being used.
+// The HdfsWriteRateLimit class increments a recorded value during hdfsWrite 
when writing to HDFS.
+// When hdfsCloseFile is called, all related memory in the JVM will be 
invalidated, so the recorded value can be decreased at that time.
+// If the current usage exceeds the maximum set by the user, the current write 
will sleep.
+// If the number of sleeps exceeds the number specified by the user, then the 
current write is considered to have failed
+class HdfsWriteRateLimit {
+public:
+    HdfsWriteRateLimit()
+            : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()),
+              cur_memory_comsuption(0) {}
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size * 
config::max_hdfs_jni_heap_usage_ratio);
+    }
+    Status do_rate_limit(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#endif
+        for (int retry_time = config::hsfs_jni_write_max_retry_time; 
retry_time != 0;
+             retry_time--) {
+            if (cur_memory_comsuption + memory_size > max_usage()) {
+                std::this_thread::sleep_for(
+                        
std::chrono::milliseconds(config::max_hdfs_jni_write_sleep_milliseconds));
+                continue;
+            }
+            cur_memory_comsuption.fetch_add(memory_size);
+            return Status::OK();
+        }
+        return Status::InternalError("Run out of Jni jvm heap space");
+    }
+
+    void release_jni_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return;
+#endif
+        cur_memory_comsuption.fetch_sub(memory_size);

Review Comment:
   
   ```suggestion
           cur_memory_comsuption.fetch_sub(memory_size, 
std::memory_order_relaxed);
   ```



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -45,6 +49,68 @@ bvar::Adder<uint64_t> 
hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
 static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is 
currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during 
hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and 
returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem 
acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder() = default;
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   
config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#elif BE_TEST
+        return Status::OK();
+#else
+
+        std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, 
std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= 
max_usage(); });
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            return Status::InternalError(

Review Comment:
   Call `lck.unlock()` before returning the `InternalError` — there is no need to hold the mutex while the error status (and its message) is constructed and returned.



##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -45,6 +49,68 @@ bvar::Adder<uint64_t> 
hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
 static constexpr size_t MB = 1024 * 1024;
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too fast,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI.
+// For this, we should have a method to monitor how much JVM memory is 
currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during 
hdfsWrite when writing to HDFS.
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile
+// which ensures that the client's buffer is sent to the data node and 
returned with an acknowledgment before returning to the caller.
+// HdfsWriteMemUsageRecorder would reduce the mem usage at that time.
+// If the current usage exceeds the maximum set by the user, the current mem 
acquire would return failure.
+// The caller could do sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder() = default;
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   
config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#elif BE_TEST
+        return Status::OK();
+#else
+
+        std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, 
std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= 
max_usage(); });
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            return Status::InternalError(

Review Comment:
   Maybe the stacktrace should be disabled here — use `InternalError<false>`, since running out of the JNI heap budget is an expected back-pressure condition, not a programming error.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to