This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push: new 13fb69550a [improvement](kerberos) disable hdfs fs handle cache to renew kerberos ticket at fix interval (#21265) 13fb69550a is described below commit 13fb69550a18c99d4bfb64cc001a024f3053683b Author: Mingyu Chen <morning...@163.com> AuthorDate: Tue Jul 4 17:13:34 2023 +0800 [improvement](kerberos) disable hdfs fs handle cache to renew kerberos ticket at fix interval (#21265) Add a new BE config `kerberos_ticket_lifetime_seconds`, default is 86400. It is best to set it to the same value as `ticket_lifetime` in `krb5.conf`. If an HDFS fs handle in the cache lives longer than HALF of this time, it will be marked as invalid and recreated, and the kerberos ticket will be renewed. --- be/src/common/config.cpp | 1 + be/src/common/config.h | 4 ++++ be/src/io/fs/hdfs_file_system.cpp | 49 +++++++++++++++++++++------------------ be/src/io/fs/hdfs_file_system.h | 26 +++++++++++++++++---- be/src/io/hdfs_builder.cpp | 1 + 5 files changed, 54 insertions(+), 27 deletions(-) diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp index 24c5d5f32a..7186d8c970 100644 --- a/be/src/common/config.cpp +++ b/be/src/common/config.cpp @@ -1028,6 +1028,7 @@ DEFINE_Int64(max_external_file_meta_cache_num, "20000"); DEFINE_Int32(rocksdb_max_write_buffer_number, "5"); DEFINE_Bool(allow_invalid_decimalv2_literal, "false"); +DEFINE_mInt64(kerberos_expiration_time_seconds, "43200"); #ifdef BE_TEST // test s3 diff --git a/be/src/common/config.h b/be/src/common/config.h index 7601eade16..6230e4caf2 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -1045,6 +1045,10 @@ DECLARE_Int32(rocksdb_max_write_buffer_number); // Allow invalid decimalv2 literal for compatible with old version. Recommend set it false strongly. DECLARE_mBool(allow_invalid_decimalv2_literal); +// the max expiration time of kerberos ticket. +// If a hdfs filesytem with kerberos authentication live longer +// than this time, it will be expired. 
+DECLARE_mInt64(kerberos_expiration_time_seconds); #ifdef BE_TEST // test s3 diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp index 745a7736f9..775754bd4d 100644 --- a/be/src/io/fs/hdfs_file_system.cpp +++ b/be/src/io/fs/hdfs_file_system.cpp @@ -78,7 +78,7 @@ private: HdfsFileSystemCache() = default; uint64 _hdfs_hash_code(const THdfsParams& hdfs_params); - Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs); + Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* is_kerberos); void _clean_invalid(); void _clean_oldest(); }; @@ -423,9 +423,11 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() { // ************* HdfsFileSystemCache ****************** int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64; -Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs) { +Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, + bool* is_kerberos) { HDFSCommonBuilder builder; RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder)); + *is_kerberos = builder.is_need_kinit(); hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); if (hdfs_fs == nullptr) { return Status::IOError("faield to connect to hdfs {}: {}", hdfs_params.fs_name, @@ -467,30 +469,33 @@ Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params, auto it = _cache.find(hash_code); if (it != _cache.end()) { HdfsFileSystemHandle* handle = it->second.get(); - if (handle->invalid()) { - hdfsFS hdfs_fs = nullptr; - RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs)); - *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false); - } else { + if (!handle->invalid()) { handle->inc_ref(); *fs_handle = handle; + return Status::OK(); } + // fs handle is invalid, erase it. 
+ _cache.erase(it); + LOG(INFO) << "erase the hdfs handle, fs name: " << hdfs_params.fs_name; + } + + // not find in cache, or fs handle is invalid + // create a new one and try to put it into cache + hdfsFS hdfs_fs = nullptr; + bool is_kerberos = false; + RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos)); + if (_cache.size() >= MAX_CACHE_HANDLE) { + _clean_invalid(); + _clean_oldest(); + } + if (_cache.size() < MAX_CACHE_HANDLE) { + std::unique_ptr<HdfsFileSystemHandle> handle = + std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, is_kerberos); + handle->inc_ref(); + *fs_handle = handle.get(); + _cache[hash_code] = std::move(handle); } else { - hdfsFS hdfs_fs = nullptr; - RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs)); - if (_cache.size() >= MAX_CACHE_HANDLE) { - _clean_invalid(); - _clean_oldest(); - } - if (_cache.size() < MAX_CACHE_HANDLE) { - std::unique_ptr<HdfsFileSystemHandle> handle = - std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true); - handle->inc_ref(); - *fs_handle = handle.get(); - _cache[hash_code] = std::move(handle); - } else { - *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false); - } + *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos); } } return Status::OK(); diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h index d542cd1ba7..bd28ec73c2 100644 --- a/be/src/io/fs/hdfs_file_system.h +++ b/be/src/io/fs/hdfs_file_system.h @@ -27,6 +27,7 @@ #include <string> #include <vector> +#include "common/config.h" #include "common/status.h" #include "io/fs/file_reader_writer_fwd.h" #include "io/fs/hdfs.h" @@ -41,14 +42,21 @@ struct FileInfo; class HdfsFileSystemHandle { public: - HdfsFileSystemHandle(hdfsFS fs, bool cached) - : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), _last_access_time(0), _invalid(false) {} + HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos) + : hdfs_fs(fs), + from_cache(cached), + _is_kerberos(is_kerberos), + _ref_cnt(0), + _create_time(_now()), + 
_last_access_time(0), + _invalid(false) {} ~HdfsFileSystemHandle() { DCHECK(_ref_cnt == 0); if (hdfs_fs != nullptr) { - // Even if there is an error, the resources associated with the hdfsFS will be freed. - hdfsDisconnect(hdfs_fs); + // DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed" + // even if we create a new one + // hdfsDisconnect(hdfs_fs); } hdfs_fs = nullptr; } @@ -67,7 +75,11 @@ public: int ref_cnt() { return _ref_cnt; } - bool invalid() { return _invalid; } + bool invalid() { + return _invalid || + (_is_kerberos && + _now() - _create_time.load() > config::kerberos_expiration_time_seconds * 1000); + } void set_invalid() { _invalid = true; } @@ -77,8 +89,12 @@ public: const bool from_cache; private: + const bool _is_kerberos; // the number of referenced client std::atomic<int> _ref_cnt; + // For kerberos authentication, we need to save create time so that + // we can know if the kerberos ticket is expired. + std::atomic<uint64_t> _create_time; // HdfsFileSystemCache try to remove the oldest handler when the cache is full std::atomic<uint64_t> _last_access_time; // Client will set invalid if error thrown, and HdfsFileSystemCache will not reuse this handler diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp index 73edc326c3..19986f76e4 100644 --- a/be/src/io/hdfs_builder.cpp +++ b/be/src/io/hdfs_builder.cpp @@ -72,6 +72,7 @@ Status HDFSCommonBuilder::run_kinit() { #endif hdfsBuilderConfSetStr(hdfs_builder, "hadoop.security.kerberos.ticket.cache.path", ticket_path.c_str()); + LOG(INFO) << "finished to run kinit command: " << fmt::to_string(kinit_command); return Status::OK(); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org