chenhao7253886 closed pull request #453: Add UserFunctionCache to cache UDF's library
URL: https://github.com/apache/incubator-doris/pull/453
 
 
   

This PR was merged from a forked repository.
Because GitHub hides the original diff on merge, the diff is reproduced below
for provenance:

Because this is a foreign pull request (from a fork), the diff is included
below; GitHub would not otherwise display it:

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 0a040594..0c9272d7 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -121,6 +121,7 @@ namespace config {
 
     // log dir
     CONF_String(sys_log_dir, "${DORIS_HOME}/log");
+    CONF_String(user_function_dir, "${DORIS_HOME}/lib/usr");
     // INFO, WARNING, ERROR, FATAL
     CONF_String(sys_log_level, "INFO");
     // TIME-DAY, TIME-HOUR, SIZE-MB-nnn
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index 53a327bb..450521eb 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -32,7 +32,7 @@
 #include "runtime/bufferpool/buffer_pool.h"
 #include "runtime/exec_env.h"
 #include "runtime/mem_tracker.h"
-#include "runtime/lib_cache.h"
+#include "runtime/user_function_cache.h"
 #include "exprs/operators.h"
 #include "exprs/is_null_predicate.h"
 #include "exprs/like_predicate.h"
@@ -171,7 +171,7 @@ void init_daemon(int argc, char** argv, const 
std::vector<StorePath>& paths) {
     CpuInfo::init();
     DiskInfo::init();
     MemInfo::init();
-    LibCache::init();
+    UserFunctionCache::instance()->init(config::user_function_dir);
     Operators::init();
     IsNullPredicate::init();
     LikePredicate::init();
diff --git a/be/src/exprs/agg_fn.cc b/be/src/exprs/agg_fn.cc
index fd81204e..74b44d0c 100644
--- a/be/src/exprs/agg_fn.cc
+++ b/be/src/exprs/agg_fn.cc
@@ -20,7 +20,7 @@
 #include "codegen/llvm_codegen.h"
 #include "exprs/anyval_util.h"
 #include "runtime/descriptors.h"
-#include "runtime/lib_cache.h"
+#include "runtime/user_function_cache.h"
 #include "runtime/runtime_state.h"
 
 #include "common/names.h"
@@ -91,32 +91,39 @@ Status AggFn::Init(const RowDescriptor& row_desc, 
RuntimeState* state) {
     return Status(ss.str());
   }
 
-  RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-      aggregate_fn.init_fn_symbol, &init_fn_, &_cache_entry));
-  RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-      aggregate_fn.update_fn_symbol, &update_fn_, &_cache_entry));
+  RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+          _fn.id, aggregate_fn.init_fn_symbol, _fn.hdfs_location, "", 
&init_fn_, &_cache_entry));
+  RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+          _fn.id, aggregate_fn.update_fn_symbol, _fn.hdfs_location, "", 
&update_fn_, &_cache_entry));
 
   // Merge() is not defined for purely analytic function.
   if (!aggregate_fn.is_analytic_only_fn) {
-     
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-         aggregate_fn.merge_fn_symbol, &merge_fn_, &_cache_entry));
+     RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+             _fn.id, aggregate_fn.merge_fn_symbol, _fn.hdfs_location, "", 
&merge_fn_, &_cache_entry));
   }
   // Serialize(), GetValue(), Remove() and Finalize() are optional
   if (!aggregate_fn.serialize_fn_symbol.empty()) {
-    
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-        aggregate_fn.serialize_fn_symbol, &serialize_fn_, &_cache_entry));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, aggregate_fn.serialize_fn_symbol,
+            _fn.hdfs_location, "",
+            &serialize_fn_, &_cache_entry));
   }
   if (!aggregate_fn.get_value_fn_symbol.empty()) {
-    
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-        aggregate_fn.get_value_fn_symbol, &get_value_fn_, &_cache_entry));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, aggregate_fn.get_value_fn_symbol, _fn.hdfs_location, "",
+            &get_value_fn_, &_cache_entry));
   }
   if (!aggregate_fn.remove_fn_symbol.empty()) {
-    
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-        aggregate_fn.remove_fn_symbol, &remove_fn_, &_cache_entry));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, aggregate_fn.remove_fn_symbol,
+            _fn.hdfs_location, "",
+            &remove_fn_, &_cache_entry));
   }
   if (!aggregate_fn.finalize_fn_symbol.empty()) {
-    
RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(_fn.hdfs_location,
-        _fn.aggregate_fn.finalize_fn_symbol, &finalize_fn_, &_cache_entry));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, _fn.aggregate_fn.finalize_fn_symbol,
+            _fn.hdfs_location, "",
+            &finalize_fn_, &_cache_entry));
   }
   return Status::OK;
 }
diff --git a/be/src/exprs/agg_fn_evaluator.cpp 
b/be/src/exprs/agg_fn_evaluator.cpp
index 6653bc0a..ec8c6c6d 100755
--- a/be/src/exprs/agg_fn_evaluator.cpp
+++ b/be/src/exprs/agg_fn_evaluator.cpp
@@ -24,7 +24,7 @@
 #include "exec/aggregation_node.h"
 #include "exprs/aggregate_functions.h"
 #include "exprs/anyval_util.h"
-//#include "runtime/lib_cache.h"
+#include "runtime/user_function_cache.h"
 #include "udf/udf_internal.h"
 #include "util/debug_util.h"
 #include "runtime/datetime_value.h"
@@ -207,37 +207,38 @@ Status AggFnEvaluator::prepare(
     }
 
     // Load the function pointers.
-    RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-            _hdfs_location, _fn.aggregate_fn.init_fn_symbol, &_init_fn, NULL, 
true));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, _fn.aggregate_fn.init_fn_symbol, _hdfs_location, "", 
&_init_fn, NULL));
 
-    RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-            _hdfs_location, _fn.aggregate_fn.update_fn_symbol, &_update_fn, 
NULL, true));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, _fn.aggregate_fn.update_fn_symbol, _hdfs_location, "", 
&_update_fn, NULL));
 
     // Merge() is not loaded if evaluating the agg fn as an analytic function.
     if (!_is_analytic_fn) {
-    RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-            _hdfs_location, _fn.aggregate_fn.merge_fn_symbol, &_merge_fn, 
NULL, true));
+    RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, _fn.aggregate_fn.merge_fn_symbol, _hdfs_location, "", 
&_merge_fn, NULL));
     }
 
     // Serialize and Finalize are optional
     if (!_fn.aggregate_fn.serialize_fn_symbol.empty()) {
-        RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-                _hdfs_location, _fn.aggregate_fn.serialize_fn_symbol, 
&_serialize_fn, NULL, true));
+        RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+                _fn.id, _fn.aggregate_fn.serialize_fn_symbol, _hdfs_location,
+                "", &_serialize_fn, NULL));
     }
     if (!_fn.aggregate_fn.finalize_fn_symbol.empty()) {
-        RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-                _hdfs_location, _fn.aggregate_fn.finalize_fn_symbol, 
&_finalize_fn, NULL, true));
+        RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+                _fn.id, _fn.aggregate_fn.finalize_fn_symbol, _hdfs_location, 
"", &_finalize_fn, NULL));
     }
 
     if (!_fn.aggregate_fn.get_value_fn_symbol.empty()) {
-        RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-                _hdfs_location, _fn.aggregate_fn.get_value_fn_symbol, 
&_get_value_fn,
-                NULL, true));
+        RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+                _fn.id, _fn.aggregate_fn.get_value_fn_symbol, _hdfs_location, 
"", &_get_value_fn,
+                NULL));
     }
     if (!_fn.aggregate_fn.remove_fn_symbol.empty()) {
-        RETURN_IF_ERROR(LibCache::instance()->get_so_function_ptr(
-                _hdfs_location, _fn.aggregate_fn.remove_fn_symbol, &_remove_fn,
-                NULL, true));
+        RETURN_IF_ERROR(UserFunctionCache::instance()->get_function_ptr(
+                _fn.id, _fn.aggregate_fn.remove_fn_symbol, _hdfs_location, "", 
&_remove_fn,
+                NULL));
     }
 
     vector<FunctionContext::TypeDesc> arg_types;
diff --git a/be/src/exprs/arithmetic_expr.h b/be/src/exprs/arithmetic_expr.h
index 7d06d635..b83f89c3 100644
--- a/be/src/exprs/arithmetic_expr.h
+++ b/be/src/exprs/arithmetic_expr.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_EXPRS_ARITHMETIC_EXPR_H
 #define DORIS_BE_SRC_EXPRS_ARITHMETIC_EXPR_H
 
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 
 namespace doris {
diff --git a/be/src/exprs/binary_predicate.h b/be/src/exprs/binary_predicate.h
index 783e2dbd..463ca9b9 100644
--- a/be/src/exprs/binary_predicate.h
+++ b/be/src/exprs/binary_predicate.h
@@ -20,7 +20,10 @@
 
 #include <string>
 #include <iostream>
+
 #include <llvm/IR/InstrTypes.h>
+
+#include "common/object_pool.h"
 #include "exprs/predicate.h"
 #include "gen_cpp/Exprs_types.h"
 
diff --git a/be/src/exprs/case_expr.h b/be/src/exprs/case_expr.h
index 02795af0..7b848a96 100644
--- a/be/src/exprs/case_expr.h
+++ b/be/src/exprs/case_expr.h
@@ -20,6 +20,7 @@
 
 #include <string>
 #include "expr.h"
+#include "common/object_pool.h"
 
 namespace llvm {
 class Function;
diff --git a/be/src/exprs/cast_expr.h b/be/src/exprs/cast_expr.h
index 75de1c81..e7e516af 100644
--- a/be/src/exprs/cast_expr.h
+++ b/be/src/exprs/cast_expr.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_EXPRS_CAST_EXPR_H
 #define DORIS_BE_SRC_EXPRS_CAST_EXPR_H
 
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 
 namespace doris {
diff --git a/be/src/exprs/compound_predicate.h 
b/be/src/exprs/compound_predicate.h
index b5bf2bc5..fbecd287 100644
--- a/be/src/exprs/compound_predicate.h
+++ b/be/src/exprs/compound_predicate.h
@@ -19,6 +19,8 @@
 #define DORIS_BE_SRC_QUERY_EXPRS_COMPOUND_PREDICATE_H
 
 #include <string>
+
+#include "common/object_pool.h"
 #include "exprs/predicate.h"
 #include "gen_cpp/Exprs_types.h"
 
diff --git a/be/src/exprs/conditional_functions.h 
b/be/src/exprs/conditional_functions.h
index 53502a37..d14ba7b9 100644
--- a/be/src/exprs/conditional_functions.h
+++ b/be/src/exprs/conditional_functions.h
@@ -19,6 +19,7 @@
 #define DORIS_BE_SRC_QUERY_EXPRS_CONDITIONAL_FUNCTIONS_H
 
 #include <stdint.h>
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 #include "udf/udf.h"
 
diff --git a/be/src/exprs/expr.cpp b/be/src/exprs/expr.cpp
index a65147c5..e9710067 100644
--- a/be/src/exprs/expr.cpp
+++ b/be/src/exprs/expr.cpp
@@ -48,6 +48,7 @@
 #include "gen_cpp/Data_types.h"
 #include "runtime/runtime_state.h"
 #include "runtime/raw_value.h"
+#include "runtime/user_function_cache.h"
 #include "util/debug_util.h"
 
 #include "gen_cpp/Exprs_types.h"
@@ -1077,11 +1078,15 @@ Status Expr::create_tree_internal(const 
vector<TExprNode>& nodes, ObjectPool* po
 
 // TODO chenhao
 void Expr::close() {
-  for (Expr* child : _children) child->close();
-  /*if (_cache_entry != nullptr) {
-    LibCache::instance()->decrement_use_count(_cache_entry);
-    _cache_entry = nullptr;
-  }*/
+    for (Expr* child : _children) child->close();
+    /*if (_cache_entry != nullptr) {
+      LibCache::instance()->decrement_use_count(_cache_entry);
+      _cache_entry = nullptr;
+      }*/
+    if (_cache_entry != nullptr) {
+        UserFunctionCache::instance()->release_entry(_cache_entry);
+        _cache_entry = nullptr;
+    }
 }
 
 void Expr::close(const vector<Expr*>& exprs) {
diff --git a/be/src/exprs/expr.h b/be/src/exprs/expr.h
index 1690992c..d947cf8e 100644
--- a/be/src/exprs/expr.h
+++ b/be/src/exprs/expr.h
@@ -34,7 +34,6 @@
 #include "runtime/datetime_value.h"
 #include "runtime/decimal_value.h"
 #include "udf/udf.h"
-#include "runtime/lib_cache.h"
 #include "runtime/types.h"
 //#include <boost/scoped_ptr.hpp>
 //
@@ -65,6 +64,7 @@ class TupleIsNullPredicate;
 class VectorizedRowBatch;
 class Literal;
 class MemTracker;
+class UserFunctionCacheEntry;
 
 // This is the superclass of all expr evaluation nodes.
 class Expr {
@@ -396,7 +396,7 @@ class Expr {
         ExprContext* ctx, RuntimeState* state, int varargs_buffer_size);
 
     /// Cache entry for the library implementing this function.
-    LibCache::LibCacheEntry* _cache_entry;
+    UserFunctionCacheEntry* _cache_entry;
 
     // function opcode
 
diff --git a/be/src/exprs/info_func.h b/be/src/exprs/info_func.h
index fd1279d6..f0c05bb8 100644
--- a/be/src/exprs/info_func.h
+++ b/be/src/exprs/info_func.h
@@ -20,6 +20,7 @@
 
 #include <string>
 #include <iostream>
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 #include "gen_cpp/Exprs_types.h"
 
diff --git a/be/src/exprs/literal.h b/be/src/exprs/literal.h
index 49b3d518..307775b8 100644
--- a/be/src/exprs/literal.h
+++ b/be/src/exprs/literal.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
 #define DORIS_BE_SRC_QUERY_EXPRS_LITERAL_H
 
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 
 namespace doris {
diff --git a/be/src/exprs/new_agg_fn_evaluator.cc 
b/be/src/exprs/new_agg_fn_evaluator.cc
index f61a6afb..384b2b75 100644
--- a/be/src/exprs/new_agg_fn_evaluator.cc
+++ b/be/src/exprs/new_agg_fn_evaluator.cc
@@ -28,7 +28,6 @@
 #include "exprs/expr.h"
 #include "exprs/scalar_fn_call.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/lib_cache.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
diff --git a/be/src/exprs/new_agg_fn_evaluator.h 
b/be/src/exprs/new_agg_fn_evaluator.h
index bfbaae73..7c9bd72f 100644
--- a/be/src/exprs/new_agg_fn_evaluator.h
+++ b/be/src/exprs/new_agg_fn_evaluator.h
@@ -28,7 +28,6 @@
 #include "exprs/agg_fn.h"
 #include "exprs/hybird_map.h"
 #include "runtime/descriptors.h"
-#include "runtime/lib_cache.h"
 #include "runtime/tuple_row.h"
 #include "runtime/types.h"
 #include "udf/udf.h"
diff --git a/be/src/exprs/null_literal.h b/be/src/exprs/null_literal.h
index 5edae780..857b5b48 100644
--- a/be/src/exprs/null_literal.h
+++ b/be/src/exprs/null_literal.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_QUERY_EXPRS_NULL_LITERAL_H
 #define DORIS_BE_SRC_QUERY_EXPRS_NULL_LITERAL_H
 
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 
 namespace llvm {
diff --git a/be/src/exprs/scalar_fn_call.cpp b/be/src/exprs/scalar_fn_call.cpp
index 10a2d80f..fd6ffeaf 100644
--- a/be/src/exprs/scalar_fn_call.cpp
+++ b/be/src/exprs/scalar_fn_call.cpp
@@ -25,7 +25,7 @@
 #include "codegen/llvm_codegen.h"
 #include "exprs/anyval_util.h"
 #include "exprs/expr_context.h"
-#include "runtime/lib_cache.h"
+#include "runtime/user_function_cache.h"
 #include "runtime/runtime_state.h"
 #include "udf/udf_internal.h"
 #include "util/debug_util.h"
@@ -89,8 +89,8 @@ Status ScalarFnCall::prepare(
     Status status = Status::OK;
     if (_scalar_fn == NULL) {
         if (SymbolsUtil::is_mangled(_fn.scalar_fn.symbol)) {
-            status = LibCache::instance()->get_so_function_ptr(
-                _fn.hdfs_location, _fn.scalar_fn.symbol, &_scalar_fn, 
&_cache_entry, true);
+            status = UserFunctionCache::instance()->get_function_ptr(
+                _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", 
&_scalar_fn, &_cache_entry);
         } else {
             std::vector<TypeDescriptor> arg_types;
             for (auto& t_type : _fn.arg_types) {
@@ -100,8 +100,8 @@ Status ScalarFnCall::prepare(
             // ret_type = ColumnType(thrift_to_type(_fn.ret_type));
             std::string symbol = SymbolsUtil::mangle_user_function(
                 _fn.scalar_fn.symbol, arg_types, _fn.has_var_args, NULL);
-            status = LibCache::instance()->get_so_function_ptr(
-                _fn.hdfs_location, symbol, &_scalar_fn, &_cache_entry, true);
+            status = UserFunctionCache::instance()->get_function_ptr(
+                _fn.id, symbol, _fn.hdfs_location, "", &_scalar_fn, 
&_cache_entry);
         }
     }
 #if 0
@@ -119,7 +119,7 @@ Status ScalarFnCall::prepare(
         if (char_arg) {
             DCHECK(num_fixed_args() <= 8 && _fn.binary_type == 
TFunctionBinaryType::BUILTIN);
         }
-        Status status = LibCache::instance()->GetSoFunctionPtr(
+        Status status = UserFunctionCache::instance()->GetSoFunctionPtr(
             _fn.hdfs_location, _fn.scalar_fn.symbol, &_scalar_fn, 
&cache_entry_);
         if (!status.ok()) {
             if (_fn.binary_type == TFunctionBinaryType::BUILTIN) {
@@ -141,8 +141,8 @@ Status ScalarFnCall::prepare(
 
         if (_fn.binary_type == TFunctionBinaryType::IR) {
             std::string local_path;
-            RETURN_IF_ERROR(LibCache::instance()->GetLocalLibPath(
-                    _fn.hdfs_location, LibCache::TYPE_IR, &local_path));
+            RETURN_IF_ERROR(UserFunctionCache::instance()->GetLocalLibPath(
+                    _fn.hdfs_location, UserFunctionCache::TYPE_IR, 
&local_path));
             // Link the UDF module into this query's main module (essentially 
copy the UDF
             // module into the main module) so the UDF's functions are 
available in the main
             // module.
@@ -425,8 +425,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, 
Function** udf) {
         // This can either be a UDF implemented in a .so or a builtin using 
the UDF
         // interface with the code in impalad.
         void* fn_ptr = NULL;
-        Status status = LibCache::instance()->get_so_function_ptr(
-            _fn.hdfs_location, _fn.scalar_fn.symbol, &fn_ptr, &_cache_entry);
+        Status status = UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, _fn.scalar_fn.symbol, _fn.hdfs_location, "", &fn_ptr, 
&_cache_entry);
         if (!status.ok() && _fn.binary_type == TFunctionBinaryType::BUILTIN) {
             // Builtins symbols should exist unless there is a version 
mismatch.
             // TODO(zc )
@@ -541,8 +541,8 @@ Status ScalarFnCall::get_udf(RuntimeState* state, 
Function** udf) {
 Status ScalarFnCall::get_function(RuntimeState* state, const std::string& 
symbol, void** fn) {
     if (_fn.binary_type == TFunctionBinaryType::NATIVE 
             || _fn.binary_type == TFunctionBinaryType::BUILTIN) {
-        return LibCache::instance()->get_so_function_ptr(
-            _fn.hdfs_location, symbol, fn, &_cache_entry, true);
+        return UserFunctionCache::instance()->get_function_ptr(
+            _fn.id, symbol, _fn.hdfs_location, "", fn, &_cache_entry);
     } else {
 #if 0
         DCHECK_EQ(_fn.binary_type, TFunctionBinaryType::IR);
diff --git a/be/src/exprs/scalar_fn_call.h b/be/src/exprs/scalar_fn_call.h
index 240faac6..dcd2ba78 100644
--- a/be/src/exprs/scalar_fn_call.h
+++ b/be/src/exprs/scalar_fn_call.h
@@ -20,6 +20,7 @@
 
 #include <string>
 
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 #include "udf/udf.h"
 
diff --git a/be/src/exprs/slot_ref.h b/be/src/exprs/slot_ref.h
index 01202629..6d6f7ffb 100644
--- a/be/src/exprs/slot_ref.h
+++ b/be/src/exprs/slot_ref.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_QUERY_EXPRS_SLOT_REF_H
 #define DORIS_BE_SRC_QUERY_EXPRS_SLOT_REF_H
 
+#include "common/object_pool.h"
 #include "exprs/expr.h"
 
 namespace doris {
diff --git a/be/src/exprs/tuple_is_null_predicate.h 
b/be/src/exprs/tuple_is_null_predicate.h
index 08c7a823..f55b0e02 100644
--- a/be/src/exprs/tuple_is_null_predicate.h
+++ b/be/src/exprs/tuple_is_null_predicate.h
@@ -18,6 +18,7 @@
 #ifndef DORIS_BE_SRC_QUERY_EXPRS_TUPLE_IS_NULL_PREDICATE_H
 #define DORIS_BE_SRC_QUERY_EXPRS_TUPLE_IS_NULL_PREDICATE_H
 
+#include "common/object_pool.h"
 #include "exprs/predicate.h"
 
 namespace doris {
diff --git a/be/src/http/http_client.cpp b/be/src/http/http_client.cpp
index e8c4834b..6e93a02f 100644
--- a/be/src/http/http_client.cpp
+++ b/be/src/http/http_client.cpp
@@ -159,16 +159,19 @@ Status HttpClient::download(const std::string& 
local_path) {
         LOG(WARNING) << "open file failed, file=" << local_path;
         return Status("open file failed");
     }
-    auto callback = [&fp, &local_path] (const void* data, size_t length) {
+    Status status;
+    auto callback = [&status, &fp, &local_path] (const void* data, size_t 
length) {
         auto res = fwrite(data, length, 1, fp.get());
         if (res != 1) {
             LOG(WARNING) << "fail to write data to file, file=" << local_path
                 << ", error=" << ferror(fp.get());
+            status = Status("fail to write data when download");
             return false;
         }
         return true;
     };
-    return execute(callback);
+    RETURN_IF_ERROR(execute(callback));
+    return status;
 }
 
 Status HttpClient::execute(std::string* response) {
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 5dcf63bb..fd911382 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -37,7 +37,7 @@ add_library(Runtime STATIC
   descriptors.cpp
   exec_env.cpp
   exec_env_init.cpp
-  lib_cache.cpp
+  user_function_cache.cpp
   mem_pool.cpp
   plan_fragment_executor.cpp
   primitive_type.cpp
diff --git a/be/src/runtime/lib_cache.cpp b/be/src/runtime/lib_cache.cpp
deleted file mode 100644
index 5bfaa1a3..00000000
--- a/be/src/runtime/lib_cache.cpp
+++ /dev/null
@@ -1,437 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/lib_cache.h"
-
-#include <boost/filesystem.hpp>
-#include <boost/foreach.hpp>
-#include <boost/thread/locks.hpp>
-
-#include "codegen/llvm_codegen.h"
-#include "runtime/runtime_state.h"
-#include "util/dynamic_util.h"
-#include "util/hash_util.hpp"
-#include "util/path_builder.h"
-
-namespace doris {
-
-boost::scoped_ptr<LibCache> LibCache::_s_instance;
-
-struct LibCache::LibCacheEntry {
-    // Lock protecting all fields in this entry
-    boost::mutex lock;
-
-    // The number of users that are using this cache entry. If this is
-    // a .so, we can't dlclose unless the use_count goes to 0.
-    int use_count;
-
-    // If true, this cache entry should be removed from _lib_cache when
-    // the use_count goes to 0.
-    bool should_remove;
-
-    // If true, we need to check if there is a newer version of the cached 
library in HDFS
-    // on next access. Should hold _lock to read/write.
-    bool check_needs_refresh;
-
-    // The type of this file.
-    LibType type;
-
-    // The path on the local file system for this library.
-    std::string local_path;
-
-    // Status returned from copying this file from HDFS.
-    Status copy_file_status;
-
-    // The last modification time of the HDFS file in seconds.
-    time_t last_mod_time;
-
-    // Handle from dlopen.
-    void* shared_object_handle;
-
-    // mapping from symbol => address of loaded symbol.
-    // Only used if the type is TYPE_SO.
-    typedef boost::unordered_map<std::string, void*> SymbolMap;
-    SymbolMap symbol_cache;
-
-    // Set of symbols in this entry. This is populated once on load and read
-    // only. This is only used if it is a llvm module.
-    // TODO: it would be nice to be able to do this for .so's as well but it's
-    // not trivial to walk an .so for the symbol table.
-    boost::unordered_set<std::string> symbols;
-
-    // Set if an error occurs loading the cache entry before the cache entry
-    // can be evicted. This allows other threads that attempt to use the entry
-    // before it is removed to return the same error.
-    Status loading_status;
-
-    LibCacheEntry() : use_count(0), should_remove(false), 
check_needs_refresh(false),
-    shared_object_handle(NULL) {}
-    ~LibCacheEntry();
-};
-
-LibCache::LibCache() : 
-        _current_process_handle(NULL) {
-}
-
-LibCache::~LibCache() {
-    drop_cache();
-    if (_current_process_handle != NULL) {
-        dynamic_close(_current_process_handle);
-    }
-}
-
-Status LibCache::init() {
-    DCHECK(LibCache::_s_instance.get() == NULL);
-    LibCache::_s_instance.reset(new LibCache());
-    return LibCache::_s_instance->init_internal();
-}
-
-Status LibCache::init_internal() {
-    // if (TestInfo::is_fe_test()) {
-    //     // In the FE tests, NULL gives the handle to the java process.
-    //     // Explicitly load the fe-support shared object.
-    //     std::string fe_support_path;
-    //     PathBuilder::GetFullBuildPath("service/libfesupport.so", 
&fe_support_path);
-    //     RETURN_IF_ERROR(dynamic_open(fe_support_path.c_str(), 
&_current_process_handle));
-    // } else {
-    RETURN_IF_ERROR(dynamic_open(NULL, &_current_process_handle));
-    DCHECK(_current_process_handle != NULL)
-        << "We should always be able to get current process handle.";
-    return Status::OK;
-}
-
-LibCache::LibCacheEntry::~LibCacheEntry() {
-    if (shared_object_handle != NULL) {
-        DCHECK_EQ(use_count, 0);
-        DCHECK(should_remove);
-        dynamic_close(shared_object_handle);
-    }
-    unlink(local_path.c_str());
-}
-
-Status LibCache::get_so_function_ptr(
-        const std::string& hdfs_lib_file, const std::string& origin_symbol,
-        void** fn_ptr, LibCacheEntry** ent, bool quiet) {
-    const std::string symbol = get_real_symbol(origin_symbol);
-    if (hdfs_lib_file.empty()) {
-        // Just loading a function ptr in the current process. No need to take 
any locks.
-        DCHECK(_current_process_handle != NULL);
-        RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, 
symbol.c_str(), fn_ptr));
-        return Status::OK;
-    }
-    LibCacheEntry* entry = NULL;
-    boost::unique_lock<boost::mutex> lock;
-    if (ent != NULL && *ent != NULL) {
-        // Reuse already-cached entry provided by user
-        entry = *ent;
-        boost::unique_lock<boost::mutex> l(entry->lock);
-        lock.swap(l);
-    } else {
-        RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, TYPE_SO, &lock, 
&entry));
-    }
-    DCHECK(entry != NULL);
-    DCHECK_EQ(entry->type, TYPE_SO);
-
-    LibCacheEntry::SymbolMap::iterator it = entry->symbol_cache.find(symbol);
-    if (it != entry->symbol_cache.end()) {
-        *fn_ptr = it->second;
-    } else {
-        RETURN_IF_ERROR(
-            dynamic_lookup(entry->shared_object_handle, symbol.c_str(), 
fn_ptr));
-        entry->symbol_cache[symbol] = *fn_ptr;
-    }
-
-    DCHECK(*fn_ptr != NULL);
-    if (ent != NULL && *ent == NULL) {
-        // Only set and increment user's entry if it wasn't already cached
-        *ent = entry;
-        ++(*ent)->use_count;
-    }
-    return Status::OK;
-}
-
-void LibCache::decrement_use_count(LibCacheEntry* entry) {
-    if (entry == NULL) {
-        return;
-    }
-    bool can_delete = false;
-    {
-        boost::unique_lock<boost::mutex> lock(entry->lock);
-        --entry->use_count;
-        can_delete = (entry->use_count == 0 && entry->should_remove);
-    }
-    if (can_delete) {
-        delete entry;
-    }
-}
-
-Status LibCache::get_local_lib_path(
-        const std::string& hdfs_lib_file, LibType type, std::string* 
local_path) {
-    boost::unique_lock<boost::mutex> lock;
-    LibCacheEntry* entry = NULL;
-    RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, type, &lock, &entry));
-    DCHECK(entry != NULL);
-    DCHECK_EQ(entry->type, type);
-    *local_path = entry->local_path;
-    return Status::OK;
-}
-
-Status LibCache::check_symbol_exists(
-        const std::string& hdfs_lib_file, LibType type,
-        const std::string& origin_symbol, bool quiet) {
-    const std::string symbol = get_real_symbol(origin_symbol);
-    if (type == TYPE_SO) {
-        void* dummy_ptr = NULL;
-        return get_so_function_ptr(hdfs_lib_file, symbol, &dummy_ptr, NULL, 
quiet);
-    } else if (type == TYPE_IR) {
-        boost::unique_lock<boost::mutex> lock;
-        LibCacheEntry* entry = NULL;
-        RETURN_IF_ERROR(get_chache_entry(hdfs_lib_file, type, &lock, &entry));
-        DCHECK(entry != NULL);
-        DCHECK_EQ(entry->type, TYPE_IR);
-        if (entry->symbols.find(symbol) == entry->symbols.end()) {
-            std::stringstream ss;
-            ss << "Symbol '" << symbol << "' does not exist in module: " << 
hdfs_lib_file
-                << " (local path: " << entry->local_path << ")";
-            // return quiet ? Status::Expected(ss.str()) : Status(ss.str());
-            return Status(ss.str());
-        }
-        return Status::OK;
-    } else if (type == TYPE_JAR) {
-        // TODO: figure out how to inspect contents of jars
-        boost::unique_lock<boost::mutex> lock;
-        LibCacheEntry* dummy_entry = NULL;
-        return get_chache_entry(hdfs_lib_file, type, &lock, &dummy_entry);
-    } else {
-        DCHECK(false);
-        return Status("Shouldn't get here.");
-    }
-}
-
-void LibCache::set_needs_refresh(const std::string& hdfs_lib_file) {
-    boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
-    LibMap::iterator it = _lib_cache.find(hdfs_lib_file);
-    if (it == _lib_cache.end()) {
-        return;
-    }
-    LibCacheEntry* entry = it->second;
-
-    boost::unique_lock<boost::mutex> entry_lock(entry->lock);
-    // Need to hold _lock before setting check_needs_refresh.
-    entry->check_needs_refresh = true;
-}
-
-void LibCache::remove_entry(const std::string& hdfs_lib_file) {
-    boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
-    LibMap::iterator it = _lib_cache.find(hdfs_lib_file);
-    if (it == _lib_cache.end()) {
-        return;
-    }
-    remove_entry_internal(hdfs_lib_file, it);
-}
-
-void LibCache::remove_entry_internal(const std::string& hdfs_lib_file,
-                                   const LibMap::iterator& entry_iter) {
-    LibCacheEntry* entry = entry_iter->second;
-    VLOG(1) << "Removing lib cache entry: " << hdfs_lib_file
-        << ", local path: " << entry->local_path;
-    boost::unique_lock<boost::mutex> entry_lock(entry->lock);
-
-    // We have both locks so no other thread can be updating _lib_cache or 
trying to get
-    // the entry.
-    _lib_cache.erase(entry_iter);
-
-    entry->should_remove = true;
-    DCHECK_GE(entry->use_count, 0);
-    bool can_delete = entry->use_count == 0;
-
-    // Now that the entry is removed from the map, it means no future threads
-    // can find it->second (the entry), so it is safe to unlock.
-    entry_lock.unlock();
-
-    // Now that we've unlocked, we can delete this entry if no one is using it.
-    if (can_delete) {
-        delete entry;
-    }
-}
-
-void LibCache::drop_cache() {
-    boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
-    BOOST_FOREACH(LibMap::value_type& v, _lib_cache) {
-        bool can_delete = false;
-        {
-            // Lock to wait for any threads currently processing the entry.
-            boost::unique_lock<boost::mutex> entry_lock(v.second->lock);
-            v.second->should_remove = true;
-            DCHECK_GE(v.second->use_count, 0);
-            can_delete = v.second->use_count == 0;
-        }
-        VLOG(1) << "Removed lib cache entry: " << v.first;
-        if (can_delete) delete v.second;
-    }
-    _lib_cache.clear();
-}
-
-Status LibCache::get_chache_entry(
-        const std::string& hdfs_lib_file, LibType type,
-        boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry) {
-    Status status;
-    {
-        // If an error occurs, local_entry_lock is released before calling 
remove_entry()
-        // below because it takes the global _lock which must be acquired 
before taking entry
-        // locks.
-        boost::unique_lock<boost::mutex> local_entry_lock;
-        status = get_chache_entry_internal(hdfs_lib_file, type, 
&local_entry_lock, entry);
-        if (status.ok()) {
-            entry_lock->swap(local_entry_lock);
-            return status;
-        }
-        if (*entry == NULL) return status;
-
-        // Set loading_status on the entry so that if another thread calls
-        // get_chache_entry() for this lib before this thread is able to 
acquire _lock in
-        // remove_entry(), it is able to return the same error.
-        (*entry)->loading_status = status;
-    }
-    // Takes _lock
-    remove_entry(hdfs_lib_file);
-    return status;
-}
-
-Status LibCache::get_chache_entry_internal(
-        const std::string& hdfs_lib_file, LibType type,
-        boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry) {
-    DCHECK(!hdfs_lib_file.empty());
-    *entry = NULL;
-#if 0
-    // Check if this file is already cached or an error occured on another 
thread while
-    // loading the library.
-    boost::unique_lock<boost::mutex> lib_cache_lock(_lock);
-    LibMap::iterator it = _lib_cache.find(hdfs_lib_file);
-    if (it != _lib_cache.end()) {
-        {
-            boost::unique_lock<boost::mutex> 
local_entry_lock((it->second)->lock);
-            if (!(it->second)->loading_status.ok()) {
-                // If loading_status is already set, the returned *entry 
should be NULL.
-                DCHECK(*entry == NULL);
-                return (it->second)->loading_status;
-            }
-        }
-
-        *entry = it->second;
-        if ((*entry)->check_needs_refresh) {
-            // Check if file has been modified since loading the cached copy. 
If so, remove the
-            // cached entry and create a new one.
-            (*entry)->check_needs_refresh = false;
-            time_t last_mod_time;
-            hdfsFS hdfs_conn;
-            Status status = 
HdfsFsCache::instance()->GetConnection(hdfs_lib_file, &hdfs_conn);
-            if (!status.ok()) {
-                remove_entry_internal(hdfs_lib_file, it);
-                *entry = NULL;
-                return status;
-            }
-            status = GetLastModificationTime(hdfs_conn, hdfs_lib_file.c_str(), 
&last_mod_time);
-            if (!status.ok() || (*entry)->last_mod_time < last_mod_time) {
-                remove_entry_internal(hdfs_lib_file, it);
-                *entry = NULL;
-            }
-            RETURN_IF_ERROR(status);
-        }
-    }
-
-    if (*entry != NULL) {
-        // Release the _lib_cache lock. This guarantees other threads looking 
at other
-        // libs can continue.
-        lib_cache_lock.unlock();
-        boost::unique_lock<boost::mutex> local_entry_lock((*entry)->lock);
-        entry_lock->swap(local_entry_lock);
-
-        RETURN_IF_ERROR((*entry)->copy_file_status);
-        DCHECK_EQ((*entry)->type, type);
-        DCHECK(!(*entry)->local_path.empty());
-        return Status::OK;
-    }
-
-    // Entry didn't exist. Add the entry then release _lock (so other libraries
-    // can be accessed).
-    *entry = new LibCacheEntry();
-
-    // Grab the entry lock before adding it to _lib_cache. We still need to do 
more
-    // work to initialize *entry and we don't want another thread to pick up
-    // the uninitialized entry.
-    boost::unique_lock<boost::mutex> local_entry_lock((*entry)->lock);
-    entry_lock->swap(local_entry_lock);
-    _lib_cache[hdfs_lib_file] = *entry;
-    lib_cache_lock.unlock();
-
-    // At this point we have the entry lock but not the lib cache lock.
-    DCHECK(*entry != NULL);
-    (*entry)->type = type;
-
-    // Copy the file
-    (*entry)->local_path = make_local_path(hdfs_lib_file, 
FLAGS_local_library_dir);
-    VLOG(1) << "Adding lib cache entry: " << hdfs_lib_file
-        << ", local path: " << (*entry)->local_path;
-
-    hdfsFS hdfs_conn, local_conn;
-    RETURN_IF_ERROR(HdfsFsCache::instance()->GetConnection(hdfs_lib_file, 
&hdfs_conn));
-    RETURN_IF_ERROR(HdfsFsCache::instance()->GetLocalConnection(&local_conn));
-
-    // Note: the file can be updated between getting last_mod_time and copying 
the file to
-    // local_path. This can only result in the file unnecessarily being 
refreshed, and does
-    // not affect correctness.
-    (*entry)->copy_file_status = GetLastModificationTime(
-        hdfs_conn, hdfs_lib_file.c_str(), &(*entry)->last_mod_time);
-    RETURN_IF_ERROR((*entry)->copy_file_status);
-
-    (*entry)->copy_file_status = CopyHdfsFile(
-        hdfs_conn, hdfs_lib_file, local_conn, (*entry)->local_path);
-    RETURN_IF_ERROR((*entry)->copy_file_status);
-
-    if (type == TYPE_SO) {
-        // dlopen the local library
-        RETURN_IF_ERROR(
-            DynamicOpen((*entry)->local_path.c_str(), 
&(*entry)->shared_object_handle));
-    } else if (type == TYPE_IR) {
-        // Load the module and populate all symbols.
-        ObjectPool pool;
-        scoped_ptr<LlvmCodeGen> codegen;
-        std::string module_id = 
boost::filesystem::path((*entry)->local_path).stem().string();
-        RETURN_IF_ERROR(LlvmCodeGen::LoadFromFile(
-                &pool, (*entry)->local_path, module_id, &codegen));
-        codegen->GetSymbols(&(*entry)->symbols);
-    } else {
-        DCHECK_EQ(type, TYPE_JAR);
-        // Nothing to do.
-    }
-#endif
-    return Status::OK;
-}
-
-std::string LibCache::make_local_path(
-        const std::string& hdfs_path, const std::string& local_dir) {
-    // Append the pid and library number to the local directory.
-    boost::filesystem::path src(hdfs_path);
-    std::stringstream dst;
-    dst << local_dir << "/" << src.stem().native() << "." << getpid() << "."
-        << (__sync_fetch_and_add(&_num_libs_copied, 1)) << 
src.extension().native();
-    return dst.str();
-}
-
-}
-
diff --git a/be/src/runtime/lib_cache.h b/be/src/runtime/lib_cache.h
deleted file mode 100644
index 2083875a..00000000
--- a/be/src/runtime/lib_cache.h
+++ /dev/null
@@ -1,189 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef DORIS_BE_RUNTIME_LIB_CACHE_H
-#define DORIS_BE_RUNTIME_LIB_CACHE_H
-
-#include <string>
-#include <regex>
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_map.hpp>
-#include <boost/unordered_set.hpp>
-#include <boost/thread/mutex.hpp>
-#include "common/object_pool.h"
-#include "common/status.h"
-
-namespace doris {
-
-class RuntimeState;
-
-/// Process-wide cache of dynamically-linked libraries loaded from HDFS.
-/// These libraries can either be shared objects, llvm modules or jars. For
-/// shared objects, when we load the shared object, we dlopen() it and keep
-/// it in our process. For modules, we store the symbols in the module to
-/// service symbol lookups. We can't cache the module since it (i.e. the 
external
-/// module) is consumed when it is linked with the query codegen module.
-//
-/// Locking strategy: We don't want to grab a big lock across all operations 
since
-/// one of the operations is copying a file from HDFS. With one lock that would
-/// prevent any UDFs from running on the system. Instead, we have a global lock
-/// that is taken when doing the cache lookup, but is not taking during any 
blocking calls.
-/// During the block calls, we take the per-lib lock.
-//
-/// Entry lifetime management: We cannot delete the entry while a query is
-/// using the library. When the caller requests a ptr into the library, they
-/// are given the entry handle and must decrement the ref count when they
-/// are done.
-//
-/// TODO:
-/// - refresh libraries
-/// - better cached module management.
-class LibCache {
-public:
-    struct LibCacheEntry;
-
-    enum LibType {
-        TYPE_SO,      // Shared object
-        TYPE_IR,      // IR intermediate
-        TYPE_JAR,     // Java jar file. We don't care about the contents in 
the BE.
-    };
-
-    static LibCache* instance() { 
-        return LibCache::_s_instance.get(); 
-    }
-
-    /// Calls dlclose on all cached handles.
-    ~LibCache();
-
-    /// Initializes the libcache. Must be called before any other APIs.
-    static Status init();
-
-    /// Gets the local file system path for the library at 'hdfs_lib_file'. If
-    /// this file is not already on the local fs, it copies it and caches the
-    /// result. Returns an error if 'hdfs_lib_file' cannot be copied to the 
local fs.
-    Status get_local_lib_path(const std::string& hdfs_lib_file, LibType type,
-                           std::string* local_path);
-
-    /// Returns status.ok() if the symbol exists in 'hdfs_lib_file', non-ok 
otherwise.
-    /// If 'quiet' is true, the error status for non-Java unfound symbols will 
not be logged.
-    Status check_symbol_exists(
-        const std::string& hdfs_lib_file, LibType type,
-        const std::string& symbol, bool quiet);
-
-    /// Returns a pointer to the function for the given library and symbol.
-    /// If 'hdfs_lib_file' is empty, the symbol is looked up in the impalad 
process.
-    /// Otherwise, 'hdfs_lib_file' should be the HDFS path to a shared library 
(.so) file.
-    /// dlopen handles and symbols are cached.
-    /// Only usable if 'hdfs_lib_file' refers to a shared object.
-    //
-    /// If entry is non-null and *entry is null, *entry will be set to the 
cached entry. If
-    /// entry is non-null and *entry is non-null, *entry will be reused (i.e., 
the use count
-    /// is not increased). The caller must call decrement_use_count(*entry) 
when it is done
-    /// using fn_ptr and it is no longer valid to use fn_ptr.
-    //
-    /// If 'quiet' is true, returned error statuses will not be logged.
-    Status get_so_function_ptr(
-        const std::string& hdfs_lib_file, const std::string& symbol,
-        void** fn_ptr, LibCacheEntry** entry, bool quiet);
-
-    Status get_so_function_ptr(
-            const std::string& hdfs_lib_file, const std::string& symbol,
-            void** fn_ptr, LibCacheEntry** entry) {
-        return get_so_function_ptr(hdfs_lib_file, symbol, fn_ptr, entry, true);
-    }
-
-    /// Marks the entry for 'hdfs_lib_file' as needing to be refreshed if the 
file in HDFS is
-    /// newer than the local cached copied. The refresh will occur the next 
time the entry is
-    /// accessed.
-    void set_needs_refresh(const std::string& hdfs_lib_file);
-
-    /// See comment in get_so_function_ptr().
-    void decrement_use_count(LibCacheEntry* entry);
-
-    /// Removes the cache entry for 'hdfs_lib_file'
-    void remove_entry(const std::string& hdfs_lib_file);
-
-    /// Removes all cached entries.
-    void drop_cache();
-
-private:
-    /// Singleton instance. Instantiated in Init().
-    static boost::scoped_ptr<LibCache> _s_instance;
-
-    /// dlopen() handle for the current process (i.e. impalad).
-    void* _current_process_handle;
-
-    /// The number of libs that have been copied from HDFS to the local FS.
-    /// This is appended to the local fs path to remove collisions.
-    int64_t _num_libs_copied;
-
-    /// Protects _lib_cache. For lock ordering, this lock must always be taken 
before
-    /// the per entry lock.
-    boost::mutex _lock;
-
-    /// Maps HDFS library path => cache entry.
-    /// Entries in the cache need to be explicitly deleted.
-    typedef boost::unordered_map<std::string, LibCacheEntry*> LibMap;
-    LibMap _lib_cache;
-
-    LibCache();
-    LibCache(LibCache const& l); // disable copy ctor
-    LibCache& operator=(LibCache const& l); // disable assignment
-
-    Status init_internal();
-
-    /// Returns the cache entry for 'hdfs_lib_file'. If this library has not 
been
-    /// copied locally, it will copy it and add a new LibCacheEntry to 
'_lib_cache'.
-    /// Result is returned in *entry.
-    /// No locks should be take before calling this. On return the entry's 
lock is
-    /// taken and returned in *entry_lock.
-    /// If an error is returned, there will be no entry in _lib_cache and 
*entry is NULL.
-    Status get_chache_entry(
-        const std::string& hdfs_lib_file, LibType type,
-        boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
-
-    /// Implementation to get the cache entry for 'hdfs_lib_file'. Errors are 
returned
-    /// without evicting the cache entry if the status is not OK and *entry is 
not NULL.
-    Status get_chache_entry_internal(
-        const std::string& hdfs_lib_file, LibType type,
-        boost::unique_lock<boost::mutex>* entry_lock, LibCacheEntry** entry);
-
-    // map "palo" to "doris" in symbol, only for grayscale upgrading
-    std::string get_real_symbol(const std::string& symbol) {
-        static std::regex rx1("8palo_udf");
-        std::string str1 = std::regex_replace(symbol, rx1, "9doris_udf");
-        static std::regex rx2("4palo");
-        std::string str2 = std::regex_replace(str1, rx2, "5doris");
-        return str2;
-    }
- 
-    /// Utility function for generating a filename unique to this process and
-    /// 'hdfs_path'. This is to prevent multiple impalad processes or 
different library files
-    /// with the same name from clobbering each other. 'hdfs_path' should be 
the full path
-    /// (including the filename) of the file we're going to copy to the local 
FS, and
-    /// 'local_dir' is the local directory prefix of the returned path.
-    std::string make_local_path(const std::string& hdfs_path, const 
std::string& local_dir);
-
-    /// Implementation to remove an entry from the cache.
-    /// _lock must be held. The entry's lock should not be held.
-    void remove_entry_internal(const std::string& hdfs_lib_file,
-                             const LibMap::iterator& entry_iterator);
-};
-
-}
-
-#endif
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index 7e2ecf58..a21ecf34 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -46,6 +46,30 @@
 
 namespace doris {
 
+// Test-only constructor: builds a RuntimeState for one fragment instance
+// without a TExecPlanFragmentParams and delegates the rest of the setup to
+// init().
+// NOTE(review): the init() result is only checked with DCHECK, which is
+// typically compiled out in release (NDEBUG) builds, so a failing init()
+// would go unnoticed there.
+RuntimeState::RuntimeState(
+        const TUniqueId& fragment_instance_id,
+        const TQueryOptions& query_options,
+        const std::string& now, ExecEnv* exec_env) :
+            _obj_pool(new ObjectPool()),
+            _data_stream_recvrs_pool(new ObjectPool()),
+            _unreported_error_idx(0),
+            _profile(_obj_pool.get(), "Fragment " + print_id(fragment_instance_id)),
+            _fragment_mem_tracker(NULL),
+            _is_cancelled(false),
+            _per_fragment_instance_idx(0),
+            _root_node_id(-1),
+            _num_rows_load_success(0),
+            _num_rows_load_filtered(0),
+            _num_print_error_rows(0),
+            _normal_row_number(0),
+            _error_row_number(0),
+            _error_log_file(nullptr),
+            _is_running(true),
+            _instance_buffer_reservation(new ReservationTracker) {
+    Status status = init(fragment_instance_id, query_options, now, exec_env);
+    DCHECK(status.ok());
+}
+
 RuntimeState::RuntimeState(
         const TExecPlanFragmentParams& fragment_params,
         const TQueryOptions& query_options,
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 7a413db1..f8d46ee5 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -65,6 +65,11 @@ class RowDescriptor;
 // query and shared across all execution nodes of that query.
 class RuntimeState {
 public:
+    // NOTE: only used in unit tests. Builds a RuntimeState from a fragment
+    // instance id and query options, without a TExecPlanFragmentParams.
+    RuntimeState(
+        const TUniqueId& fragment_instance_id,
+        const TQueryOptions& query_options,
+        const std::string& now, ExecEnv* exec_env);
 
     RuntimeState(
         const TExecPlanFragmentParams& fragment_params,
diff --git a/be/src/runtime/user_function_cache.cpp 
b/be/src/runtime/user_function_cache.cpp
new file mode 100644
index 00000000..cc3e3bba
--- /dev/null
+++ b/be/src/runtime/user_function_cache.cpp
@@ -0,0 +1,370 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/user_function_cache.h"
+
+#include <unistd.h>
+
+#include <atomic>
+#include <cerrno>
+#include <cstdio>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <vector>
+
+#include <boost/algorithm/string/split.hpp> // boost::split
+#include <boost/algorithm/string/predicate.hpp> // boost::algorithm::ends_with
+#include <boost/algorithm/string/classification.hpp> // boost::is_any_of
+
+#include "http/http_client.h"
+#include "util/dynamic_util.h"
+#include "util/file_utils.h"
+#include "util/md5.h"
+#include "util/spinlock.h"
+
+namespace doris {
+
+// Number of shard sub-directories (named 0..kLibShardNum-1) under the library directory.
+static constexpr int kLibShardNum = 128;
+
+// One cached user function library. Entries are reference counted: the
+// cache's entry map holds one reference, and each caller that obtained the
+// entry through UserFunctionCache::_get_cache_entry() holds another. Callers
+// delete the entry when unref() reports that the last reference is gone.
+struct UserFunctionCacheEntry {
+    UserFunctionCacheEntry(int64_t fid_, const std::string& checksum_,
+                           const std::string& lib_file_)
+        : function_id(fid_), checksum(checksum_), lib_file(lib_file_) { }
+    ~UserFunctionCacheEntry();
+
+    // Entries own a library handle, a mutex and a reference count, so they
+    // are never copied.
+    UserFunctionCacheEntry(const UserFunctionCacheEntry&) = delete;
+    UserFunctionCacheEntry& operator=(const UserFunctionCacheEntry&) = delete;
+
+    void ref() { _refs.fetch_add(1); }
+
+    // Drop one reference. Returns true when this was the last reference, in
+    // which case the caller must delete this object.
+    bool unref() { return _refs.fetch_sub(1) == 1; }
+
+    int64_t function_id = 0;
+    // Expected MD5 checksum (hex) of the library; verified after download.
+    std::string checksum;
+
+    // Local path of the library file.
+    std::string lib_file;
+
+    // Set once lib_handle has been opened. Atomic so it can be checked
+    // without taking load_lock.
+    std::atomic<bool> is_loaded{false};
+
+    // Set when this library is no longer needed (e.g. loading it failed);
+    // the destructor then removes lib_file from disk.
+    std::atomic<bool> should_delete_library{false};
+
+    // Ensures only one thread downloads and opens this library.
+    std::mutex load_lock;
+
+    // To reduce the time the cache lock is held, an entry is added to the
+    // cache map before its library is downloaded. This flag tells whether
+    // lib_file is already present locally. Accessed under load_lock (or
+    // during init, before the cache is shared).
+    bool is_downloaded = false;
+
+    // Handle of the opened library, used to look up symbols.
+    void* lib_handle = nullptr;
+
+    // Protects fptr_map.
+    SpinLock map_lock;
+    // Cache from symbol name to resolved function pointer.
+    std::unordered_map<std::string, void*> fptr_map;
+
+private:
+    std::atomic<int> _refs{0};
+};
+
+UserFunctionCacheEntry::~UserFunctionCacheEntry() {
+    // Release the library handle, if one was ever opened.
+    if (lib_handle) {
+        dynamic_close(lib_handle);
+        lib_handle = nullptr;
+    }
+    // Remove the on-disk library only when the entry was marked obsolete.
+    if (!should_delete_library.load()) {
+        return;
+    }
+    unlink(lib_file.c_str());
+}
+
+// Members use their in-class initializers; nothing else to do until init().
+UserFunctionCache::UserFunctionCache() = default;
+
+UserFunctionCache::~UserFunctionCache() {
+    // Drop the reference the map holds on every cached entry. Entries still
+    // referenced by callers survive until their last release.
+    std::lock_guard<std::mutex> guard(_cache_lock);
+    for (auto& kv : _entry_map) {
+        UserFunctionCacheEntry* cached = kv.second;
+        if (cached->unref()) {
+            delete cached;
+        }
+    }
+    _entry_map.clear();
+}
+
+// Process-wide cache, constructed on first use (thread-safe function-local static).
+UserFunctionCache* UserFunctionCache::instance() {
+    static UserFunctionCache s_instance;
+    return &s_instance;
+}
+
+// Set the library directory, open a handle to the running process (used for
+// fid == 0 lookups) and register libraries already present under lib_dir.
+Status UserFunctionCache::init(const std::string& lib_dir) {
+    // init() is expected to run exactly once.
+    DCHECK(_lib_dir.empty());
+    _lib_dir = lib_dir;
+    RETURN_IF_ERROR(dynamic_open(nullptr, &_current_process_handle));
+    return _load_cached_lib();
+}
+
+// Register an already-downloaded library found while scanning the cache
+// directory. File names must look like "<function_id>.<checksum>.so"; any
+// other name is rejected with an error so the caller can log and skip it.
+// Only reached from init() (via _load_cached_lib), before the cache is
+// shared, so _cache_lock is not taken.
+Status UserFunctionCache::_load_entry_from_lib(const std::string& dir, const std::string& file) {
+    if (!boost::algorithm::ends_with(file, ".so")) {
+        return Status("unknown library file format");
+    }
+
+    std::vector<std::string> split_parts;
+    boost::split(split_parts, file, boost::is_any_of("."));
+    if (split_parts.size() != 3 || split_parts[0].empty() || split_parts[1].empty()) {
+        return Status("user function's name should be function_id.checksum.so");
+    }
+    // Parse the id without std::stol: a stray file such as "abc.xyz.so" must
+    // be reported as an error instead of throwing std::invalid_argument out
+    // of init(). The whole part must be a number.
+    errno = 0;
+    char* end = nullptr;
+    int64_t function_id = std::strtoll(split_parts[0].c_str(), &end, 10);
+    if (errno != 0 || *end != '\0') {
+        return Status("user function's name should be function_id.checksum.so");
+    }
+    std::string checksum = split_parts[1];
+    auto it = _entry_map.find(function_id);
+    if (it != _entry_map.end()) {
+        LOG(WARNING) << "meet a same function id user function library, function_id=" << function_id
+            << ", one_checksum=" << checksum << ", other_checksum=" << it->second->checksum;
+        return Status("duplicate function id");
+    }
+    // create a cache entry and put it into entry map
+    UserFunctionCacheEntry* entry = new UserFunctionCacheEntry(
+        function_id, checksum, dir + "/" + file);
+    entry->is_downloaded = true;
+
+    // This reference is owned by _entry_map.
+    entry->ref();
+    _entry_map[function_id] = entry;
+
+    return Status::OK;
+}
+
+// Make sure the library directory and its kLibShardNum shard directories
+// exist, then register every library file found in the shards. A file that
+// cannot be registered is logged and skipped.
+Status UserFunctionCache::_load_cached_lib() {
+    RETURN_IF_ERROR(FileUtils::create_dir(_lib_dir));
+
+    // Returning true keeps the directory scan going after a bad file.
+    auto register_file = [this] (const std::string& dir, const std::string& file) {
+        if (!_load_entry_from_lib(dir, file).ok()) {
+            LOG(WARNING) << "load a library failed, dir=" << dir << ", file=" << file;
+        }
+        return true;
+    };
+    for (int shard = 0; shard < kLibShardNum; ++shard) {
+        const std::string shard_dir = _lib_dir + "/" + std::to_string(shard);
+        RETURN_IF_ERROR(FileUtils::create_dir(shard_dir));
+        RETURN_IF_ERROR(FileUtils::scan_dir(shard_dir, register_file));
+    }
+    return Status::OK;
+}
+
+// Resolve symbol to a function pointer. fid == 0 looks in this process;
+// otherwise the symbol is looked up in the cached library of fid (or in
+// *output_entry when the caller already holds one).
+Status UserFunctionCache::get_function_ptr(
+        int64_t fid,
+        const std::string& symbol,
+        const std::string& url,
+        const std::string& checksum,
+        void** fn_ptr,
+        UserFunctionCacheEntry** output_entry) {
+    // Builtin symbol of this process: no cache entry and no locking needed.
+    if (fid == 0) {
+        RETURN_IF_ERROR(dynamic_lookup(_current_process_handle, symbol.c_str(), fn_ptr));
+        return Status::OK;
+    }
+
+    // Reuse the caller's entry when one is supplied. Otherwise take a fresh
+    // reference from the cache, which must be given back unless handed out.
+    const bool reuse_entry = output_entry != nullptr && *output_entry != nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    if (reuse_entry) {
+        entry = *output_entry;
+    } else {
+        RETURN_IF_ERROR(_get_cache_entry(fid, url, checksum, &entry));
+    }
+    bool owns_reference = !reuse_entry;
+
+    Status status;
+    {
+        // Resolved symbols are memoized per library in fptr_map.
+        std::lock_guard<SpinLock> guard(entry->map_lock);
+        auto cached = entry->fptr_map.find(symbol);
+        if (cached == entry->fptr_map.end()) {
+            status = dynamic_lookup(entry->lib_handle, symbol.c_str(), fn_ptr);
+            if (!status.ok()) {
+                LOG(WARNING) << "fail to lookup symbol in library, symbol=" << symbol
+                    << ", file=" << entry->lib_file;
+            } else {
+                entry->fptr_map.emplace(symbol, *fn_ptr);
+            }
+        } else {
+            *fn_ptr = cached->second;
+        }
+    }
+
+    // On success, hand our reference over to the caller if it asked for the entry.
+    if (status.ok() && output_entry != nullptr && *output_entry == nullptr) {
+        *output_entry = entry;
+        owns_reference = false;
+    }
+    if (owns_reference && entry->unref()) {
+        delete entry;
+    }
+    return status;
+}
+
+// Find or create the cache entry for fid and make sure its library is loaded.
+// On success *output_entry holds a reference the caller must release.
+Status UserFunctionCache::_get_cache_entry(
+        int64_t fid, const std::string& url,
+        const std::string& checksum, UserFunctionCacheEntry** output_entry) {
+    UserFunctionCacheEntry* entry = nullptr;
+    {
+        // Under the cache lock: look up or create the entry and take a
+        // reference for the caller. A new entry also gets one reference that
+        // is owned by the map.
+        std::lock_guard<std::mutex> guard(_cache_lock);
+        auto found = _entry_map.find(fid);
+        if (found == _entry_map.end()) {
+            entry = new UserFunctionCacheEntry(fid, checksum, _make_lib_file(fid, checksum));
+            entry->ref();
+            _entry_map.emplace(fid, entry);
+        } else {
+            entry = found->second;
+        }
+        entry->ref();
+    }
+
+    // Download/open outside the cache lock so other functions are not blocked.
+    Status st = _load_cache_entry(url, entry);
+    if (st.ok()) {
+        *output_entry = entry;
+        return Status::OK;
+    }
+    LOG(WARNING) << "fail to load cache entry, fid=" << fid;
+    // A failed load evicts the entry even if it was valid before; this also
+    // releases the reference taken above.
+    _destroy_cache_entry(entry);
+    return st;
+}
+
+// Evict entry from the cache and release the caller's reference to it. Once
+// the last reference is dropped, the destructor removes the library file.
+// The caller must hold a reference to entry.
+void UserFunctionCache::_destroy_cache_entry(UserFunctionCacheEntry* entry) {
+    // 1. Remove entry from the map, but only if the map still points at this
+    //    very entry. Another thread may already have evicted it, and a fresh
+    //    entry for the same function id may have been inserted since. Erasing
+    //    by key alone would drop that new entry and release a map reference
+    //    this entry no longer owns, leaking both entries.
+    bool removed = false;
+    {
+        std::lock_guard<std::mutex> l(_cache_lock);
+        auto it = _entry_map.find(entry->function_id);
+        if (it != _entry_map.end() && it->second == entry) {
+            _entry_map.erase(it);
+            removed = true;
+        }
+    }
+    if (removed) {
+        // Release the map's reference; the caller's reference keeps entry alive.
+        entry->unref();
+    }
+    entry->should_delete_library.store(true);
+    // 2. Release the caller's reference and delete if it was the last one.
+    if (entry->unref()) {
+        delete entry;
+    }
+}
+
+// Make sure entry's library is downloaded and opened. Safe to call from many
+// threads: the first one does the work under entry->load_lock, the others
+// wait on that lock and then observe is_loaded.
+Status UserFunctionCache::_load_cache_entry(
+        const std::string& url, UserFunctionCacheEntry* entry) {
+    // Fast path without taking the lock.
+    if (entry->is_loaded.load()) {
+        return Status::OK;
+    }
+
+    std::unique_lock<std::mutex> l(entry->load_lock);
+    // Re-check under the lock: another thread may have finished loading while
+    // this one was waiting. Without this check the library would be opened a
+    // second time, overwriting (and leaking) the handle in entry->lib_handle.
+    if (entry->is_loaded.load()) {
+        return Status::OK;
+    }
+    if (!entry->is_downloaded) {
+        RETURN_IF_ERROR(_download_lib(url, entry));
+    }
+
+    RETURN_IF_ERROR(_load_cache_entry_internal(entry));
+    return Status::OK;
+}
+
+// Download the library at url into entry->lib_file and verify its checksum.
+// Data is written to "<lib_file>.tmp" and renamed into place only after the
+// checksum matches and the file is closed cleanly, so lib_file never holds a
+// partial or corrupt library. The temporary file is removed on every failure
+// path.
+// entry->load_lock must be held.
+Status UserFunctionCache::_download_lib(
+        const std::string& url, UserFunctionCacheEntry* entry) {
+    DCHECK(!entry->is_downloaded);
+
+    // get local path to save library
+    std::string tmp_file = entry->lib_file + ".tmp";
+    // Unlinks tmp_file on scope exit unless released after a successful rename.
+    // Declared before fp so that the file is closed before it is unlinked.
+    auto tmp_remover = [] (const std::string* path) { unlink(path->c_str()); };
+    std::unique_ptr<const std::string, decltype(tmp_remover)> tmp_guard(&tmp_file, tmp_remover);
+
+    auto fp_closer = [] (FILE* fp) { fclose(fp); };
+    std::unique_ptr<FILE, decltype(fp_closer)> fp(fopen(tmp_file.c_str(), "w"), fp_closer);
+    if (fp == nullptr) {
+        // fp is null here, so report errno; calling ferror() on it is undefined.
+        int err = errno;
+        char buf[64];
+        LOG(WARNING) << "fail to open file, file=" << tmp_file
+            << ", errno=" << err << ", errmsg=" << strerror_r(err, buf, 64);
+        return Status("fail to open file");
+    }
+
+    Md5Digest digest;
+    HttpClient client;
+    RETURN_IF_ERROR(client.init(url));
+    Status status;
+    auto download_cb = [&status, &tmp_file, &fp, &digest] (const void* data, size_t length) {
+        if (length == 0) {
+            // Nothing to write; fwrite(data, 0, 1, ...) returns 0 and would be
+            // misreported as a write failure.
+            return true;
+        }
+        digest.update(data, length);
+        auto res = fwrite(data, length, 1, fp.get());
+        if (res != 1) {
+            LOG(WARNING) << "fail to write data to file, file=" << tmp_file
+                << ", error=" << ferror(fp.get());
+            status = Status("fail to write data when download");
+            return false;
+        }
+        return true;
+    };
+    RETURN_IF_ERROR(client.execute(download_cb));
+    RETURN_IF_ERROR(status);
+    digest.digest();
+    if (!boost::iequals(digest.hex(), entry->checksum)) {
+        LOG(WARNING) << "UDF's checksum is not equal, one=" << digest.hex()
+            << ", other=" << entry->checksum;
+        return Status("UDF's library checksum is not match");
+    }
+    // Close the file explicitly: buffered data is flushed here, and a flush
+    // failure must be detected before the file is published under lib_file.
+    if (fclose(fp.release()) != 0) {
+        int err = errno;
+        char buf[64];
+        LOG(WARNING) << "fail to close file, file=" << tmp_file
+            << ", errno=" << err << ", errmsg=" << strerror_r(err, buf, 64);
+        return Status("fail to close file");
+    }
+
+    // rename temporary file to library file
+    if (rename(tmp_file.c_str(), entry->lib_file.c_str()) != 0) {
+        int err = errno;
+        char buf[64];
+        LOG(WARNING) << "fail to rename file from=" << tmp_file << ", to=" << entry->lib_file
+            << ", errno=" << err << ", errmsg=" << strerror_r(err, buf, 64);
+        return Status("fail to rename file");
+    }
+    // The temporary file has become lib_file; nothing left to clean up.
+    tmp_guard.release();
+
+    entry->is_downloaded = true;
+    return Status::OK;
+}
+
+// Open entry's library and publish it as loaded. entry->load_lock must be held.
+Status UserFunctionCache::_load_cache_entry_internal(UserFunctionCacheEntry* entry) {
+    Status st = dynamic_open(entry->lib_file.c_str(), &entry->lib_handle);
+    if (!st.ok()) {
+        return st;
+    }
+    // Stored after lib_handle, so a thread that sees is_loaded == true also
+    // sees the opened handle.
+    entry->is_loaded.store(true);
+    return Status::OK;
+}
+
+// Build the local path "<lib_dir>/<shard>/<function_id>.<checksum>.so", where
+// shard selects one of the kLibShardNum directories created by _load_cached_lib().
+std::string UserFunctionCache::_make_lib_file(int64_t function_id, const std::string& checksum) {
+    // C++ '%' keeps the sign of the dividend, so a negative id would map to a
+    // shard directory such as "-5" that is never created. Fold the shard into
+    // [0, kLibShardNum). Function ids are presumably positive; this only
+    // guards the edge case and leaves the mapping of non-negative ids unchanged.
+    int shard = static_cast<int>(function_id % kLibShardNum);
+    if (shard < 0) {
+        shard += kLibShardNum;
+    }
+    std::stringstream ss;
+    ss << _lib_dir << '/' << shard << '/' << function_id << '.' << checksum << ".so";
+    return ss.str();
+}
+
+// Give back a reference obtained from get_function_ptr(); nullptr is a no-op.
+void UserFunctionCache::release_entry(UserFunctionCacheEntry* entry) {
+    if (entry != nullptr && entry->unref()) {
+        delete entry;
+    }
+}
+
+}
diff --git a/be/src/runtime/user_function_cache.h 
b/be/src/runtime/user_function_cache.h
new file mode 100644
index 00000000..e53fe46f
--- /dev/null
+++ b/be/src/runtime/user_function_cache.h
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <mutex>
+#include <string>
+#include <unordered_map>
+
+#include "common/status.h"
+
+namespace doris {
+
+struct UserFunctionCacheEntry;
+
+// Caches user function libraries. User functions include UDF (User Defined
+// Function) and UDAF (User Defined Aggregate Function), and may include UDTF
+// (User Defined Table Function) in the future. One user function may be split
+// into several implementation functions; for example, a UDAF is split into
+// InitFn, MergeFn, FinalizeFn...
+// In Doris, UDF/UDAF/UDTF are called UserFunction, and each implementation
+// symbol is called Function.
+// A UserFunction has a function id, and its library is found by this id.
+// When a user function is added to the cache, its library is downloaded from
+// a URL and its checksum is verified, so a library found by id is valid. When
+// the user changes its implementation (URL), Doris generates a new function id.
+class UserFunctionCache {
+public:
+    UserFunctionCache();
+    ~UserFunctionCache();
+
+    // The cache owns raw entry pointers and a mutex, and is used through
+    // instance(); it is never copied.
+    UserFunctionCache(const UserFunctionCache&) = delete;
+    UserFunctionCache& operator=(const UserFunctionCache&) = delete;
+
+    // Initialize this cache; call this before any other method.
+    // lib_dir is the local directory that holds cached libraries.
+    Status init(const std::string& lib_dir);
+
+    static UserFunctionCache* instance();
+
+    // Return the function pointer for the given fid and symbol in fn_ptr.
+    // If fid is 0, the symbol is looked up in this doris-be process.
+    // Otherwise it is looked up in the UserFunction's library, which is
+    // downloaded from url and verified against checksum if not cached yet.
+    // If entry is not null and *entry is null, on success *entry is set to the
+    // cache entry; the caller must call release_entry(*entry) when it no longer
+    // needs it, and must not use fn_ptr after that.
+    // If entry is not null and *entry is not null, the symbol is looked up in
+    // *entry and no new reference is taken.
+    Status get_function_ptr(int64_t fid,
+                           const std::string& symbol,
+                           const std::string& url,
+                           const std::string& checksum,
+                           void** fn_ptr,
+                           UserFunctionCacheEntry** entry);
+    // Release a cache entry returned by get_function_ptr(). nullptr is ignored.
+    void release_entry(UserFunctionCacheEntry* entry);
+
+private:
+    Status _load_cached_lib();
+    Status _load_entry_from_lib(const std::string& dir, const std::string& file);
+    Status _get_cache_entry(
+        int64_t fid, const std::string& url,
+        const std::string& checksum, UserFunctionCacheEntry** output_entry);
+    Status _load_cache_entry(const std::string& url, UserFunctionCacheEntry* entry);
+    Status _download_lib(
+        const std::string& url, UserFunctionCacheEntry* entry);
+    Status _load_cache_entry_internal(UserFunctionCacheEntry* entry);
+
+    std::string _make_lib_file(int64_t function_id, const std::string& checksum);
+    void _destroy_cache_entry(UserFunctionCacheEntry* entry);
+
+private:
+    // Local directory holding cached libraries, set by init().
+    std::string _lib_dir;
+    // Handle of the running process, used for lookups with fid == 0.
+    void* _current_process_handle = nullptr;
+
+    // Protects _entry_map.
+    std::mutex _cache_lock;
+    // function id -> cache entry; the map owns one reference to each entry.
+    std::unordered_map<int64_t, UserFunctionCacheEntry*> _entry_map;
+};
+
+}
diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp
index b1bc645b..a0cd5dca 100644
--- a/be/src/util/file_utils.cpp
+++ b/be/src/util/file_utils.cpp
@@ -94,19 +94,10 @@ Status FileUtils::scan_dir(
     }
     DeferOp close_dir(std::bind<void>(&closedir, dir));
 
-    struct dirent entry;
-    struct dirent* result = nullptr;
     int64_t count = 0;
     while (true) {
-        int ret = readdir_r(dir, &entry, &result);
-        if (ret != 0) {
-            char buf[64];
-            std::stringstream ss;
-            ss << "readdir(" << dir_path << ") failed, because: " << 
strerror_r(errno, buf, 64);
-            return Status(ss.str());
-        }
+        auto result = readdir(dir.get());
         if (result == nullptr) {
-            // Over
             break;
         }
         std::string file_name = result->d_name;
@@ -127,6 +118,36 @@ Status FileUtils::scan_dir(
     return Status::OK;
 }
 
+Status FileUtils::scan_dir(
+        const std::string& dir_path,
+        const std::function<bool(const std::string&, const std::string&)>& callback) {
+    auto dir_closer = [] (DIR* dir) { closedir(dir); };
+    std::unique_ptr<DIR, decltype(dir_closer)> dir(opendir(dir_path.c_str()), dir_closer);
+    if (dir == nullptr) {
+        char buf[64];
+        LOG(WARNING) << "fail to open dir, dir=" << dir_path << ", errmsg=" << strerror_r(errno, buf, 64);
+        return Status("fail to opendir");
+    }
+    // Invoke callback(dir_path, name) for every entry except "." and "..";
+    // scanning stops as soon as the callback returns false.
+    while (true) {
+        auto result = readdir(dir.get());
+        // nullptr marks end of directory; readdir() errors are not distinguished and also end the scan.
+        if (result == nullptr) {
+            break;
+        }
+        std::string file_name = result->d_name;
+        if (file_name == "." || file_name == "..") {
+            continue;
+        }
+        auto is_continue = callback(dir_path, file_name);
+        if (!is_continue) {
+            break;
+        }
+    }
+    return Status::OK;
+}
+
 bool FileUtils::is_dir(const std::string& path) {
     struct stat path_stat;    
     if (stat(path.c_str(), &path_stat) != 0) {
diff --git a/be/src/util/file_utils.h b/be/src/util/file_utils.h
index b879ccf0..fca04f41 100644
--- a/be/src/util/file_utils.h
+++ b/be/src/util/file_utils.h
@@ -19,6 +19,7 @@
 #define DORIS_BE_UTIL_FILE_UTILS_H
 
 #include <string>
+#include <functional>
 
 #include "common/status.h"
 
@@ -44,6 +45,9 @@ class FileUtils {
     static Status scan_dir(
             const std::string& dir_path, std::vector<std::string>* files,
             int64_t* file_count = nullptr);
+    static Status scan_dir(
+        const std::string& dir_path,
+        const std::function<bool(const std::string&, const std::string&)>& 
callback);
 
     // If the file_path is not exist, or is not a dir, return false.
     static bool is_dir(const std::string& file_path);
diff --git a/be/test/exec/broker_scan_node_test.cpp 
b/be/test/exec/broker_scan_node_test.cpp
index 3485762e..066f468d 100644
--- a/be/test/exec/broker_scan_node_test.cpp
+++ b/be/test/exec/broker_scan_node_test.cpp
@@ -30,7 +30,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/runtime_state.h"
 #include "runtime/row_batch.h"
-#include "runtime/lib_cache.h"
+#include "runtime/user_function_cache.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 
@@ -44,7 +44,7 @@ class BrokerScanNodeTest : public testing::Test {
     }
     void init();
     static void SetUpTestCase() {
-        LibCache::instance()->init();
+        UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal");
         CastFunctions::init();
     }
 
diff --git a/be/test/exec/broker_scanner_test.cpp 
b/be/test/exec/broker_scanner_test.cpp
index b1d4813d..5182cbbb 100644
--- a/be/test/exec/broker_scanner_test.cpp
+++ b/be/test/exec/broker_scanner_test.cpp
@@ -29,7 +29,7 @@
 #include "runtime/descriptors.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
-#include "runtime/lib_cache.h"
+#include "runtime/user_function_cache.h"
 #include "gen_cpp/Descriptors_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "exprs/cast_functions.h"
@@ -46,7 +46,7 @@ class BrokerScannerTest : public testing::Test {
     void init();
 
     static void SetUpTestCase() {
-        LibCache::instance()->init();
+        UserFunctionCache::instance()->init("./be/test/runtime/test_data/user_function_cache/normal");
         CastFunctions::init();
     }
 
diff --git a/be/test/runtime/CMakeLists.txt b/be/test/runtime/CMakeLists.txt
index 5c86ad30..a42f23cb 100644
--- a/be/test/runtime/CMakeLists.txt
+++ b/be/test/runtime/CMakeLists.txt
@@ -54,3 +54,4 @@ ADD_BE_TEST(stream_load_pipe_test)
 ADD_BE_TEST(tablet_writer_mgr_test)
 #ADD_BE_TEST(export_task_mgr_test)
 ADD_BE_TEST(snapshot_loader_test)
+ADD_BE_TEST(user_function_cache_test)
diff --git a/be/test/runtime/test_data/user_function_cache/lib/my_add.cc 
b/be/test/runtime/test_data/user_function_cache/lib/my_add.cc
new file mode 100644
index 00000000..683135f5
--- /dev/null
+++ b/be/test/runtime/test_data/user_function_cache/lib/my_add.cc
@@ -0,0 +1,4 @@
+// Bodies are intentionally empty: user_function_cache_test only resolves the
+// mangled symbols _Z6my_addv and _Z6my_delv and never calls them.
+void my_add() {}
+void my_del() {}
diff --git a/be/test/runtime/user_function_cache_test.cpp 
b/be/test/runtime/user_function_cache_test.cpp
new file mode 100644
index 00000000..a5f5b23d
--- /dev/null
+++ b/be/test/runtime/user_function_cache_test.cpp
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/user_function_cache.h"
+
+#include <gtest/gtest.h>
+
+#include <cstdio>
+#include <cstdlib>
+
+#include "common/logging.h"
+#include "http/ev_http_server.h"
+#include "http/http_channel.h"
+#include "http/http_handler.h"
+#include "http/http_request.h"
+#include "util/md5.h"
+#include "util/file_utils.h"
+
+int main(int argc, char* argv[]);
+
+namespace doris {
+
+// Set by UserFunctionTestHandler whenever it serves a file; reset in SetUp().
+bool k_is_downloaded = false;
+// Serves files from the test lib/ directory for GET "/{FILE}" requests.
+class UserFunctionTestHandler : public HttpHandler {
+public:
+    void handle(HttpRequest* req) override {
+        const std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/lib";
+        const std::string lib_file = lib_dir + "/" + req->param("FILE");
+        FILE* fp = fopen(lib_file.c_str(), "r");
+        if (fp == nullptr) {
+            HttpChannel::send_error(req, INTERNAL_SERVER_ERROR);
+            return;
+        }
+        std::string body;
+        char chunk[1024];
+        size_t n = 0;
+        // A short read marks end of file (or a read error), which ends the copy.
+        do {
+            n = fread(chunk, 1, sizeof(chunk), fp);
+            body.append(chunk, n);
+        } while (n == sizeof(chunk));
+        HttpChannel::send_reply(req, body);
+        k_is_downloaded = true;
+        fclose(fp);
+    }
+};
+
+static UserFunctionTestHandler s_test_handler;
+static EvHttpServer* s_server = nullptr;
+// Hex MD5 of the compiled my_add.so; computed once in SetUpTestCase.
+std::string my_add_md5sum;
+
+// Returns the hex MD5 digest of `file`, or "" if it cannot be opened.
+static std::string compute_md5(const std::string& file) {
+    FILE* fp = fopen(file.c_str(), "rb");
+    if (fp == nullptr) {
+        return "";
+    }
+    Md5Digest md5;
+    char buf[1024];
+    while (size_t size = fread(buf, 1, sizeof(buf), fp)) {
+        md5.update(buf, size);
+    }
+    fclose(fp);
+    md5.digest();
+    return md5.hex();
+}
+class UserFunctionCacheTest : public testing::Test {
+public:
+    UserFunctionCacheTest() { }
+    virtual ~UserFunctionCacheTest() { }
+    // Start the HTTP server that serves lib/ and build my_add.so once for the whole suite.
+    static void SetUpTestCase() {
+        s_server = new EvHttpServer(29386);
+        s_server->register_handler(GET, "/{FILE}", &s_test_handler);
+        s_server->start();
+        // Build the shared library the tests download; stop here if compilation fails.
+        ASSERT_EQ(0, system("g++ -shared -fPIC ./be/test/runtime/test_data/user_function_cache/lib/my_add.cc -o ./be/test/runtime/test_data/user_function_cache/lib/my_add.so"));
+        my_add_md5sum = compute_md5("./be/test/runtime/test_data/user_function_cache/lib/my_add.so");
+    }
+    static void TearDownTestCase() {
+        delete s_server;
+        s_server = nullptr;
+        std::remove("./be/test/runtime/test_data/user_function_cache/lib/my_add.so");
+    }
+    void SetUp() override {
+        k_is_downloaded = false;
+    }
+};
+
+TEST_F(UserFunctionCacheTest, process_symbol) {
+    // fid 0 resolves the symbol from the running process and returns no cache entry.
+    UserFunctionCache cache;
+    auto st = cache.init("./be/test/runtime/test_data/user_function_cache/normal");
+    ASSERT_TRUE(st.ok());
+    void* fn_ptr = nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    st = cache.get_function_ptr(0, "main", "", "", &fn_ptr, &entry);
+    ASSERT_TRUE(st.ok());
+    ASSERT_EQ(reinterpret_cast<void*>(&main), fn_ptr);
+    ASSERT_EQ(nullptr, entry);
+    cache.release_entry(entry);
+}
+
+TEST_F(UserFunctionCacheTest, download_normal) {
+    UserFunctionCache cache;
+    std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download";
+    FileUtils::remove_all(lib_dir);
+
+    auto st = cache.init(lib_dir);
+    ASSERT_TRUE(st.ok());
+    void* fn_ptr = nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    // get my_add: lib_dir was just cleared, so the library must be downloaded
+    st = cache.get_function_ptr(1,
+                                "_Z6my_addv",
+                                "http://127.0.0.1:29386/my_add.so",
+                                my_add_md5sum, &fn_ptr, &entry);
+    ASSERT_TRUE(st.ok());
+    ASSERT_TRUE(k_is_downloaded);
+    ASSERT_NE(nullptr, fn_ptr);
+    ASSERT_NE(nullptr, entry);
+    cache.release_entry(entry);
+    // get my_del from the same library; every returned entry is released
+    st = cache.get_function_ptr(1,
+                                "_Z6my_delv",
+                                "http://127.0.0.1:29386/my_add.so",
+                                my_add_md5sum, &fn_ptr, &entry);
+    ASSERT_TRUE(st.ok());
+    ASSERT_NE(nullptr, fn_ptr);
+    ASSERT_NE(nullptr, entry);
+    cache.release_entry(entry);
+    // get my_mul: the symbol is not defined in my_add.so
+    entry = nullptr;
+    st = cache.get_function_ptr(1,
+                                "_Z6my_mulv",
+                                "http://127.0.0.1:29386/my_add.so",
+                                my_add_md5sum, &fn_ptr, &entry);
+    ASSERT_FALSE(st.ok());
+    cache.release_entry(entry);
+}
+
+TEST_F(UserFunctionCacheTest, load_normal) {
+    // Reuses the library download_normal left in the download dir, so no HTTP fetch is expected.
+    UserFunctionCache cache;
+    auto st = cache.init("./be/test/runtime/test_data/user_function_cache/download");
+    ASSERT_TRUE(st.ok());
+    void* fn_ptr = nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    st = cache.get_function_ptr(1,
+                                "_Z6my_addv",
+                                "http://127.0.0.1:29386/my_add.so",
+                                my_add_md5sum, &fn_ptr, &entry);
+    ASSERT_TRUE(st.ok());
+    ASSERT_FALSE(k_is_downloaded);
+    ASSERT_NE(nullptr, fn_ptr);
+    ASSERT_NE(nullptr, entry);
+    cache.release_entry(entry);
+}
+
+TEST_F(UserFunctionCacheTest, download_fail) {
+    // my_del.so does not exist under lib/, so the test server answers with an error.
+    UserFunctionCache cache;
+    auto st = cache.init("./be/test/runtime/test_data/user_function_cache/download");
+    ASSERT_TRUE(st.ok());
+    void* fn_ptr = nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    st = cache.get_function_ptr(2,
+                                "_Z6my_delv",
+                                "http://127.0.0.1:29386/my_del.so",
+                                my_add_md5sum, &fn_ptr, &entry);
+    ASSERT_FALSE(st.ok());
+}
+
+TEST_F(UserFunctionCacheTest, md5_fail) {
+    UserFunctionCache cache;
+    std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/download";
+    FileUtils::remove_all(lib_dir);
+    // Empty cache dir: the library is fetched again and must not match the bogus checksum "1234".
+    auto st = cache.init(lib_dir);
+    ASSERT_TRUE(st.ok());
+    void* fn_ptr = nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    st = cache.get_function_ptr(1,
+                                "_Z6my_addv",
+                                "http://127.0.0.1:29386/my_add.so",
+                                "1234", &fn_ptr, &entry);
+    ASSERT_FALSE(st.ok());
+}
+
+TEST_F(UserFunctionCacheTest, bad_so) {
+    UserFunctionCache cache;
+    std::string lib_dir = "./be/test/runtime/test_data/user_function_cache/bad";
+    FileUtils::create_dir(lib_dir + "/2");
+    // Plant a non-library file named for fid 2 / checksum "abc" (presumably loaded by init() as a cached lib).
+    FILE* fp = fopen((lib_dir + "/2/2.abc.so").c_str(), "w");
+    ASSERT_NE(nullptr, fp);
+    fputs("this is not a shared library", fp);
+    fclose(fp);
+    ASSERT_TRUE(cache.init(lib_dir).ok());
+    void* fn_ptr = nullptr;
+    UserFunctionCacheEntry* entry = nullptr;
+    auto st = cache.get_function_ptr(2,
+                                     "_Z6my_addv",
+                                     "http://127.0.0.1:29386/my_add.so",
+                                     "abc", &fn_ptr, &entry);
+    ASSERT_FALSE(st.ok());
+}
+
+}
+
+int main(int argc, char** argv) {
+    testing::InitGoogleTest(&argc, argv);  // parse gtest flags before running any test
+    return RUN_ALL_TESTS();
+}
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 15271c39..c9934ff7 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -285,6 +285,7 @@ struct TFunction {
   10: optional TAggregateFunction aggregate_fn
 
   11: optional i64 id
+  12: optional string checksum
 }
 
 enum TLoadJobState {
diff --git a/run-ut.sh b/run-ut.sh
index cc170707..c3e71075 100755
--- a/run-ut.sh
+++ b/run-ut.sh
@@ -175,8 +175,9 @@ ${DORIS_TEST_BINARY_DIR}/runtime/free_list_test
 ${DORIS_TEST_BINARY_DIR}/runtime/string_buffer_test
 ${DORIS_TEST_BINARY_DIR}/runtime/stream_load_pipe_test
 ${DORIS_TEST_BINARY_DIR}/runtime/tablet_writer_mgr_test
-## Running expr Unittest
 ${DORIS_TEST_BINARY_DIR}/runtime/snapshot_loader_test
+${DORIS_TEST_BINARY_DIR}/runtime/user_function_cache_test
+## Running expr Unittest
 
 # Running http
 ${DORIS_TEST_BINARY_DIR}/http/metrics_action_test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@doris.apache.org
For additional commands, e-mail: dev-h...@doris.apache.org

Reply via email to