This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-2.1-lakehouse in repository https://gitbox.apache.org/repos/asf/doris.git
commit 58791a18002487b1b455d7917a80d578c2194022 Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com> AuthorDate: Wed Jun 26 14:27:27 2024 +0800 [improve](udf) support java-udf static load (#34980) ``` CREATE FUNCTION print_12() RETURNS int PROPERTIES ( "symbol" = "org.apache.doris.udf.AddOne", "type" = "JAVA_UDF", "file" = "file:///mnt/ava-udf-demo-jar-with-dependencies.jar", "static_load" = "true", // default value is false "expiration_time" = "60" // default value is 360 minutes ); ``` If static_load=true is set, the UDF ClassLoader will be cached; the cache is cleared when the expiration time elapses or when the function is dropped with a DROP FUNCTION statement. --- be/src/agent/agent_server.cpp | 3 + be/src/agent/agent_server.h | 1 + be/src/agent/task_worker_pool.cpp | 8 ++ be/src/agent/task_worker_pool.h | 2 + be/src/util/jni-util.cpp | 20 +++++ be/src/util/jni-util.h | 3 + .../doris/common/classloader/ScannerLoader.java | 18 ++++ .../apache/doris/common/jni/utils/ExpiringMap.java | 91 +++++++++++++++++++++ .../java/org/apache/doris/udf/BaseExecutor.java | 1 + .../java/org/apache/doris/udf/UdafExecutor.java | 2 + .../java/org/apache/doris/udf/UdfExecutor.java | 44 +++++++--- .../main/java/org/apache/doris/common/Config.java | 6 ++ .../apache/doris/analysis/CreateFunctionStmt.java | 26 ++++-- .../apache/doris/analysis/DropFunctionStmt.java | 9 ++ .../main/java/org/apache/doris/catalog/Env.java | 14 ++++ .../java/org/apache/doris/catalog/Function.java | 3 + .../trees/expressions/functions/udf/JavaUdf.java | 3 +- .../java/org/apache/doris/task/AgentBatchTask.java | 10 +++ .../org/apache/doris/task/CleanUDFCacheTask.java | 41 ++++++++++ gensrc/thrift/AgentService.thrift | 5 ++ gensrc/thrift/Types.thrift | 5 +- .../javaudf_p0/test_javaudf_static_load_test.out | Bin 0 -> 478 bytes .../java/org/apache/doris/udf/StaticIntTest.java | 29 +++++++ .../test_javaudf_static_load_test.groovy | 81 ++++++++++++++++++ 24 files changed, 404 insertions(+), 21 deletions(-) diff --git a/be/src/agent/agent_server.cpp 
b/be/src/agent/agent_server.cpp index 5355c037b19..99df91f6bd6 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -167,6 +167,9 @@ void AgentServer::start_workers(ExecEnv* exec_env) { _update_visible_version_workers = std::make_unique<TaskWorkerPool>( "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); }); + _clean_udf_cache_workers = std::make_unique<TaskWorkerPool>( + "CLEAN_UDF_CACHE", 1, [](auto&& task) {return clean_udf_cache_callback(task); }); + // clang-format on } diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 9f3d91d5621..abe00c6556f 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -98,6 +98,7 @@ private: std::unique_ptr<TaskWorkerPool> _gc_binlog_workers; std::unique_ptr<TaskWorkerPool> _clean_trash_workers; std::unique_ptr<TaskWorkerPool> _update_visible_version_workers; + std::unique_ptr<TaskWorkerPool> _clean_udf_cache_workers; }; } // end namespace doris diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index f3302fc2f3e..927d7c64536 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -79,6 +79,7 @@ #include "service/backend_options.h" #include "util/debug_points.h" #include "util/doris_metrics.h" +#include "util/jni-util.h" #include "util/mem_info.h" #include "util/random.h" #include "util/s3_util.h" @@ -1807,4 +1808,11 @@ void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) { LOG(INFO) << "clean trash finish"; } +void clean_udf_cache_callback(const TAgentTaskRequest& req) { + LOG(INFO) << "clean udf cache start: " << req.clean_udf_cache_req.function_signature; + static_cast<void>( + JniUtil::clean_udf_class_load_cache(req.clean_udf_cache_req.function_signature)); + LOG(INFO) << "clean udf cache finish: " << req.clean_udf_cache_req.function_signature; +} + } // namespace doris diff --git 
a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h index 14d9ff32686..45d49592c2f 100644 --- a/be/src/agent/task_worker_pool.h +++ b/be/src/agent/task_worker_pool.h @@ -163,6 +163,8 @@ void gc_binlog_callback(StorageEngine& engine, const TAgentTaskRequest& req); void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req); +void clean_udf_cache_callback(const TAgentTaskRequest& req); + void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& req); void report_task_callback(const TMasterInfo& master_info); diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp index 01409fb3ea5..8259bb21c00 100644 --- a/be/src/util/jni-util.cpp +++ b/be/src/util/jni-util.cpp @@ -173,6 +173,7 @@ jmethodID JniUtil::get_jvm_threads_id_ = nullptr; jmethodID JniUtil::get_jmx_json_ = nullptr; jobject JniUtil::jni_scanner_loader_obj_ = nullptr; jmethodID JniUtil::jni_scanner_loader_method_ = nullptr; +jmethodID JniUtil::_clean_udf_cache_method_id = nullptr; Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* out) { DCHECK(jstr != nullptr); @@ -400,6 +401,25 @@ Status JniUtil::init_jni_scanner_loader(JNIEnv* env) { } env->CallVoidMethod(jni_scanner_loader_obj_, load_jni_scanner); RETURN_ERROR_IF_EXC(env); + + _clean_udf_cache_method_id = env->GetMethodID(jni_scanner_loader_cls, "cleanUdfClassLoader", + "(Ljava/lang/String;)V"); + if (_clean_udf_cache_method_id == nullptr) { + if (env->ExceptionOccurred()) { + env->ExceptionDescribe(); + } + return Status::InternalError("Failed to find removeUdfClassLoader method."); + } + RETURN_ERROR_IF_EXC(env); + return Status::OK(); +} + +Status JniUtil::clean_udf_class_load_cache(const std::string& function_signature) { + JNIEnv* env = nullptr; + RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env)); + env->CallVoidMethod(jni_scanner_loader_obj_, _clean_udf_cache_method_id, + env->NewStringUTF(function_signature.c_str())); + RETURN_ERROR_IF_EXC(env); return Status::OK(); } 
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h index 9a9da4ab04b..de26f882a52 100644 --- a/be/src/util/jni-util.h +++ b/be/src/util/jni-util.h @@ -24,6 +24,7 @@ #include <stdint.h> #include <string> +#include <unordered_map> #include "common/status.h" #include "jni_md.h" @@ -102,6 +103,7 @@ public: static Status get_jni_scanner_class(JNIEnv* env, const char* classname, jclass* loaded_class); static jobject convert_to_java_map(JNIEnv* env, const std::map<std::string, std::string>& map); static std::map<std::string, std::string> convert_to_cpp_map(JNIEnv* env, jobject map); + static Status clean_udf_class_load_cache(const std::string& function_signature); private: static Status GetJNIEnvSlowPath(JNIEnv** env); @@ -121,6 +123,7 @@ private: static jmethodID jni_scanner_loader_method_; // Thread-local cache of the JNIEnv for this thread. static __thread JNIEnv* tls_env_; + static jmethodID _clean_udf_cache_method_id; }; /// Helper class for lifetime management of chars from JNI, releasing JNI chars when diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 67f8ea416c2..59018da3fb4 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -17,6 +17,7 @@ package org.apache.doris.common.classloader; +import org.apache.doris.common.jni.utils.ExpiringMap; import org.apache.doris.common.jni.utils.Log4jOutputStream; import com.google.common.collect.Streams; @@ -45,7 +46,9 @@ import java.util.stream.Collectors; * BE will load scanners by JNI call, and then the JniConnector on BE will get scanner class by getLoadedClass. 
*/ public class ScannerLoader { + public static final Logger LOG = Logger.getLogger(ScannerLoader.class); private static final Map<String, Class<?>> loadedClasses = new HashMap<>(); + private static final ExpiringMap<String, ClassLoader> udfLoadedClasses = new ExpiringMap<String, ClassLoader>(); private static final String CLASS_SUFFIX = ".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -76,6 +79,21 @@ public class ScannerLoader { System.setErr(errorPrintStream); } + public static ClassLoader getUdfClassLoader(String functionSignature) { + return udfLoadedClasses.get(functionSignature); + } + + public static synchronized void cacheClassLoader(String functionSignature, ClassLoader classLoader, + long expirationTime) { + LOG.info("cacheClassLoader for: " + functionSignature); + udfLoadedClasses.put(functionSignature, classLoader, expirationTime * 60 * 1000L); + } + + public synchronized void cleanUdfClassLoader(String functionSignature) { + LOG.info("cleanUdfClassLoader for: " + functionSignature); + udfLoadedClasses.remove(functionSignature); + } + /** * Get loaded class for JNI scanners * @param className JNI scanner class name diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java new file mode 100644 index 00000000000..f08b50a0c42 --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java @@ -0,0 +1,91 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.common.jni.utils; + +import org.apache.log4j.Logger; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +public class ExpiringMap<K, V> { + private final ConcurrentHashMap<K, V> map = new ConcurrentHashMap<>(); // key --> value + private final ConcurrentHashMap<K, Long> ttlMap = new ConcurrentHashMap<>(); // key --> ttl interval + // key --> expirationTime(ttl interval + currentTimeMillis) + private final ConcurrentHashMap<K, Long> expirationMap = new ConcurrentHashMap<>(); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10 minutes + public static final Logger LOG = Logger.getLogger(ExpiringMap.class); + + public ExpiringMap() { + startExpirationTask(); + } + + public void put(K key, V value, long expirationTimeMs) { + long expirationTime = System.currentTimeMillis() + expirationTimeMs; + map.put(key, value); + expirationMap.put(key, expirationTime); + ttlMap.put(key, expirationTimeMs); + } + + public V get(K key) { + Long expirationTime = expirationMap.get(key); + if (expirationTime == null || System.currentTimeMillis() > expirationTime) { + map.remove(key); + expirationMap.remove(key); + ttlMap.remove(key); + return null; + } + // reset time again + long ttl = ttlMap.get(key); + long newExpirationTime = System.currentTimeMillis() + ttl; + expirationMap.put(key, 
newExpirationTime); + return map.get(key); + } + + private void startExpirationTask() { + scheduler.scheduleAtFixedRate(() -> { + long now = System.currentTimeMillis(); + for (K key : expirationMap.keySet()) { + if (expirationMap.get(key) <= now) { + map.remove(key); + expirationMap.remove(key); + ttlMap.remove(key); + } + } + }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MINUTES); + } + + public void remove(K key) { + map.remove(key); + expirationMap.remove(key); + ttlMap.remove(key); + } + + public void shutdown() { + scheduler.shutdown(); + try { + if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) { + scheduler.shutdownNow(); + } + } catch (InterruptedException e) { + scheduler.shutdownNow(); + } + } +} diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java index 8ad171d6013..2d641f74354 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java @@ -146,6 +146,7 @@ public abstract class BaseExecutor { // We are now un-usable (because the class loader has been // closed), so null out method_ and classLoader_. 
classLoader = null; + methodAccess = null; } protected ColumnValueConverter getInputConverter(TPrimitiveType primitiveType, Class clz) { diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java index cf0021c7db0..1eeef08c332 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java @@ -73,6 +73,8 @@ public class UdafExecutor extends BaseExecutor { outputTable.close(); } super.close(); + allMethods = null; + stateObjMap = null; } private Map<Integer, ColumnValueConverter> getInputConverters(int numColumns) { diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index 7e44cd3e423..5a84e9185bd 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -19,6 +19,7 @@ package org.apache.doris.udf; import org.apache.doris.catalog.Type; import org.apache.doris.common.Pair; +import org.apache.doris.common.classloader.ScannerLoader; import org.apache.doris.common.exception.UdfRuntimeException; import org.apache.doris.common.jni.utils.JavaUdfDataType; import org.apache.doris.common.jni.utils.UdfUtils; @@ -31,6 +32,7 @@ import com.google.common.base.Joiner; import com.google.common.collect.Lists; import org.apache.log4j.Logger; +import java.io.FileNotFoundException; import java.lang.reflect.Array; import java.lang.reflect.Constructor; import java.lang.reflect.Method; @@ -50,6 +52,8 @@ public class UdfExecutor extends BaseExecutor { private VectorTable outputTable = null; + private boolean isStaticLoad = false; + /** * Create a UdfExecutor, using parameters from a serialized 
thrift object. Used by * the backend. @@ -70,7 +74,9 @@ public class UdfExecutor extends BaseExecutor { // We are now un-usable (because the class loader has been // closed), so null out method_ and classLoader_. method = null; - super.close(); + if (!isStaticLoad) { + super.close(); + } } private Map<Integer, ColumnValueConverter> getInputConverters(int numColumns) { @@ -134,6 +140,28 @@ public class UdfExecutor extends BaseExecutor { return null; // Method not found } + public ClassLoader getClassLoader(String jarPath, String signature, long expirationTime) + throws MalformedURLException, FileNotFoundException { + ClassLoader loader = null; + if (jarPath == null) { + // for test + loader = ClassLoader.getSystemClassLoader(); + } else { + if (isStaticLoad) { + loader = ScannerLoader.getUdfClassLoader(signature); + } + if (loader == null) { + ClassLoader parent = getClass().getClassLoader(); + classLoader = UdfUtils.getClassLoader(jarPath, parent); + loader = classLoader; + if (isStaticLoad) { + ScannerLoader.cacheClassLoader(signature, loader, expirationTime); + } + } + } + return loader; + } + // Preallocate the input objects that will be passed to the underlying UDF. // These objects are allocated once and reused across calls to evaluate() @Override @@ -145,16 +173,12 @@ public class UdfExecutor extends BaseExecutor { if (LOG.isDebugEnabled()) { LOG.debug("Loading UDF '" + className + "' from " + jarPath); } - ClassLoader loader; - if (jarPath != null) { - // Save for cleanup. 
- ClassLoader parent = getClass().getClassLoader(); - classLoader = UdfUtils.getClassLoader(jarPath, parent); - loader = classLoader; - } else { - // for test - loader = ClassLoader.getSystemClassLoader(); + isStaticLoad = request.getFn().isSetIsStaticLoad() && request.getFn().is_static_load; + long expirationTime = 360L; // default is 6 hours + if (request.getFn().isSetExpirationTime()) { + expirationTime = request.getFn().getExpirationTime(); } + ClassLoader loader = getClassLoader(jarPath, request.getFn().getSignature(), expirationTime); Class<?> c = Class.forName(className, true, loader); methodAccess = MethodAccess.get(c); Constructor<?> ctor = c.getConstructor(); diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java index de749bc08be..a367c13c75b 100644 --- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java +++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java @@ -2891,6 +2891,12 @@ public class Config extends ConfigBase { "Stream_Load When importing, the maximum length of label is limited"}) public static int label_regex_length = 128; + @ConfField + public static boolean enable_java_udf_static_load = false; + + @ConfField + public static long java_udf_load_expiration_time_min = 360; + //========================================================================== // end of cloud config //========================================================================== diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java index 3db95cb6a9f..cdfc6dc576f 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java @@ -97,6 +97,9 @@ public class CreateFunctionStmt extends DdlStmt { public static final String STATE_CLASS_NAME = "State"; // add 
for java udf check return type nullable mode, always_nullable or always_not_nullable public static final String IS_RETURN_NULL = "always_nullable"; + // iff is static load, BE will be cache the udf class load, so only need load once + public static final String IS_STATIC_LOAD = "static_load"; + public static final String EXPIRATION_TIME = "expiration_time"; private static final Logger LOG = LogManager.getLogger(CreateFunctionStmt.class); private SetType type = SetType.DEFAULT; @@ -262,20 +265,25 @@ public class CreateFunctionStmt extends DdlStmt { if (binaryType == TFunctionBinaryType.JAVA_UDF) { FunctionUtil.checkEnableJavaUdf(); - String returnNullModeStr = properties.get(IS_RETURN_NULL); - if (returnNullModeStr == null) { - return; - } - if (!returnNullModeStr.equalsIgnoreCase("false") && !returnNullModeStr.equalsIgnoreCase("true")) { - throw new AnalysisException("'always_nullable' in properties, you should set it false or true"); - } - - if (!Boolean.parseBoolean(returnNullModeStr)) { + // always_nullable the default value is true, equal null means true + Boolean isReturnNull = parseBooleanFromProperties(IS_RETURN_NULL); + if (isReturnNull != null && !isReturnNull) { returnNullMode = NullableMode.ALWAYS_NOT_NULLABLE; } } } + private Boolean parseBooleanFromProperties(String propertyString) throws AnalysisException { + String valueOfString = properties.get(propertyString); + if (valueOfString == null) { + return null; + } + if (!valueOfString.equalsIgnoreCase("false") && !valueOfString.equalsIgnoreCase("true")) { + throw new AnalysisException(propertyString + " in properties, you should set it false or true"); + } + return Boolean.parseBoolean(valueOfString); + } + private void computeObjectChecksum() throws IOException, NoSuchAlgorithmException { if (FeConstants.runningUnitTest) { // skip checking checksum when running ut diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java index beeed11c870..9f3d297b6b6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java @@ -25,6 +25,8 @@ import org.apache.doris.common.UserException; import org.apache.doris.mysql.privilege.PrivPredicate; import org.apache.doris.qe.ConnectContext; +import com.google.common.base.Joiner; + public class DropFunctionStmt extends DdlStmt { private final boolean ifExists; private final FunctionName functionName; @@ -81,6 +83,13 @@ public class DropFunctionStmt extends DdlStmt { return stringBuilder.toString(); } + public String signatureString() { + StringBuilder sb = new StringBuilder(); + sb.append(functionName.getFunction()).append("(").append(Joiner.on(", ").join(argsDef.getArgTypes())); + sb.append(")"); + return sb.toString(); + } + @Override public RedirectStatus getRedirectStatus() { return RedirectStatus.FORWARD_WITH_SYNC; diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java index 58f8c1e6a90..5aa435813f3 100755 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java @@ -267,6 +267,7 @@ import org.apache.doris.system.SystemInfoService.HostInfo; import org.apache.doris.task.AgentBatchTask; import org.apache.doris.task.AgentTaskExecutor; import org.apache.doris.task.CleanTrashTask; +import org.apache.doris.task.CleanUDFCacheTask; import org.apache.doris.task.CompactionTask; import org.apache.doris.task.DropReplicaTask; import org.apache.doris.task.MasterTaskExecutor; @@ -5563,6 +5564,7 @@ public class Env { Database db = getInternalCatalog().getDbOrDdlException(name.getDb()); db.dropFunction(stmt.getFunction(), stmt.isIfExists()); } + cleanUDFCacheTask(stmt); // BE will cache classload, when drop function, BE need clear cache } 
public void replayDropFunction(FunctionSearchDesc functionSearchDesc) throws MetaNotFoundException { @@ -6087,6 +6089,18 @@ public class Env { AgentTaskExecutor.submit(batchTask); } + public void cleanUDFCacheTask(DropFunctionStmt stmt) { + ImmutableMap<Long, Backend> backendsInfo = Env.getCurrentSystemInfo().getIdToBackend(); + String functionSignature = stmt.signatureString(); + AgentBatchTask batchTask = new AgentBatchTask(); + for (Backend backend : backendsInfo.values()) { + CleanUDFCacheTask cleanUDFCacheTask = new CleanUDFCacheTask(backend.getId(), functionSignature); + batchTask.addTask(cleanUDFCacheTask); + LOG.info("clean udf cache in be {}, beId {}", backend.getHost(), backend.getId()); + } + AgentTaskExecutor.submit(batchTask); + } + public void setPartitionVersion(AdminSetPartitionVersionStmt stmt) throws DdlException { String database = stmt.getDatabase(); String table = stmt.getTable(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java index 7dbf3a0ec0a..54a3fd49e30 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java +++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java @@ -22,6 +22,7 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.FunctionName; import org.apache.doris.analysis.FunctionParams; import org.apache.doris.common.AnalysisException; +import org.apache.doris.common.Config; import org.apache.doris.common.FeMetaVersion; import org.apache.doris.common.UserException; import org.apache.doris.common.io.IOUtils; @@ -563,6 +564,8 @@ public class Function implements Writable { fn.setChecksum(checksum); } fn.setVectorized(vectorized); + fn.setIsStaticLoad(Config.enable_java_udf_static_load); + fn.setExpirationTime(Config.java_udf_load_expiration_time_min); return fn; } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java index 3ed770e12ed..0b49e50a346 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java @@ -104,7 +104,8 @@ public class JavaUdf extends ScalarFunction implements ExplicitlyCastableSignatu public JavaUdf withChildren(List<Expression> children) { Preconditions.checkArgument(children.size() == this.children.size()); return new JavaUdf(getName(), functionId, dbName, binaryType, signature, nullableMode, - objectFile, symbol, prepareFn, closeFn, checkSum, children.toArray(new Expression[0])); + objectFile, symbol, prepareFn, closeFn, checkSum, + children.toArray(new Expression[0])); } /** diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java index ebfdb28a16b..8507f27a896 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java +++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java @@ -29,6 +29,7 @@ import org.apache.doris.thrift.TAlterInvertedIndexReq; import org.apache.doris.thrift.TAlterTabletReqV2; import org.apache.doris.thrift.TCheckConsistencyReq; import org.apache.doris.thrift.TCleanTrashReq; +import org.apache.doris.thrift.TCleanUDFCacheReq; import org.apache.doris.thrift.TClearAlterTaskRequest; import org.apache.doris.thrift.TClearTransactionTaskRequest; import org.apache.doris.thrift.TCloneReq; @@ -441,6 +442,15 @@ public class AgentBatchTask implements Runnable { tAgentTaskRequest.setVisibleVersionReq(request); return tAgentTaskRequest; } + case CLEAN_UDF_CACHE: { + CleanUDFCacheTask cleanUDFCacheTask = (CleanUDFCacheTask) task; + TCleanUDFCacheReq request = cleanUDFCacheTask.toThrift(); + if (LOG.isDebugEnabled()) { + LOG.debug(request.toString()); + } + 
tAgentTaskRequest.setCleanUdfCacheReq(request); + return tAgentTaskRequest; + } default: if (LOG.isDebugEnabled()) { LOG.debug("could not find task type for task [{}]", task); diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java b/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java new file mode 100644 index 00000000000..63ed5473bf1 --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java @@ -0,0 +1,41 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.task; + +import org.apache.doris.thrift.TCleanUDFCacheReq; +import org.apache.doris.thrift.TTaskType; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + + +public class CleanUDFCacheTask extends AgentTask { + private static final Logger LOG = LogManager.getLogger(CleanUDFCacheTask.class); + private String functionSignature; + + public CleanUDFCacheTask(long backendId, String signature) { + super(null, backendId, TTaskType.CLEAN_UDF_CACHE, -1, -1, -1, -1, -1, -1, -1); + this.functionSignature = signature; + } + + public TCleanUDFCacheReq toThrift() { + TCleanUDFCacheReq req = new TCleanUDFCacheReq(); + req.setFunctionSignature(this.functionSignature); + return req; + } +} diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index e879b54615d..b7f273b6e44 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -103,6 +103,10 @@ struct TPushStoragePolicyReq { struct TCleanTrashReq {} +struct TCleanUDFCacheReq { + 1: optional string function_signature //function_name(arg_type) +} + enum TCompressionType { UNKNOWN_COMPRESSION = 0, DEFAULT_COMPRESSION = 1, @@ -503,6 +507,7 @@ struct TAgentTaskRequest { 33: optional TGcBinlogReq gc_binlog_req 34: optional TCleanTrashReq clean_trash_req 35: optional TVisibleVersionReq visible_version_req + 36: optional TCleanUDFCacheReq clean_udf_cache_req } struct TAgentResult { diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift index 89a67fdba15..8639a1a1476 100644 --- a/gensrc/thrift/Types.thrift +++ b/gensrc/thrift/Types.thrift @@ -224,7 +224,8 @@ enum TTaskType { ALTER_INVERTED_INDEX, GC_BINLOG, CLEAN_TRASH, - UPDATE_VISIBLE_VERSION + UPDATE_VISIBLE_VERSION, + CLEAN_UDF_CACHE; } enum TStmtType { @@ -380,6 +381,8 @@ struct TFunction { 11: optional i64 id 12: optional string checksum 13: optional bool vectorized = false + 15: optional bool is_static_load = false + 16: optional i64 
expiration_time //minutes } enum TJdbcOperation { diff --git a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out new file mode 100644 index 00000000000..9f57f52d091 Binary files /dev/null and b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out differ diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTest.java new file mode 100644 index 00000000000..e226c0e6029 --- /dev/null +++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTest.java @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.udf; + +public class StaticIntTest { + static { + System.out.println("static load should only print once"); + } + private static int value = 0; + public Integer evaluate() { + value = value + 1; + return value; + } +} diff --git a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy new file mode 100644 index 00000000000..c816ec90292 --- /dev/null +++ b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy @@ -0,0 +1,81 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +import org.codehaus.groovy.runtime.IOGroovyMethods + +import java.nio.charset.StandardCharsets +import java.nio.file.Files +import java.nio.file.Paths + +suite("test_javaudf_static_load_test") { + def tableName = "test_javaudf_static_load_test" + def jarPath = """${context.file.parent}/jars/java-udf-case-jar-with-dependencies.jar""" + + + log.info("Jar path: ${jarPath}".toString()) + try { + sql """ DROP TABLE IF EXISTS ${tableName} """ + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + `user_id` INT NOT NULL COMMENT "用户id", + `char_col` CHAR NOT NULL COMMENT "", + `varchar_col` VARCHAR(10) NOT NULL COMMENT "", + `string_col` STRING NOT NULL COMMENT "" + ) + DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1"); + """ + StringBuilder sb = new StringBuilder() + int i = 1 + for (; i < 9; i ++) { + sb.append(""" + (${i % 3}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'), + """) + } + sb.append(""" + (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg') + """) + sql """ INSERT INTO ${tableName} VALUES + ${sb.toString()} + """ + qt_select_default """ SELECT * FROM ${tableName} t ORDER BY char_col; """ + + File path = new File(jarPath) + if (!path.exists()) { + throw new IllegalStateException("""${jarPath} doesn't exist! 
""") + } + + sql """ CREATE FUNCTION static_load_test() RETURNS int PROPERTIES ( + "file"="file://${jarPath}", + "symbol"="org.apache.doris.udf.StaticIntTest", + "static_load"="true", + "expiration_time"="10", + "type"="JAVA_UDF" + ); """ + + sql """set parallel_pipeline_task_num = 1; """ + qt_select1 """ SELECT static_load_test(); """ + qt_select2 """ SELECT static_load_test(); """ + qt_select3 """ SELECT static_load_test(); """ + qt_select4 """ SELECT static_load_test(); """ + qt_select5 """ SELECT static_load_test(); """ + + } finally { + try_sql("DROP FUNCTION IF EXISTS static_load_test();") + try_sql("DROP TABLE IF EXISTS ${tableName}") + } +} + --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org