gavinchou commented on code in PR #34354:
URL: https://github.com/apache/doris/pull/34354#discussion_r1595754105


##########
be/src/io/fs/hdfs_file_writer.cpp:
##########
@@ -44,7 +48,112 @@ bvar::Adder<uint64_t> 
hdfs_bytes_written_total("hdfs_file_writer_bytes_written")
 bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created");
 bvar::Adder<uint64_t> 
hdfs_file_being_written("hdfs_file_writer_file_being_written");
 
+// Caps the number of concurrently in-flight HDFS writers.
+//
+// add_inflight_writer() reserves a slot, waiting for a bounded time if all
+// slots are taken; reduce_inflight_writer() gives a slot back. All state is
+// guarded by _latch.
+class HdfsMaxConnectionLimiter {
+public:
+    HdfsMaxConnectionLimiter() = default;
+    ~HdfsMaxConnectionLimiter() = default;
+
+    // Reserves one writer slot.
+    //
+    // Waits up to config::hdfs_jni_write_sleep_milliseconds per attempt for
+    // the in-flight count to drop below config::max_inflight_hdfs_write_connection,
+    // and makes at most 1 + config::hdfs_jni_write_max_retry_time attempts.
+    //
+    // Returns Status::OK() once a slot has been reserved, or an InternalError
+    // if every attempt timed out.
+    Status add_inflight_writer() {
+        int retry = config::hdfs_jni_write_max_retry_time;
+        const auto interval =
+                std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds);
+        const auto has_free_slot = [this]() {
+            return _cur_inflight_writer < config::max_inflight_hdfs_write_connection;
+        };
+
+        std::unique_lock lck {_latch};
+        // wait_for() returns the predicate's value, so `false` means the
+        // attempt timed out while the limiter was still full.
+        while (!_cv.wait_for(lck, interval, has_free_slot)) {
+            if (retry <= 0) {
+                // Retry budget exhausted: give up instead of waiting forever.
+                return Status::InternalError(
+                        "No avaiable hdfs connection, current connections num is {}",
+                        _cur_inflight_writer);
+            }
+            retry--;
+        }
+
+        // The predicate held while _latch was locked, so the slot is ours.
+        _cur_inflight_writer++;
+        return Status::OK();
+    }
+
+    // Returns one writer slot and wakes a waiter, if any.
+    void reduce_inflight_writer() {
+        std::unique_lock lck {_latch};
+        _cur_inflight_writer--;
+        // Notify on every release rather than only when leaving the "full"
+        // state: with several back-to-back releases a single notification
+        // would leave other waiters asleep until their timeout even though
+        // slots are free. A notification with no waiter is harmless.
+        _cv.notify_one();
+    }
+
+private:
+    std::mutex _latch;
+    std::condition_variable _cv;
+    // Number of slots currently reserved; guarded by _latch.
+    int64_t _cur_inflight_writer {0};
+};
+
 static constexpr size_t MB = 1024 * 1024; // 1 MB
+static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB
+
+// In practice, we've found that if the import frequency to HDFS is too high,
+// it can cause an OutOfMemoryError (OOM) in the JVM started by JNI.
+// To guard against this, we need a way to track how much JVM memory is currently being used.
+// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when
+// writing to HDFS.
+// The HDFS client makes blocking calls to hdfsHsync or hdfsCloseFile, which ensure that the
+// client's buffer is sent to the data node and acknowledged before returning to the caller.
+// HdfsWriteMemUsageRecorder reduces the recorded memory usage at that time.
+// If the current usage exceeds the maximum set by the user, the memory acquisition returns
+// a failure.
+// The caller can sleep to wait for free memory.
+class HdfsWriteMemUsageRecorder {
+public:
+    HdfsWriteMemUsageRecorder() = default;
+    ~HdfsWriteMemUsageRecorder() = default;
+    size_t max_usage() const {
+        return static_cast<size_t>(max_jvm_heap_size() *
+                                   
config::max_hdfs_wirter_jni_heap_usage_ratio);
+    }
+    Status acquire_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return Status::OK();
+#elif BE_TEST
+        return Status::OK();
+#else
+
+        std::unique_lock lck {cur_memory_latch};
+        cv.wait_for(lck, 
std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds),
+                    [&]() { return cur_memory_comsuption + memory_size <= 
max_usage(); });
+        if (cur_memory_comsuption + memory_size > max_usage()) {
+            return Status::InternalError(
+                    "Run out of Jni jvm heap space, current limit size is {}, 
max heap size is {}, "
+                    "ratio is {}",
+                    max_usage(), max_jvm_heap_size(), 
config::max_hdfs_wirter_jni_heap_usage_ratio);
+        }
+        cur_memory_comsuption += memory_size;
+        return Status::OK();
+#endif
+    }
+
+    void release_memory(size_t memory_size) {
+#ifdef USE_LIBHDFS3
+        return;
+#elif BE_TEST

Review Comment:
   return?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to