imay commented on a change in pull request #453: Add UserFunctionCache to cache UDF's library
URL: https://github.com/apache/incubator-doris/pull/453#discussion_r243576027
 
 

 ##########
 File path: be/src/runtime/user_function_cache.cpp
 ##########
 @@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/user_function_cache.h"
+
+#include <cerrno>
+#include <cstdlib>
+#include <vector>
+
+#include <boost/algorithm/string/split.hpp> // boost::split
+#include <boost/algorithm/string/predicate.hpp> // boost::algorithm::ends_with
+#include <boost/algorithm/string/classification.hpp> // boost::is_any_of
+
+#include "http/http_client.h"
+#include "util/dynamic_util.h"
+#include "util/file_utils.h"
+#include "util/md5.h"
+#include "util/spinlock.h"
+
+namespace doris {
+
+// Number of numbered sub-directories ("0" .. "kLibShardNum - 1") that the
+// library directory is split into; each one is created and scanned by
+// _load_cached_lib().
+static constexpr int kLibShardNum = 128;
+
+// Cache entry for one user function library file: which function it belongs
+// to, its checksum, where the file lives on disk, the opened library handle
+// and the symbols already resolved from it.
+// Lifetime is managed by a manual reference count (ref()/unref()); whoever
+// sees unref() return true must delete the entry.
+struct UserFunctionCacheEntry {
+    UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_,
+                           const std::string& lib_file_)
+        : function_id(fid_), checksum(checksum_), lib_file(lib_file_) { }
+    ~UserFunctionCacheEntry();
+
+    // Takes one more reference on this entry.
+    void ref() { _refs.fetch_add(1); }
+
+    // Drops one reference. If unref() returns true, that was the last
+    // reference and the caller must delete this object.
+    bool unref() { return _refs.fetch_sub(1) == 1; }
+
+    int64_t function_id = 0;
+    // used to check if this library is valid.
+    std::string checksum;
+
+    // library file
+    std::string lib_file;
+
+    // make it atomic variable instead of holding a lock
+    std::atomic<bool> is_loaded{false};
+
+    // Set to true when this library file is no longer needed; the destructor
+    // then unlinks lib_file from disk.
+    std::atomic<bool> should_delete_library{false};
+
+    // lock to make sure only one can load this cache
+    std::mutex load_lock;
+
+    // To reduce cache lock held time, cache entry is
+    // added to cache map before library is downloaded.
+    // And this is used to indicate whether library is downloaded.
+    bool is_downloaded = false;
+
+    // used to lookup a symbol
+    void* lib_handle = nullptr;
+
+    // guards fptr_map
+    SpinLock map_lock;
+    // from symbol_name to function pointer
+    std::unordered_map<std::string, void*> fptr_map;
+
+private:
+    // number of outstanding references; see ref()/unref()
+    std::atomic<int> _refs{0};
+};
+
+// Releases what the entry holds: closes the library handle if one was
+// opened, then removes lib_file from disk when should_delete_library is set.
+// Entries are deleted once unref() reports that the last reference is gone.
+UserFunctionCacheEntry::~UserFunctionCacheEntry() {
+    if (lib_handle) {
+        dynamic_close(lib_handle);
+    }
+    lib_handle = nullptr;
+
+    const bool remove_file = should_delete_library.load();
+    if (remove_file) {
+        unlink(lib_file.c_str());
+    }
+}
+
+// Nothing to set up at construction time; the real setup happens in init().
+UserFunctionCache::UserFunctionCache() = default;
+
+// Drops the reference that _entry_map holds on every entry and deletes the
+// entries whose count reaches zero. Entries still referenced elsewhere are
+// left for their remaining holders to delete.
+UserFunctionCache::~UserFunctionCache() {
+    std::lock_guard<std::mutex> l(_cache_lock);
+    for (auto& id_and_entry : _entry_map) {
+        UserFunctionCacheEntry* entry = id_and_entry.second;
+        if (entry->unref()) {
+            delete entry;
+        }
+    }
+    _entry_map.clear();
+}
+
+// Returns the process-wide cache. It is a function-local static: created on
+// first call and destroyed at program exit.
+UserFunctionCache* UserFunctionCache::instance() {
+    static UserFunctionCache s_instance;
+    return &s_instance;
+}
+
+// Prepares the cache for use. It records the library directory and opens a
+// handle on the current process, which get_function_ptr() uses for fid == 0.
+// It then registers every library file already present under lib_dir.
+// Expected to be called only once.
+Status UserFunctionCache::init(const std::string& lib_dir) {
+    DCHECK(_lib_dir.empty());
+    _lib_dir = lib_dir;
+    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));
+    return _load_cached_lib();
+}
+
+// Registers one library file that is already on disk as a cache entry, so it
+// does not have to be downloaded again.
+//
+// `dir` is the shard directory and `file` is the bare file name. The name
+// must have the form "<function_id>.<checksum>.so". The entry is created
+// with is_downloaded = true and one reference, held by _entry_map.
+//
+// Returns an error status, and registers nothing, when the name does not
+// have that form or the function id is already registered.
+// NOTE(review): _entry_map is touched without _cache_lock. This looks safe
+// only because the function is reached from init() before the cache is
+// shared; confirm.
+Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
+    if (!boost::algorithm::ends_with(file, ".so")) {
+        return Status("unknown library file format");
+    }
+
+    std::vector<std::string> split_parts;
+    boost::split(split_parts, file, boost::is_any_of("."));
+    if (split_parts.size() != 3) {
+        return Status("user function's name should be function_id.checksum.so");
+    }
+
+    // Parse the id with strtoll, not std::stol. stol throws on a non-numeric
+    // or out-of-range id (e.g. a stray "foo.bar.so" in the directory), and an
+    // exception escaping the directory scan would abort init(). The whole
+    // component must be consumed, so "12abc" is rejected instead of silently
+    // becoming 12.
+    const std::string& id_str = split_parts[0];
+    char* id_end = nullptr;
+    errno = 0;
+    const long long parsed_id = std::strtoll(id_str.c_str(), &id_end, 10);
+    if (id_str.empty() || id_end != id_str.c_str() + id_str.size() || errno == ERANGE) {
+        return Status("invalid function id in user function library name");
+    }
+    int64_t function_id = parsed_id;
+    std::string checksum = split_parts[1];
+    auto it = _entry_map.find(function_id);
+    if (it != _entry_map.end()) {
+        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
+            << ", one_checksum=" << checksum << ", other_checksum=" << it->second->checksum;
+        return Status("duplicate function id");
+    }
+    // create a cache entry and put it into entry map
+    UserFunctionCacheEntry* entry = new UserFunctionCacheEntry(
+        function_id, checksum, dir + "/" + file);
+    entry->is_downloaded = true;
+
+    // this reference belongs to _entry_map
+    entry->ref();
+    _entry_map[function_id] = entry;
+
+    return Status::OK;
+}
+
+// Makes sure the library directory and its kLibShardNum numbered
+// sub-directories exist. It then hands every file found in them to
+// _load_entry_from_lib(). A file that cannot be registered is logged and
+// skipped; a failure to create or scan a directory is returned.
+Status UserFunctionCache::_load_cached_lib() {
+    // create the library directory if it does not exist
+    RETURN_IF_ERROR(FileUtils::create_dir(_lib_dir));
+
+    auto on_file = [this] (const std::string& dir, const std::string& file) {
+        Status load_status = _load_entry_from_lib(dir, file);
+        if (!load_status.ok()) {
+            LOG(WARNING) << "load a library failed, dir=" << dir << ", file=" << file;
+        }
+        // always report true to scan_dir, whatever happened to this file
+        return true;
+    };
+
+    for (int shard = 0; shard < kLibShardNum; ++shard) {
+        std::string shard_dir = _lib_dir + "/" + std::to_string(shard);
+        RETURN_IF_ERROR(FileUtils::create_dir(shard_dir));
+        RETURN_IF_ERROR(FileUtils::scan_dir(shard_dir, on_file));
+    }
+    return Status::OK;
+}
+
+// Resolves `symbol` into *fn_ptr.
+//
+// fid == 0: the symbol is looked up in the current-process handle opened by
+// init(). No cache entry and no locking are involved.
+//
+// Otherwise the symbol is searched in the library entry of function `fid`.
+// If the caller passes output_entry with a non-null *output_entry, that entry
+// is used directly. Otherwise the entry comes from _get_cache_entry(fid, url,
+// checksum). Resolved symbols are memoized in the entry's fptr_map, guarded
+// by map_lock.
+//
+// Reference handling for an entry obtained here: on success with a non-null
+// output_entry, the reference is handed to the caller through *output_entry,
+// and the caller becomes responsible for the matching unref(). In every other
+// case it is released before returning. An entry supplied by the caller is
+// never unref-ed here.
+Status UserFunctionCache::get_function_ptr(
+        int64_t fid,
+        const std::string& symbol,
+        const std::string& url,
+        const std::string& checksum,
+        void** fn_ptr,
+        UserFunctionCacheEntry** output_entry) {
+    if (fid == 0) {
+        // Builtin symbol of the running process: nothing to cache or lock.
+        return dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr);
+    }
+
+    const bool entry_from_caller = output_entry != nullptr && *output_entry != nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    if (entry_from_caller) {
+        entry = *output_entry;
+    } else {
+        // This takes a reference, which is released or handed over below.
+        RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry));
+    }
+
+    Status status;
+    {
+        std::lock_guard<SpinLock> guard(entry->map_lock);
+        auto cached = entry->fptr_map.find(symbol);
+        if (cached == entry->fptr_map.end()) {
+            status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr);
+            if (!status.ok()) {
+                LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol
+                    << ", file=" << entry->lib_file;
+            } else {
+                entry->fptr_map.emplace(symbol, *fn_ptr);
+            }
+        } else {
+            *fn_ptr = cached->second;
+        }
+    }
+
+    // Either hand our reference over to the caller or drop it.
+    const bool give_to_caller = !entry_from_caller && status.ok() && output_entry != nullptr;
+    if (give_to_caller) {
+        *output_entry = entry;
+    } else if (!entry_from_caller && entry->unref()) {
+        delete entry;
+    }
+
+    return status;
+}
+
+Status UserFunctionCache::_get_cache_entry(
+        int64_t fid, const std::string& url,
+        const std::string& checksum, UserFunctionCacheEntry** output_entry) {
+    UserFunctionCacheEntry* entry = nullptr;
+    {
+        std::lock_guard<std::mutex> l(_cache_lock);
+        auto it = _entry_map.find(fid);
+        if (it != _entry_map.end()) {
+            entry = it->second;
+        } else {
+            entry = new UserFunctionCacheEntry(
+                fid, checksum, _make_lib_file(fid, checksum));
+
+            entry->ref();
+            _entry_map.emplace(fid, entry);
+        }
+        entry->ref();
+    }
 
 Review comment:
   Yes, it should be called twice: once for the entry map and once for the current function.
   release_entry should be added to exprs.h.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to