This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 417431fd83e [Enhancement](hdfs-file-system) Change fs_handler ptr to shared_ptr and remove ref count operations. (#34049)
417431fd83e is described below
commit 417431fd83e6a927345a7384113981e36453a6f1
Author: Qi Chen <[email protected]>
AuthorDate: Sun Apr 28 19:45:30 2024 +0800
[Enhancement](hdfs-file-system) Change fs_handler ptr to shared_ptr and remove ref count operations. (#34049)
Backport #33959.
---
be/src/io/fs/hdfs_file_system.cpp | 39 +++++++++++++--------------------------
be/src/io/fs/hdfs_file_system.h | 26 +++++++-------------------
2 files changed, 20 insertions(+), 45 deletions(-)
diff --git a/be/src/io/fs/hdfs_file_system.cpp b/be/src/io/fs/hdfs_file_system.cpp
index e41671e493b..1340a3078e2 100644
--- a/be/src/io/fs/hdfs_file_system.cpp
+++ b/be/src/io/fs/hdfs_file_system.cpp
@@ -71,11 +71,11 @@ public:
// This function is thread-safe
Status get_connection(const THdfsParams& hdfs_params, const std::string&
fs_name,
- HdfsFileSystemHandle** fs_handle);
+ std::shared_ptr<HdfsFileSystemHandle>* fs_handle);
private:
std::mutex _lock;
- std::unordered_map<uint64, std::unique_ptr<HdfsFileSystemHandle>> _cache;
+ std::unordered_map<uint64, std::shared_ptr<HdfsFileSystemHandle>> _cache;
HdfsFileSystemCache() = default;
@@ -148,15 +148,7 @@ HdfsFileSystem::HdfsFileSystem(const THdfsParams&
hdfs_params, std::string id,
}
}
-HdfsFileSystem::~HdfsFileSystem() {
- if (_fs_handle != nullptr) {
- if (_fs_handle->from_cache) {
- _fs_handle->dec_ref();
- } else {
- delete _fs_handle;
- }
- }
-}
+HdfsFileSystem::~HdfsFileSystem() = default;
Status HdfsFileSystem::connect_impl() {
RETURN_IF_ERROR(
@@ -384,10 +376,6 @@ Status HdfsFileSystem::download_impl(const Path&
remote_file, const Path& local_
return local_writer->close();
}
-HdfsFileSystemHandle* HdfsFileSystem::get_handle() {
- return _fs_handle;
-}
-
// ************* HdfsFileSystemCache ******************
int HdfsFileSystemCache::MAX_CACHE_HANDLE = 64;
@@ -406,7 +394,7 @@ Status HdfsFileSystemCache::_create_fs(const THdfsParams&
hdfs_params, const std
void HdfsFileSystemCache::_clean_invalid() {
std::vector<uint64> removed_handle;
for (auto& item : _cache) {
- if (item.second->invalid() && item.second->ref_cnt() == 0) {
+ if (item.second.use_count() == 1 && item.second->invalid()) {
removed_handle.emplace_back(item.first);
}
}
@@ -419,7 +407,7 @@ void HdfsFileSystemCache::_clean_oldest() {
uint64_t oldest_time = ULONG_MAX;
uint64 oldest = 0;
for (auto& item : _cache) {
- if (item.second->ref_cnt() == 0 && item.second->last_access_time() <
oldest_time) {
+ if (item.second.use_count() == 1 && item.second->last_access_time() <
oldest_time) {
oldest_time = item.second->last_access_time();
oldest = item.first;
}
@@ -429,16 +417,16 @@ void HdfsFileSystemCache::_clean_oldest() {
Status HdfsFileSystemCache::get_connection(const THdfsParams& hdfs_params,
const std::string& fs_name,
- HdfsFileSystemHandle** fs_handle) {
+
std::shared_ptr<HdfsFileSystemHandle>* fs_handle) {
uint64 hash_code = _hdfs_hash_code(hdfs_params, fs_name);
{
std::lock_guard<std::mutex> l(_lock);
auto it = _cache.find(hash_code);
if (it != _cache.end()) {
- HdfsFileSystemHandle* handle = it->second.get();
+ std::shared_ptr<HdfsFileSystemHandle> handle = it->second;
if (!handle->invalid()) {
- handle->inc_ref();
- *fs_handle = handle;
+ handle->update_last_access_time();
+ *fs_handle = std::move(handle);
return Status::OK();
}
// fs handle is invalid, erase it.
@@ -455,13 +443,12 @@ Status HdfsFileSystemCache::get_connection(const
THdfsParams& hdfs_params,
_clean_oldest();
}
if (_cache.size() < MAX_CACHE_HANDLE) {
- std::unique_ptr<HdfsFileSystemHandle> handle =
- std::make_unique<HdfsFileSystemHandle>(hdfs_fs, true);
- handle->inc_ref();
- *fs_handle = handle.get();
+ auto handle = std::make_shared<HdfsFileSystemHandle>(hdfs_fs,
true);
+ handle->update_last_access_time();
+ *fs_handle = handle;
_cache[hash_code] = std::move(handle);
} else {
- *fs_handle = new HdfsFileSystemHandle(hdfs_fs, false);
+ *fs_handle = std::make_shared<HdfsFileSystemHandle>(hdfs_fs,
false);
}
}
return Status::OK();
diff --git a/be/src/io/fs/hdfs_file_system.h b/be/src/io/fs/hdfs_file_system.h
index db854cafa9e..74d098004ab 100644
--- a/be/src/io/fs/hdfs_file_system.h
+++ b/be/src/io/fs/hdfs_file_system.h
@@ -45,13 +45,11 @@ public:
HdfsFileSystemHandle(hdfsFS fs, bool cached)
: hdfs_fs(fs),
from_cache(cached),
- _ref_cnt(0),
_create_time(_now()),
_last_access_time(0),
_invalid(false) {}
~HdfsFileSystemHandle() {
- DCHECK(_ref_cnt == 0) << _ref_cnt;
if (hdfs_fs != nullptr) {
// DO NOT call hdfsDisconnect(), or we will meet "Filesystem
closed"
// even if we create a new one
@@ -62,18 +60,14 @@ public:
int64_t last_access_time() { return _last_access_time; }
- void inc_ref() {
- _ref_cnt++;
- _last_access_time = _now();
- }
-
- void dec_ref() {
- _ref_cnt--;
- _last_access_time = _now();
+ void update_last_access_time() {
+ if (from_cache) {
+ _last_access_time =
std::chrono::duration_cast<std::chrono::milliseconds>(
+
std::chrono::system_clock::now().time_since_epoch())
+ .count();
+ }
}
- int ref_cnt() { return _ref_cnt; }
-
bool invalid() { return _invalid; }
void set_invalid() { _invalid = true; }
@@ -84,8 +78,6 @@ public:
const bool from_cache;
private:
- // the number of referenced client
- std::atomic<int> _ref_cnt;
// For kerberos authentication, we need to save create time so that
// we can know if the kerberos ticket is expired.
std::atomic<uint64_t> _create_time;
@@ -109,8 +101,6 @@ public:
~HdfsFileSystem() override;
- HdfsFileSystemHandle* get_handle();
-
friend class HdfsFileHandleCache;
protected:
@@ -143,9 +133,7 @@ private:
RuntimeProfile* profile);
const THdfsParams& _hdfs_params;
std::string _fs_name;
- // do not use std::shared_ptr or std::unique_ptr
- // _fs_handle is managed by HdfsFileSystemCache
- HdfsFileSystemHandle* _fs_handle = nullptr;
+ std::shared_ptr<HdfsFileSystemHandle> _fs_handle = nullptr;
RuntimeProfile* _profile = nullptr;
};
} // namespace io
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]