gavinchou commented on code in PR #34354: URL: https://github.com/apache/doris/pull/34354#discussion_r1595754105
########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -44,7 +48,112 @@ bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written") bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created"); bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written"); +class HdfsMaxConnectionLimiter { +public: + HdfsMaxConnectionLimiter() = default; + ~HdfsMaxConnectionLimiter() = default; + + Status add_inflight_writer() { + int retry = config::hdfs_jni_write_max_retry_time; + std::unique_lock lck {_latch}; + while (!_cv.wait_for( + lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds), [&]() { + return _cur_inflight_writer < config::max_inflight_hdfs_write_connection; + })) { + if (retry > 0) { + retry--; + continue; + } + } + if (_cur_inflight_writer >= config::max_inflight_hdfs_write_connection) { + return Status::InternalError( + "No avaiable hdfs connection, current connections num is {}", + _cur_inflight_writer); + } + + _cur_inflight_writer++; + return Status::OK(); + } + + void reduce_inflight_writer() { + std::unique_lock lck {_latch}; + auto origin = _cur_inflight_writer; + _cur_inflight_writer--; + if (origin == config::max_inflight_hdfs_write_connection) { + _cv.notify_one(); + } + } + +private: + std::mutex _latch; + std::condition_variable _cv; + int64_t _cur_inflight_writer {0}; +}; + static constexpr size_t MB = 1024 * 1024; +static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB + +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. +// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. 
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile +// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. +// HdfsWriteMemUsageRecorder would reduce the mem usage at that time. +// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. +// The caller could do sleep to wait for free memory. +class HdfsWriteMemUsageRecorder { +public: + HdfsWriteMemUsageRecorder() = default; + ~HdfsWriteMemUsageRecorder() = default; + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size() * + config::max_hdfs_wirter_jni_heap_usage_ratio); + } + Status acquire_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#elif BE_TEST + return Status::OK(); +#else + + std::unique_lock lck {cur_memory_latch}; + cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds), + [&]() { return cur_memory_comsuption + memory_size <= max_usage(); }); + if (cur_memory_comsuption + memory_size > max_usage()) { + return Status::InternalError( + "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, " + "ratio is {}", + max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio); + } + cur_memory_comsuption += memory_size; + return Status::OK(); +#endif + } + + void release_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return; +#elif BE_TEST Review Comment: return? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org For additional commands, e-mail: commits-help@doris.apache.org