imay commented on a change in pull request #453: Add UserFunctionCache to cache 
UDF's library
URL: https://github.com/apache/incubator-doris/pull/453#discussion_r243575685
 
 

 ##########
 File path: be/src/runtime/user_function_cache.cpp
 ##########
 @@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/user_function_cache.h"
+
+#include <exception>
+#include <string>
+#include <vector>
+
+#include <boost/algorithm/string/split.hpp> // boost::split
+#include <boost/algorithm/string/predicate.hpp> // boost::algorithm::ends_with
+#include <boost/algorithm/string/classification.hpp> // boost::is_any_of
+
+#include "http/http_client.h"
+#include "util/dynamic_util.h"
+#include "util/file_utils.h"
+#include "util/md5.h"
+#include "util/spinlock.h"
+
+namespace doris {
+
+static const int kLibShardNum = 128;
+
+// Function cache entry: stores the information kept for one user function
+// library (its function id, checksum, library file, load state and the
+// symbols already looked up in it).
+//
+// Entries are reference counted through ref()/unref(); the count starts at 0
+// and the code that drops the last reference is responsible for deleting
+// the entry.
+struct UserFunctionCacheEntry {
+    // Builds an entry for function `fid_` whose library has checksum
+    // `checksum_` and lives in file `lib_file_`. All other members keep
+    // their in-class defaults.
+    UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_,
+                           const std::string& lib_file_)
+        : function_id(fid_), checksum(checksum_), lib_file(lib_file_) { }
+    // Closes lib_handle if it is set and unlinks lib_file when
+    // should_delete_library is true.
+    ~UserFunctionCacheEntry();
+
+    // Atomically adds one reference.
+    void ref() { _refs.fetch_add(1); }
+
+    // Atomically drops one reference. If unref() returns true, this was the
+    // last reference and the object should be deleted by the caller.
+    bool unref() { return _refs.fetch_sub(1) == 1; }
+
+    // id of the function this library belongs to
+    int64_t function_id = 0;
+    // used to check if this library is valid.
+    std::string checksum;
+
+    // library file
+    std::string lib_file;
+
+    // make it atomic variable instead of holding a lock
+    std::atomic<bool> is_loaded{false};
+
+    // Set to true when this library is no longer needed; the destructor
+    // then removes lib_file from disk.
+    std::atomic<bool> should_delete_library{false};
+
+    // lock to make sure only one can load this cache
+    std::mutex load_lock;
+
+    // To reduce cache lock held time, cache entry is
+    // added to cache map before library is downloaded.
+    // And this is used to indicate whether library is downloaded.
+    bool is_downloaded = false;
+
+    // used to lookup a symbol
+    void* lib_handle = nullptr;
+
+    // NOTE(review): presumably guards fptr_map -- confirm against the lookup code.
+    SpinLock map_lock;
+    // from symbol_name to function pointer
+    std::unordered_map<std::string, void*> fptr_map;
+
+private:
+    // number of outstanding references, see ref()/unref()
+    std::atomic<int> _refs{0};
+};
+
+// Tears down an entry: releases the dynamic library handle when one is held,
+// then removes the library file if it was flagged for deletion.
+UserFunctionCacheEntry::~UserFunctionCacheEntry() {
+    // Only a handle that was actually opened needs closing.
+    if (lib_handle) {
+        dynamic_close(lib_handle);
+    }
+    lib_handle = nullptr;
+
+    // Keep the file on disk unless deletion was explicitly requested.
+    const bool remove_file = should_delete_library.load();
+    if (!remove_file) {
+        return;
+    }
+    unlink(lib_file.c_str());
+}
+
+// Nothing to set up at construction time; the cache is configured by init().
+UserFunctionCache::UserFunctionCache() = default;
+
+// Drains the entry map under the cache lock, dropping the reference the map
+// holds on each entry and deleting entries whose last reference that was.
+UserFunctionCache::~UserFunctionCache() {
+    std::lock_guard<std::mutex> guard(_cache_lock);
+    while (!_entry_map.empty()) {
+        auto head = _entry_map.begin();
+        auto cached = head->second;
+        // Detach the entry from the map before releasing it.
+        _entry_map.erase(head);
+        if (!cached->unref()) {
+            continue;
+        }
+        delete cached;
+    }
+}
+
+// Returns the process-wide cache, held in a function-local static.
+UserFunctionCache* UserFunctionCache::instance() {
+    static UserFunctionCache singleton;
+    return &singleton;
+}
+
+// Initializes the cache with `lib_dir` as its library directory.
+//
+// Expects to run on a cache whose _lib_dir is still empty (checked with
+// DCHECK). Returns Status::OK on success; an error from either step below
+// is handed to RETURN_IF_ERROR.
+Status UserFunctionCache::init(const std::string& lib_dir) {
+    DCHECK(_lib_dir.empty());
+    _lib_dir = lib_dir;
+    // 1. dynamic open current process
+    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));
+    // 2. load all cached libraries
+    RETURN_IF_ERROR(_load_cached_lib());
+    return Status::OK;
+}
+
+// Registers a library file that already exists on disk as a cache entry.
+//
+// `file` must be named "<function_id>.<checksum>.so" and live in `dir`.
+// On success a new entry holding one reference is stored in _entry_map
+// under the parsed function id, with is_downloaded set to true.
+//
+// Returns an error Status when:
+//   - `file` does not end with ".so";
+//   - `file` does not consist of exactly three dot-separated parts;
+//   - the function id part is not a valid integer that fits in int64_t;
+//   - an entry with the same function id is already registered.
+//
+// NOTE(review): _entry_map is accessed here without taking _cache_lock;
+// presumably the caller guarantees exclusive access -- confirm.
+Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
+    if (!boost::algorithm::ends_with(file, ".so")) {
+        return Status("unknown library file format");
+    }
+
+    std::vector<std::string> split_parts;
+    boost::split(split_parts, file, boost::is_any_of("."));
+    if (split_parts.size() != 3) {
+        return Status("user function's name should be function_id.checksum.so");
+    }
+
+    // std::stoll throws on non-numeric or out-of-range input, and stops
+    // silently at the first non-digit. A stray file in the cache directory
+    // must produce an error Status, not an uncaught exception or a bogus id.
+    const std::string& id_part = split_parts[0];
+    int64_t function_id = 0;
+    try {
+        size_t parsed_len = 0;
+        function_id = std::stoll(id_part, &parsed_len);
+        if (parsed_len != id_part.size()) {
+            LOG(WARNING) << "invalid function id in user function library name, file=" << file;
+            return Status("invalid function id in user function library name");
+        }
+    } catch (const std::exception& e) {
+        LOG(WARNING) << "invalid function id in user function library name, file=" << file
+            << ", error=" << e.what();
+        return Status("invalid function id in user function library name");
+    }
+
+    std::string checksum = split_parts[1];
+    auto it = _entry_map.find(function_id);
+    if (it != _entry_map.end()) {
+        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
+            << ", one_checksum=" << checksum << ", other_checksum=" << it->second->checksum;
+        return Status("duplicate function id");
+    }
+    // create a cache entry and put it into entry map
+    UserFunctionCacheEntry* entry = new UserFunctionCacheEntry(
+        function_id, checksum, dir + "/" + file);
+    // the file was found on disk, so there is nothing left to download
+    entry->is_downloaded = true;
+
+    // this reference is the one owned by _entry_map
+    entry->ref();
+    _entry_map[function_id] = entry;
+
+    return Status::OK;
+}
+
+Status UserFunctionCache::_load_cached_lib() {
+    // craete library directory if not exist
 
 Review comment:
   my fault

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to