This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 13fb69550a [improvement](kerberos) disable hdfs fs handle cache to 
renew kerberos ticket at fix interval (#21265)
13fb69550a is described below

commit 13fb69550a18c99d4bfb64cc001a024f3053683b
Author: Mingyu Chen <morning...@163.com>
AuthorDate: Tue Jul 4 17:13:34 2023 +0800

    [improvement](kerberos) disable hdfs fs handle cache to renew kerberos 
ticket at fix interval (#21265)
    
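    Add a new BE config `kerberos_ticket_lifetime_seconds`, default is 86400.
    It is best to set it to the same value as `ticket_lifetime` in `krb5.conf`.
    If an HDFS fs handle in the cache lives longer than HALF of this time, it will
    be marked as invalid and recreated, and the kerberos ticket will be renewed.
    (Note: the diff below actually declares the config as
    `kerberos_expiration_time_seconds` with a default of 43200 seconds, and a
    kerberos fs handle is treated as invalid once it is older than that value.)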
---
 be/src/common/config.cpp          |  1 +
 be/src/common/config.h            |  4 ++++
 be/src/io/fs/hdfs_file_system.cpp | 49 +++++++++++++++++++++------------------
 be/src/io/fs/hdfs_file_system.h   | 26 +++++++++++++++++----
 be/src/io/hdfs_builder.cpp        |  1 +
 5 files changed, 54 insertions(+), 27 deletions(-)

diff --git a/be/src/common/config.cpp b/be/src/common/config.cpp
index 24c5d5f32a..7186d8c970 100644
--- a/be/src/common/config.cpp
+++ b/be/src/common/config.cpp
@@ -1028,6 +1028,7 @@ DEFINE_Int64(max_external_file_meta_cache_num, "20000");
 DEFINE_Int32(rocksdb_max_write_buffer_number, "5");
 
 DEFINE_Bool(allow_invalid_decimalv2_literal, "false");
+DEFINE_mInt64(kerberos_expiration_time_seconds, "43200");
 
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 7601eade16..6230e4caf2 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -1045,6 +1045,10 @@ DECLARE_Int32(rocksdb_max_write_buffer_number);
 
 // Allow invalid decimalv2 literal for compatible with old version. Recommend 
set it false strongly.
 DECLARE_mBool(allow_invalid_decimalv2_literal);
+// The max expiration time of the kerberos ticket, in seconds.
+// If an hdfs filesystem with kerberos authentication lives longer
+// than this time, it will be expired.
+DECLARE_mInt64(kerberos_expiration_time_seconds);
 
 #ifdef BE_TEST
 // test s3
diff --git a/be/src/io/fs/hdfs_file_system.cpp 
b/be/src/io/fs/hdfs_file_system.cpp
index 745a7736f9..775754bd4d 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -78,7 +78,7 @@ private:
     HdfsFileSystemCache() = default;
 
     uint64 _hdfs_hash_code(const THdfsParams& hdfs_params);
-    Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs);
+    Status _create_fs(const THdfsParams& hdfs_params, hdfsFS* fs, bool* 
is_kerberos);
     void _clean_invalid();
     void _clean_oldest();
 };
@@ -423,9 +423,11 @@ HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
 // ************* HdfsFileSystemCache ******************
 int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
 
-Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* 
fs) {
+Status HdfsFileSystemCache::_create_fs(const THdfsParams& hdfs_params, hdfsFS* 
fs,
+                                       bool* is_kerberos) {
     HDFSCommonBuilder builder;
     RETURN_IF_ERROR(createHDFSBuilder(hdfs_params, &builder));
+    *is_kerberos = builder.is_need_kinit();
     hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
     if (hdfs_fs == nullptr) {
         return Status::IOError("faield to connect to hdfs {}: {}", 
hdfs_params.fs_name,
@@ -467,30 +469,33 @@ Status HdfsFileSystemCache::get_connection(const 
THdfsParams& hdfs_params,
         auto it = _cache.find(hash_code);
         if (it != _cache.end()) {
             HdfsFileSystemHandle* handle = it->second.get();
-            if (handle->invalid()) {
-                hdfsFS hdfs_fs = nullptr;
-                RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
-                *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
-            } else {
+            if (!handle->invalid()) {
                 handle->inc_ref();
                 *fs_handle = handle;
+                return Status::OK();
             }
+            // fs handle is invalid, erase it.
+            _cache.erase(it);
+            LOG(INFO) << "erase the hdfs handle, fs name: " << 
hdfs_params.fs_name;
+        }
+
+        // not found in cache, or the fs handle is invalid:
+        // create a new one and try to put it into the cache
+        hdfsFS hdfs_fs = nullptr;
+        bool is_kerberos = false;
+        RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs, &is_kerberos));
+        if (_cache.size() >= MAX_CACHE_HANDLE) {
+            _clean_invalid();
+            _clean_oldest();
+        }
+        if (_cache.size() < MAX_CACHE_HANDLE) {
+            std::unique_ptr<HdfsFileSystemHandle> handle =
+                    std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true, 
is_kerberos);
+            handle->inc_ref();
+            *fs_handle = handle.get();
+            _cache[hash_code] = std::move(handle);
         } else {
-            hdfsFS hdfs_fs = nullptr;
-            RETURN_IF_ERROR(_create_fs(hdfs_params, &hdfs_fs));
-            if (_cache.size() >= MAX_CACHE_HANDLE) {
-                _clean_invalid();
-                _clean_oldest();
-            }
-            if (_cache.size() < MAX_CACHE_HANDLE) {
-                std::unique_ptr<HdfsFileSystemHandle> handle =
-                        std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
-                handle->inc_ref();
-                *fs_handle = handle.get();
-                _cache[hash_code] = std::move(handle);
-            } else {
-                *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
-            }
+            *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false, is_kerberos);
         }
     }
     return Status::OK();
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index d542cd1ba7..bd28ec73c2 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -27,6 +27,7 @@
 #include <string>
 #include <vector>
 
+#include "common/config.h"
 #include "common/status.h"
 #include "io/fs/file_reader_writer_fwd.h"
 #include "io/fs/hdfs.h"
@@ -41,14 +42,21 @@ struct FileInfo;
 
 class HdfsFileSystemHandle {
 public:
-    HdfsFileSystemHandle(hdfsFS fs, bool cached)
-            : hdfs_fs(fs), from_cache(cached), _ref_cnt(0), 
_last_access_time(0), _invalid(false) {}
+    HdfsFileSystemHandle(hdfsFS fs, bool cached, bool is_kerberos)
+            : hdfs_fs(fs),
+              from_cache(cached),
+              _is_kerberos(is_kerberos),
+              _ref_cnt(0),
+              _create_time(_now()),
+              _last_access_time(0),
+              _invalid(false) {}
 
     ~HdfsFileSystemHandle() {
         DCHECK(_ref_cnt == 0);
         if (hdfs_fs != nullptr) {
-            // Even if there is an error, the resources associated with the 
hdfsFS will be freed.
-            hdfsDisconnect(hdfs_fs);
+            // DO NOT call hdfsDisconnect(), or we will hit a "Filesystem closed"
+            // error even if we create a new one.
+            // hdfsDisconnect(hdfs_fs);
         }
         hdfs_fs = nullptr;
     }
@@ -67,7 +75,11 @@ public:
 
     int ref_cnt() { return _ref_cnt; }
 
-    bool invalid() { return _invalid; }
+    bool invalid() {
+        return _invalid ||
+               (_is_kerberos &&
+                _now() - _create_time.load() > 
config::kerberos_expiration_time_seconds * 1000);
+    }
 
     void set_invalid() { _invalid = true; }
 
@@ -77,8 +89,12 @@ public:
     const bool from_cache;
 
 private:
+    const bool _is_kerberos;
     // the number of referenced client
     std::atomic<int> _ref_cnt;
+    // For kerberos authentication, we need to save create time so that
+    // we can know if the kerberos ticket is expired.
+    std::atomic<uint64_t> _create_time;
     // HdfsFileSystemCache try to remove the oldest handler when the cache is 
full
     std::atomic<uint64_t> _last_access_time;
     // Client will set invalid if error thrown, and HdfsFileSystemCache will 
not reuse this handler
diff --git a/be/src/io/hdfs_builder.cpp b/be/src/io/hdfs_builder.cpp
index 73edc326c3..19986f76e4 100644
--- a/be/src/io/hdfs_builder.cpp
+++ b/be/src/io/hdfs_builder.cpp
@@ -72,6 +72,7 @@ Status HDFSCommonBuilder::run_kinit() {
 #endif
     hdfsBuilderConfSetStr(hdfs_builder, 
"hadoop.security.kerberos.ticket.cache.path",
                           ticket_path.c_str());
+    LOG(INFO) << "finished to run kinit command: " << 
fmt::to_string(kinit_command);
     return Status::OK();
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to