platoneko commented on code in PR #34354: URL: https://github.com/apache/doris/pull/34354#discussion_r1597342914
########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -45,6 +49,68 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created"); bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written"); static constexpr size_t MB = 1024 * 1024; +static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB + +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. +// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. +// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile +// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. +// HdfsWriteMemUsageRecorder would reduce the mem usage at that time. +// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. +// The caller could do sleep to wait for free memory. 
+class HdfsWriteMemUsageRecorder { +public: + HdfsWriteMemUsageRecorder() = default; + ~HdfsWriteMemUsageRecorder() = default; + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size() * + config::max_hdfs_wirter_jni_heap_usage_ratio); + } + Status acquire_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#elif BE_TEST + return Status::OK(); +#else + + std::unique_lock lck {cur_memory_latch}; + cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds), + [&]() { return cur_memory_comsuption + memory_size <= max_usage(); }); + if (cur_memory_comsuption + memory_size > max_usage()) { + return Status::InternalError( + "Run out of Jni jvm heap space, current limit size is {}, max heap size is {}, " + "ratio is {}", + max_usage(), max_jvm_heap_size(), config::max_hdfs_wirter_jni_heap_usage_ratio); + } + cur_memory_comsuption += memory_size; + return Status::OK(); +#endif + } + + void release_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return; +#elif BE_TEST +#else + std::unique_lock lck {cur_memory_latch}; + size_t origin_size = cur_memory_comsuption; + cur_memory_comsuption -= memory_size; + if (cur_memory_comsuption <= max_usage() && origin_size > max_usage()) { + cv.notify_one(); Review Comment: notify_all ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -46,6 +49,52 @@ bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_writt static constexpr size_t MB = 1024 * 1024; +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. +// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteRateLimit class increments a recorded value during hdfsWrite when writing to HDFS. +// When hdfsCloseFile is called, all related memory in the JVM will be invalidated, so the recorded value can be decreased at that time. 
+// If the current usage exceeds the maximum set by the user, the current write will sleep. +// If the number of sleeps exceeds the number specified by the user, then the current write is considered to have failed +class HdfsWriteRateLimit { +public: + HdfsWriteRateLimit() + : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()), + cur_memory_comsuption(0) {} + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size * config::max_hdfs_jni_heap_usage_ratio); + } + Status do_rate_limit(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#endif + for (int retry_time = config::hsfs_jni_write_max_retry_time; retry_time != 0; Review Comment: ```suggestion for (int retry_time = config::hdfs_jni_write_max_retry_time; retry_time > 0; ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -44,7 +47,85 @@ bvar::Adder<uint64_t> hdfs_bytes_written_total("hdfs_file_writer_bytes_written") bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created"); bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written"); +class HdfsMaxConnectionLimiter { +public: + HdfsMaxConnectionLimiter() = default; + ~HdfsMaxConnectionLimiter() = default; + + void add_inflight_writer() { + std::unique_lock lck {_latch}; + _cv.wait(lck, [&]() { + return _cur_inflight_writer < config::max_inflight_hdfs_write_connection; + }); + _cur_inflight_writer++; + } + + void reduce_inflight_writer() { + std::unique_lock lck {_latch}; + _cur_inflight_writer--; + _cv.notify_one(); + } + +private: + std::mutex _latch; + std::condition_variable _cv; + int64_t _cur_inflight_writer {0}; +}; + static constexpr size_t MB = 1024 * 1024; +static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB + +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. 
+// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. +// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile +// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. +// HdfsWriteMemUsageRecorder would reduce the mem usage at that time. +// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. +// The caller could do sleep to wait for free memory. +class HdfsWriteMemUsageRecorder { +public: + HdfsWriteMemUsageRecorder() + : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()), cur_memory_comsuption(0) { + LOG_INFO("the max jvm heap size is {}", max_jvm_heap_size); + } + ~HdfsWriteMemUsageRecorder() = default; + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size * + config::max_hdfs_wirter_jni_heap_usage_ratio); + } + Status acquire_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#elif BE_TEST + return Status::OK(); +#else + if (cur_memory_comsuption + memory_size > max_usage()) { + return Status::InternalError("Run out of Jni jvm heap space, current limit size is {}", Review Comment: ```suggestion return Status::InternalError<false>("Run out of Jni jvm heap space, current limit size is {}", ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -71,11 +137,52 @@ HdfsFileWriter::~HdfsFileWriter() { if (_hdfs_file) { SCOPED_BVAR_LATENCY(hdfs_bvar::hdfs_close_latency); hdfsCloseFile(_hdfs_handler->hdfs_fs, _hdfs_file); + _flush_and_reset_approximate_jni_buffer_size(); } hdfs_file_being_written << -1; } +void HdfsFileWriter::_flush_and_reset_approximate_jni_buffer_size() { + g_hdfs_write_rate_limiter.release_memory(_approximate_jni_buffer_size); + _approximate_jni_buffer_size = 0; +} + +Status 
HdfsFileWriter::_acquire_jni_memory(size_t size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#else + size_t actual_size = std::max(CLIENT_WRITE_PACKET_SIZE, size); + if (auto st = g_hdfs_write_rate_limiter.acquire_memory(actual_size); !st.ok()) { + int ret; Review Comment: Check that _approximate_jni_buffer_size > 0 before calling HFlush ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -46,6 +49,52 @@ bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_writt static constexpr size_t MB = 1024 * 1024; +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. +// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteRateLimit class increments a recorded value during hdfsWrite when writing to HDFS. +// When hdfsCloseFile is called, all related memory in the JVM will be invalidated, so the recorded value can be decreased at that time. +// If the current usage exceeds the maximum set by the user, the current write will sleep. 
+// If the number of sleeps exceeds the number specified by the user, then the current write is considered to have failed +class HdfsWriteRateLimit { +public: + HdfsWriteRateLimit() + : max_jvm_heap_size(JniUtil::get_max_jni_heap_memory_size()), + cur_memory_comsuption(0) {} + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size * config::max_hdfs_jni_heap_usage_ratio); + } + Status do_rate_limit(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#endif + for (int retry_time = config::hsfs_jni_write_max_retry_time; retry_time != 0; + retry_time--) { + if (cur_memory_comsuption + memory_size > max_usage()) { + std::this_thread::sleep_for( + std::chrono::milliseconds(config::max_hdfs_jni_write_sleep_milliseconds)); + continue; + } + cur_memory_comsuption.fetch_add(memory_size); + return Status::OK(); + } + return Status::InternalError("Run out of Jni jvm heap space"); + } + + void release_jni_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return; +#endif + cur_memory_comsuption.fetch_sub(memory_size); Review Comment: ```suggestion cur_memory_comsuption.fetch_sub(memory_size, std::memory_order_relaxed); ``` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -45,6 +49,68 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created"); bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written"); static constexpr size_t MB = 1024 * 1024; +static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB + +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. +// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. 
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile +// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. +// HdfsWriteMemUsageRecorder would reduce the mem usage at that time. +// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. +// The caller could do sleep to wait for free memory. +class HdfsWriteMemUsageRecorder { +public: + HdfsWriteMemUsageRecorder() = default; + ~HdfsWriteMemUsageRecorder() = default; + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size() * + config::max_hdfs_wirter_jni_heap_usage_ratio); + } + Status acquire_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#elif BE_TEST + return Status::OK(); +#else + + std::unique_lock lck {cur_memory_latch}; + cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds), + [&]() { return cur_memory_comsuption + memory_size <= max_usage(); }); + if (cur_memory_comsuption + memory_size > max_usage()) { + return Status::InternalError( Review Comment: `lck.unlock()` before return `InternalError` ########## be/src/io/fs/hdfs_file_writer.cpp: ########## @@ -45,6 +49,68 @@ bvar::Adder<uint64_t> hdfs_file_created_total("hdfs_file_writer_file_created"); bvar::Adder<uint64_t> hdfs_file_being_written("hdfs_file_writer_file_being_written"); static constexpr size_t MB = 1024 * 1024; +static constexpr size_t CLIENT_WRITE_PACKET_SIZE = 64 * 1024; // 64 KB + +// In practice, we've found that if the import frequency to HDFS is too fast, +// it can cause an OutOfMemoryError (OOM) in the JVM started by the JNI. +// For this, we should have a method to monitor how much JVM memory is currently being used. +// The HdfsWriteMemUsageRecorder class increments a recorded value during hdfsWrite when writing to HDFS. 
+// The HDFS client will blockingly call hdfsHsync or hdfsCloseFile +// which ensures that the client's buffer is sent to the data node and returned with an acknowledgment before returning to the caller. +// HdfsWriteMemUsageRecorder would reduce the mem usage at that time. +// If the current usage exceeds the maximum set by the user, the current mem acquire would return failure. +// The caller could do sleep to wait for free memory. +class HdfsWriteMemUsageRecorder { +public: + HdfsWriteMemUsageRecorder() = default; + ~HdfsWriteMemUsageRecorder() = default; + size_t max_usage() const { + return static_cast<size_t>(max_jvm_heap_size() * + config::max_hdfs_wirter_jni_heap_usage_ratio); + } + Status acquire_memory(size_t memory_size) { +#ifdef USE_LIBHDFS3 + return Status::OK(); +#elif BE_TEST + return Status::OK(); +#else + + std::unique_lock lck {cur_memory_latch}; + cv.wait_for(lck, std::chrono::milliseconds(config::hdfs_jni_write_sleep_milliseconds), + [&]() { return cur_memory_comsuption + memory_size <= max_usage(); }); + if (cur_memory_comsuption + memory_size > max_usage()) { + return Status::InternalError( Review Comment: Maybe stacktrace should be disabled here, use `InternalError<false>` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org