This is an automated email from the ASF dual-hosted git repository. dataroaring pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push: new 2658914bc6e Pick "[fix](IO) Fix async close's raii and lazily load jni's jvm max heap value #34799" (#34815) 2658914bc6e is described below commit 2658914bc6e66c87b1aa7cfdef8970dedc0ff74b Author: AlexYue <yj976240...@gmail.com> AuthorDate: Tue May 14 14:08:53 2024 +0800 Pick "[fix](IO) Fix async close's raii and lazily load jni's jvm max heap value #34799" (#34815) --- be/src/io/fs/hdfs_file_writer.cpp | 2 +- be/src/io/fs/s3_file_writer.cpp | 2 +- be/src/runtime/exec_env_init.cpp | 2 - be/src/util/jni-util.cpp | 11 +++--- be/test/io/fs/hdfs_file_system_test.cpp | 66 +++++++++++++++------------------ 5 files changed, 37 insertions(+), 46 deletions(-) diff --git a/be/src/io/fs/hdfs_file_writer.cpp b/be/src/io/fs/hdfs_file_writer.cpp index cad3dd35dbe..01744c7bde3 100644 --- a/be/src/io/fs/hdfs_file_writer.cpp +++ b/be/src/io/fs/hdfs_file_writer.cpp @@ -144,7 +144,7 @@ HdfsFileWriter::HdfsFileWriter(Path path, std::shared_ptr<HdfsHandler> handler, HdfsFileWriter::~HdfsFileWriter() { if (_async_close_pack != nullptr) { // For thread safety - std::ignore = _async_close_pack->promise.get_future(); + std::ignore = _async_close_pack->future.get(); _async_close_pack = nullptr; } if (_hdfs_file) { diff --git a/be/src/io/fs/s3_file_writer.cpp b/be/src/io/fs/s3_file_writer.cpp index 644440923de..852b5258e85 100644 --- a/be/src/io/fs/s3_file_writer.cpp +++ b/be/src/io/fs/s3_file_writer.cpp @@ -106,7 +106,7 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin S3FileWriter::~S3FileWriter() { if (_async_close_pack != nullptr) { // For thread safety - std::ignore = _async_close_pack->promise.get_future(); + std::ignore = _async_close_pack->future.get(); _async_close_pack = nullptr; } // We won't do S3 abort operation in BE, we let s3 service do it own. diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp index ac45fa3d136..a181e607767 100644 --- a/be/src/runtime/exec_env_init.cpp +++ b/be/src/runtime/exec_env_init.cpp @@ -155,8 +155,6 @@ static void init_doris_metrics(const std::vector<StorePath>& store_paths) { DorisMetrics::instance()->initialize(init_system_metrics, disk_devices, network_interfaces); } - - ThreadPool* ExecEnv::non_block_close_thread_pool() { #ifdef BE_TEST return get_non_block_close_thread_pool(); diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 4ef0ffc7acb..3c8f2b0a30f 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -208,13 +208,14 @@ Status JniLocalFrame::push(JNIEnv* env, int max_local_ref) { } void JniUtil::parse_max_heap_memory_size_from_jvm(JNIEnv* env) { - // The start_be.sh would set JAVA_OPTS - std::string java_opts = getenv("JAVA_OPTS") ? getenv("JAVA_OPTS") : ""; + // The start_be.sh would set JAVA_OPTS inside LIBHDFS_OPTS + std::string java_opts = getenv("LIBHDFS_OPTS") ? getenv("LIBHDFS_OPTS") : ""; std::istringstream iss(java_opts); std::string opt; while (iss >> opt) { if (opt.find("-Xmx") == 0) { std::string xmxValue = opt.substr(4); + LOG(INFO) << "The max heap vaule is " << xmxValue; char unit = xmxValue.back(); xmxValue.pop_back(); long long value = std::stoll(xmxValue); @@ -247,6 +248,9 @@ size_t JniUtil::get_max_jni_heap_memory_size() { #if defined(USE_LIBHDFS3) || defined(BE_TEST) return std::numeric_limits<size_t>::max(); #else + static std::once_flag parse_max_heap_memory_size_from_jvm_flag; + std::call_once(parse_max_heap_memory_size_from_jvm_flag, parse_max_heap_memory_size_from_jvm, + tls_env_); return max_jvm_heap_memory_size_; #endif } @@ -267,9 +271,6 @@ Status JniUtil::GetJNIEnvSlowPath(JNIEnv** env) { // the hadoop libhdfs will do all the stuff SetEnvIfNecessary(); tls_env_ = getJNIEnv(); - if (nullptr != tls_env_) { - parse_max_heap_memory_size_from_jvm(tls_env_); - } #endif *env = tls_env_; return Status::OK(); diff --git a/be/test/io/fs/hdfs_file_system_test.cpp b/be/test/io/fs/hdfs_file_system_test.cpp index 51a308f45ed..b741a19bf79 100644 --- a/be/test/io/fs/hdfs_file_system_test.cpp +++ b/be/test/io/fs/hdfs_file_system_test.cpp @@ -43,43 +43,35 @@ TEST(HdfsFileSystemTest, Write) { st = local_fs->create_file(fmt::format("{}/mock_hdfs_file", test_dir), &local_file_writer); ASSERT_TRUE(st.ok()) << st; - sp->set_call_back( - "HdfsFileWriter::close::hdfsHSync", - [](auto&& args) { - auto* ret = try_any_cast_ret<int>(args); - ret->first = 0; // noop, return success - ret->second = true; - }); - - sp->set_call_back( - "HdfsFileWriter::close::hdfsCloseFile", - [&](auto&& args) { - auto st = local_file_writer->close(); - ASSERT_TRUE(st.ok()) << st; - auto* ret = try_any_cast_ret<int>(args); - ret->first = 0; // return success - ret->second = true; - }); - - sp->set_call_back( - "HdfsFileWriter::append_hdfs_file::hdfsWrite", - [&](auto&& args) { - auto content = try_any_cast<std::string_view>(args[0]); - auto st = local_file_writer->append({content.data(), content.size()}); - ASSERT_TRUE(st.ok()) << st; - auto* ret = try_any_cast_ret<int>(args); - ret->first = content.size(); // return bytes written - ret->second = true; - }); - - sp->set_call_back( - "HdfsFileWriter::finalize::hdfsFlush", - [](auto&& args) { - auto* ret = try_any_cast_ret<int>(args); - ret->first = 0; // noop, return success - ret->second = true; - }); - + sp->set_call_back("HdfsFileWriter::close::hdfsHSync", [](auto&& args) { + auto* ret = try_any_cast_ret<int>(args); + ret->first = 0; // noop, return success + ret->second = true; + }); + + sp->set_call_back("HdfsFileWriter::close::hdfsCloseFile", [&](auto&& args) { + auto st = local_file_writer->close(); + ASSERT_TRUE(st.ok()) << st; + auto* ret = try_any_cast_ret<int>(args); + ret->first = 0; // return success + ret->second = true; + }); + + sp->set_call_back("HdfsFileWriter::append_hdfs_file::hdfsWrite", [&](auto&& args) { + auto content = try_any_cast<std::string_view>(args[0]); + auto st = local_file_writer->append({content.data(), content.size()}); + ASSERT_TRUE(st.ok()) << st; + auto* ret = try_any_cast_ret<int>(args); + ret->first = content.size(); // return bytes written + ret->second = true; + }); + + sp->set_call_back("HdfsFileWriter::finalize::hdfsFlush", [](auto&& args) { + auto* ret = try_any_cast_ret<int>(args); + ret->first = 0; // noop, return success + ret->second = true; + }); + Defer defer {[&]() { sp->clear_call_back("HdfsFileWriter::close::hdfsHSync"); sp->clear_call_back("HdfsFileWriter::close::hdfsCloseFile"); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org