This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 58791a18002487b1b455d7917a80d578c2194022
Author: zhangstar333 <87313068+zhangstar...@users.noreply.github.com>
AuthorDate: Wed Jun 26 14:27:27 2024 +0800

    [improve](udf) support java-udf static load (#34980)
    
    ```
    CREATE FUNCTION print_12() RETURNS int
    PROPERTIES (
        "symbol" = "org.apache.doris.udf.AddOne",
        "type" = "JAVA_UDF",
        "file" = "file:///mnt/java-udf-demo-jar-with-dependencies.jar",
        "static_load" = "true", // default value is false
        "expiration_time" = "60" // default value is 360 minutes
    );
    ```
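    If static_load=true, the UDF's ClassLoader is cached on the BE and is
    cleared when the expiration time elapses or when DROP FUNCTION is
    executed. (In this patch the values sent to BE are taken from the FE
    configs enable_java_udf_static_load and java_udf_load_expiration_time_min.)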
---
 be/src/agent/agent_server.cpp                      |   3 +
 be/src/agent/agent_server.h                        |   1 +
 be/src/agent/task_worker_pool.cpp                  |   8 ++
 be/src/agent/task_worker_pool.h                    |   2 +
 be/src/util/jni-util.cpp                           |  20 +++++
 be/src/util/jni-util.h                             |   3 +
 .../doris/common/classloader/ScannerLoader.java    |  18 ++++
 .../apache/doris/common/jni/utils/ExpiringMap.java |  91 +++++++++++++++++++++
 .../java/org/apache/doris/udf/BaseExecutor.java    |   1 +
 .../java/org/apache/doris/udf/UdafExecutor.java    |   2 +
 .../java/org/apache/doris/udf/UdfExecutor.java     |  44 +++++++---
 .../main/java/org/apache/doris/common/Config.java  |   6 ++
 .../apache/doris/analysis/CreateFunctionStmt.java  |  26 ++++--
 .../apache/doris/analysis/DropFunctionStmt.java    |   9 ++
 .../main/java/org/apache/doris/catalog/Env.java    |  14 ++++
 .../java/org/apache/doris/catalog/Function.java    |   3 +
 .../trees/expressions/functions/udf/JavaUdf.java   |   3 +-
 .../java/org/apache/doris/task/AgentBatchTask.java |  10 +++
 .../org/apache/doris/task/CleanUDFCacheTask.java   |  41 ++++++++++
 gensrc/thrift/AgentService.thrift                  |   5 ++
 gensrc/thrift/Types.thrift                         |   5 +-
 .../javaudf_p0/test_javaudf_static_load_test.out   | Bin 0 -> 478 bytes
 .../java/org/apache/doris/udf/StaticIntTest.java   |  29 +++++++
 .../test_javaudf_static_load_test.groovy           |  81 ++++++++++++++++++
 24 files changed, 404 insertions(+), 21 deletions(-)

diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp
index 5355c037b19..99df91f6bd6 100644
--- a/be/src/agent/agent_server.cpp
+++ b/be/src/agent/agent_server.cpp
@@ -167,6 +167,9 @@ void AgentServer::start_workers(ExecEnv* exec_env) {
     _update_visible_version_workers = std::make_unique<TaskWorkerPool>(
             "UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return 
visible_version_callback(engine, task); });
 
+    _clean_udf_cache_workers = std::make_unique<TaskWorkerPool>(
+            "CLEAN_UDF_CACHE", 1, [](auto&& task) {return 
clean_udf_cache_callback(task); });
+
     // clang-format on
 }
 
diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h
index 9f3d91d5621..abe00c6556f 100644
--- a/be/src/agent/agent_server.h
+++ b/be/src/agent/agent_server.h
@@ -98,6 +98,7 @@ private:
     std::unique_ptr<TaskWorkerPool> _gc_binlog_workers;
     std::unique_ptr<TaskWorkerPool> _clean_trash_workers;
     std::unique_ptr<TaskWorkerPool> _update_visible_version_workers;
+    std::unique_ptr<TaskWorkerPool> _clean_udf_cache_workers;
 };
 
 } // end namespace doris
diff --git a/be/src/agent/task_worker_pool.cpp 
b/be/src/agent/task_worker_pool.cpp
index f3302fc2f3e..927d7c64536 100644
--- a/be/src/agent/task_worker_pool.cpp
+++ b/be/src/agent/task_worker_pool.cpp
@@ -79,6 +79,7 @@
 #include "service/backend_options.h"
 #include "util/debug_points.h"
 #include "util/doris_metrics.h"
+#include "util/jni-util.h"
 #include "util/mem_info.h"
 #include "util/random.h"
 #include "util/s3_util.h"
@@ -1807,4 +1808,18 @@ void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req) {
     LOG(INFO) << "clean trash finish";
 }
 
+// Agent-task callback for CLEAN_UDF_CACHE: asks the Java side to evict the cached
+// (static-load) UDF ClassLoader registered under the request's function signature.
+void clean_udf_cache_callback(const TAgentTaskRequest& req) {
+    const std::string& signature = req.clean_udf_cache_req.function_signature;
+    LOG(INFO) << "clean udf cache start: " << signature;
+    Status st = JniUtil::clean_udf_class_load_cache(signature);
+    if (!st.ok()) {
+        // Best effort: the cache entry is still subject to its expiration time, so only warn.
+        LOG(WARNING) << "clean udf cache failed: " << signature << ", status: " << st.to_string();
+        return;
+    }
+    LOG(INFO) << "clean udf cache  finish: " << signature;
+}
+
 } // namespace doris
diff --git a/be/src/agent/task_worker_pool.h b/be/src/agent/task_worker_pool.h
index 14d9ff32686..45d49592c2f 100644
--- a/be/src/agent/task_worker_pool.h
+++ b/be/src/agent/task_worker_pool.h
@@ -163,6 +163,8 @@ void gc_binlog_callback(StorageEngine& engine, const 
TAgentTaskRequest& req);
 
 void clean_trash_callback(StorageEngine& engine, const TAgentTaskRequest& req);
 
+void clean_udf_cache_callback(const TAgentTaskRequest& req);
+
 void visible_version_callback(StorageEngine& engine, const TAgentTaskRequest& 
req);
 
 void report_task_callback(const TMasterInfo& master_info);
diff --git a/be/src/util/jni-util.cpp b/be/src/util/jni-util.cpp
index 01409fb3ea5..8259bb21c00 100644
--- a/be/src/util/jni-util.cpp
+++ b/be/src/util/jni-util.cpp
@@ -173,6 +173,7 @@ jmethodID JniUtil::get_jvm_threads_id_ = nullptr;
 jmethodID JniUtil::get_jmx_json_ = nullptr;
 jobject JniUtil::jni_scanner_loader_obj_ = nullptr;
 jmethodID JniUtil::jni_scanner_loader_method_ = nullptr;
+jmethodID JniUtil::_clean_udf_cache_method_id = nullptr;
 
 Status JniUtfCharGuard::create(JNIEnv* env, jstring jstr, JniUtfCharGuard* 
out) {
     DCHECK(jstr != nullptr);
@@ -400,6 +401,38 @@ Status JniUtil::init_jni_scanner_loader(JNIEnv* env) {
     }
     env->CallVoidMethod(jni_scanner_loader_obj_, load_jni_scanner);
     RETURN_ERROR_IF_EXC(env);
+
+    // Resolve ScannerLoader#cleanUdfClassLoader(String) once; clean_udf_class_load_cache()
+    // calls it later to evict a cached static-load UDF ClassLoader.
+    _clean_udf_cache_method_id = env->GetMethodID(jni_scanner_loader_cls, "cleanUdfClassLoader",
+                                                  "(Ljava/lang/String;)V");
+    if (_clean_udf_cache_method_id == nullptr) {
+        if (env->ExceptionOccurred()) {
+            env->ExceptionDescribe();
+        }
+        return Status::InternalError("Failed to find cleanUdfClassLoader method.");
+    }
+    RETURN_ERROR_IF_EXC(env);
+    return Status::OK();
+}
+
+// Evicts the cached UDF ClassLoader registered under `function_signature` by calling
+// ScannerLoader#cleanUdfClassLoader on the shared loader object.
+Status JniUtil::clean_udf_class_load_cache(const std::string& function_signature) {
+    if (jni_scanner_loader_obj_ == nullptr || _clean_udf_cache_method_id == nullptr) {
+        // init_jni_scanner_loader() has not run or failed, so there is nothing to call into.
+        return Status::InternalError(
+                "ScannerLoader is not initialized, can not clean udf cache.");
+    }
+    JNIEnv* env = nullptr;
+    RETURN_IF_ERROR(JniUtil::GetJNIEnv(&env));
+    jstring jsignature = env->NewStringUTF(function_signature.c_str());
+    RETURN_ERROR_IF_EXC(env);
+    env->CallVoidMethod(jni_scanner_loader_obj_, _clean_udf_cache_method_id, jsignature);
+    // The JNIEnv is cached per thread (tls_env_), so this thread presumably stays attached and
+    // its local refs are not reclaimed on return; release the string before the check.
+    env->DeleteLocalRef(jsignature);
+    RETURN_ERROR_IF_EXC(env);
     return Status::OK();
 }
 
diff --git a/be/src/util/jni-util.h b/be/src/util/jni-util.h
index 9a9da4ab04b..de26f882a52 100644
--- a/be/src/util/jni-util.h
+++ b/be/src/util/jni-util.h
@@ -24,6 +24,7 @@
 #include <stdint.h>
 
 #include <string>
+#include <unordered_map>
 
 #include "common/status.h"
 #include "jni_md.h"
@@ -102,6 +103,7 @@ public:
     static Status get_jni_scanner_class(JNIEnv* env, const char* classname, 
jclass* loaded_class);
     static jobject convert_to_java_map(JNIEnv* env, const 
std::map<std::string, std::string>& map);
     static std::map<std::string, std::string> convert_to_cpp_map(JNIEnv* env, 
jobject map);
+    static Status clean_udf_class_load_cache(const std::string& 
function_signature);
 
 private:
     static Status GetJNIEnvSlowPath(JNIEnv** env);
@@ -121,6 +123,7 @@ private:
     static jmethodID jni_scanner_loader_method_;
     // Thread-local cache of the JNIEnv for this thread.
     static __thread JNIEnv* tls_env_;
+    static jmethodID _clean_udf_cache_method_id;
 };
 
 /// Helper class for lifetime management of chars from JNI, releasing JNI 
chars when
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
index 67f8ea416c2..59018da3fb4 100644
--- 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.common.classloader;
 
+import org.apache.doris.common.jni.utils.ExpiringMap;
 import org.apache.doris.common.jni.utils.Log4jOutputStream;
 
 import com.google.common.collect.Streams;
@@ -45,7 +46,9 @@ import java.util.stream.Collectors;
  * BE will load scanners by JNI call, and then the JniConnector on BE will get 
scanner class by getLoadedClass.
  */
 public class ScannerLoader {
+    public static final Logger LOG = Logger.getLogger(ScannerLoader.class);
     private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
+    private static final ExpiringMap<String, ClassLoader> udfLoadedClasses = 
new ExpiringMap<String, ClassLoader>();
     private static final String CLASS_SUFFIX = ".class";
     private static final String LOAD_PACKAGE = "org.apache.doris";
 
@@ -76,6 +79,37 @@ public class ScannerLoader {
         System.setErr(errorPrintStream);
     }
 
+    /**
+     * Returns the cached ClassLoader of a static-load UDF, or null when absent or expired.
+     * A successful lookup refreshes the entry's expiration time.
+     */
+    public static ClassLoader getUdfClassLoader(String functionSignature) {
+        return udfLoadedClasses.get(functionSignature);
+    }
+
+    /**
+     * Caches the ClassLoader of a static-load UDF under its function signature.
+     * @param expirationTime time-to-live in minutes, renewed on every successful lookup
+     */
+    public static synchronized void cacheClassLoader(String functionSignature, ClassLoader classLoader,
+            long expirationTime) {
+        LOG.info("cacheClassLoader for: " + functionSignature);
+        udfLoadedClasses.put(functionSignature, classLoader, expirationTime * 60 * 1000L);
+    }
+
+    /**
+     * Evicts the cached ClassLoader of a UDF. BE calls this instance method through JNI
+     * (on its ScannerLoader object) when the function is dropped.
+     */
+    public void cleanUdfClassLoader(String functionSignature) {
+        // Lock the class, i.e. the same monitor as the static cacheClassLoader(); an
+        // instance-level "synchronized" would lock this object and not serialize with it.
+        synchronized (ScannerLoader.class) {
+            LOG.info("cleanUdfClassLoader for: " + functionSignature);
+            udfLoadedClasses.remove(functionSignature);
+        }
+    }
+
     /**
      * Get loaded class for JNI scanners
      * @param className JNI scanner class name
diff --git 
a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
new file mode 100644
index 00000000000..f08b50a0c42
--- /dev/null
+++ 
b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/ExpiringMap.java
@@ -0,0 +1,127 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.common.jni.utils;
+
+import org.apache.log4j.Logger;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A concurrent map whose entries expire after a per-entry idle time-to-live.
+ *
+ * <p>Each successful {@link #get} pushes the entry's deadline forward by its TTL. Expired
+ * entries are dropped lazily by {@link #get} and by a background sweep that runs every
+ * {@code DEFAULT_INTERVAL_TIME} milliseconds.
+ */
+public class ExpiringMap<K, V> {
+    private static final long DEFAULT_INTERVAL_TIME = 10 * 60 * 1000L; // 10 minutes, in milliseconds
+    public static final Logger LOG = Logger.getLogger(ExpiringMap.class);
+
+    // key --> value with its ttl interval and expirationTime (ttl interval + currentTimeMillis).
+    // Kept in a single map so the three can never be observed out of sync.
+    private final ConcurrentHashMap<K, TimedValue<V>> map = new ConcurrentHashMap<>();
+    // Daemon thread: the periodic sweep must not keep the JVM from shutting down.
+    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1, runnable -> {
+        Thread thread = new Thread(runnable, "expiring-map-sweeper");
+        thread.setDaemon(true);
+        return thread;
+    });
+
+    /** A value together with its ttl interval and current absolute deadline. */
+    private static final class TimedValue<T> {
+        private final T value;
+        private final long ttlMs;
+        private volatile long expirationTimeMs;
+
+        private TimedValue(T value, long ttlMs, long expirationTimeMs) {
+            this.value = value;
+            this.ttlMs = ttlMs;
+            this.expirationTimeMs = expirationTimeMs;
+        }
+    }
+
+    public ExpiringMap() {
+        startExpirationTask();
+    }
+
+    /**
+     * Inserts or replaces {@code key}; the entry expires once {@code expirationTimeMs}
+     * milliseconds pass without a successful {@link #get}.
+     */
+    public void put(K key, V value, long expirationTimeMs) {
+        long expirationTime = System.currentTimeMillis() + expirationTimeMs;
+        map.put(key, new TimedValue<>(value, expirationTimeMs, expirationTime));
+    }
+
+    /**
+     * Returns the value for {@code key}, or null if it is absent or expired. A hit
+     * resets the entry's deadline to now + its ttl interval.
+     */
+    public V get(K key) {
+        TimedValue<V> entry = map.get(key);
+        if (entry == null) {
+            return null;
+        }
+        long now = System.currentTimeMillis();
+        if (now > entry.expirationTimeMs) {
+            // Conditional remove: never evict a fresh entry installed by a concurrent put().
+            map.remove(key, entry);
+            return null;
+        }
+        // reset time again
+        entry.expirationTimeMs = now + entry.ttlMs;
+        return entry.value;
+    }
+
+    private void startExpirationTask() {
+        // DEFAULT_INTERVAL_TIME is in milliseconds; scheduling it in MINUTES would sweep
+        // only every 600000 minutes (~417 days).
+        scheduler.scheduleAtFixedRate(() -> {
+            try {
+                long now = System.currentTimeMillis();
+                map.forEach((key, entry) -> {
+                    if (entry.expirationTimeMs <= now) {
+                        map.remove(key, entry);
+                    }
+                });
+            } catch (RuntimeException e) {
+                // A thrown exception would suppress all later runs of this periodic task.
+                LOG.warn("ExpiringMap expiration task failed", e);
+            }
+        }, DEFAULT_INTERVAL_TIME, DEFAULT_INTERVAL_TIME, TimeUnit.MILLISECONDS);
+    }
+
+    public void remove(K key) {
+        map.remove(key);
+    }
+
+    public void shutdown() {
+        scheduler.shutdown();
+        try {
+            if (!scheduler.awaitTermination(60, TimeUnit.SECONDS)) {
+                scheduler.shutdownNow();
+            }
+        } catch (InterruptedException e) {
+            scheduler.shutdownNow();
+            Thread.currentThread().interrupt(); // preserve the caller's interrupt status
+        }
+    }
+}
diff --git 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
index 8ad171d6013..2d641f74354 100644
--- 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
+++ 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/BaseExecutor.java
@@ -146,6 +146,7 @@ public abstract class BaseExecutor {
         // We are now un-usable (because the class loader has been
         // closed), so null out method_ and classLoader_.
         classLoader = null;
+        methodAccess = null;
     }
 
     protected ColumnValueConverter getInputConverter(TPrimitiveType 
primitiveType, Class clz) {
diff --git 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
index cf0021c7db0..1eeef08c332 100644
--- 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
+++ 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdafExecutor.java
@@ -73,6 +73,8 @@ public class UdafExecutor extends BaseExecutor {
             outputTable.close();
         }
         super.close();
+        allMethods = null;
+        stateObjMap = null;
     }
 
     private Map<Integer, ColumnValueConverter> getInputConverters(int 
numColumns) {
diff --git 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
index 7e44cd3e423..5a84e9185bd 100644
--- 
a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
+++ 
b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
@@ -19,6 +19,7 @@ package org.apache.doris.udf;
 
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Pair;
+import org.apache.doris.common.classloader.ScannerLoader;
 import org.apache.doris.common.exception.UdfRuntimeException;
 import org.apache.doris.common.jni.utils.JavaUdfDataType;
 import org.apache.doris.common.jni.utils.UdfUtils;
@@ -31,6 +32,7 @@ import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import org.apache.log4j.Logger;
 
+import java.io.FileNotFoundException;
 import java.lang.reflect.Array;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.Method;
@@ -50,6 +52,8 @@ public class UdfExecutor extends BaseExecutor {
 
     private VectorTable outputTable = null;
 
+    private boolean isStaticLoad = false;
+
     /**
      * Create a UdfExecutor, using parameters from a serialized thrift object. 
Used by
      * the backend.
@@ -70,7 +74,9 @@ public class UdfExecutor extends BaseExecutor {
         // We are now un-usable (because the class loader has been
         // closed), so null out method_ and classLoader_.
         method = null;
-        super.close();
+        if (!isStaticLoad) {
+            super.close();
+        }
     }
 
     private Map<Integer, ColumnValueConverter> getInputConverters(int 
numColumns) {
@@ -134,6 +140,39 @@ public class UdfExecutor extends BaseExecutor {
         return null; // Method not found
     }
 
+    /**
+     * Returns the ClassLoader used to load the UDF class.
+     *
+     * <p>For a static-load UDF the loader is shared per function signature through the
+     * {@link ScannerLoader} cache (expiring after {@code expirationTime} minutes without use);
+     * otherwise a new loader is created for this executor.
+     */
+    public ClassLoader getClassLoader(String jarPath, String signature, long expirationTime)
+            throws MalformedURLException, FileNotFoundException {
+        if (jarPath == null) {
+            // for test
+            return ClassLoader.getSystemClassLoader();
+        }
+        if (!isStaticLoad) {
+            ClassLoader parent = getClass().getClassLoader();
+            classLoader = UdfUtils.getClassLoader(jarPath, parent);
+            return classLoader;
+        }
+        // Lookup and insertion must be atomic. ScannerLoader.cacheClassLoader() is static
+        // synchronized, so this is the same monitor (re-entrant). Otherwise concurrent executors
+        // could each build a loader, and the ones overwritten in the cache would never be closed.
+        synchronized (ScannerLoader.class) {
+            ClassLoader loader = ScannerLoader.getUdfClassLoader(signature);
+            if (loader == null) {
+                ClassLoader parent = getClass().getClassLoader();
+                classLoader = UdfUtils.getClassLoader(jarPath, parent);
+                loader = classLoader;
+                ScannerLoader.cacheClassLoader(signature, loader, expirationTime);
+            }
+            return loader;
+        }
+    }
+
     // Preallocate the input objects that will be passed to the underlying UDF.
     // These objects are allocated once and reused across calls to evaluate()
     @Override
@@ -145,16 +184,12 @@ public class UdfExecutor extends BaseExecutor {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Loading UDF '" + className + "' from " + jarPath);
             }
-            ClassLoader loader;
-            if (jarPath != null) {
-                // Save for cleanup.
-                ClassLoader parent = getClass().getClassLoader();
-                classLoader = UdfUtils.getClassLoader(jarPath, parent);
-                loader = classLoader;
-            } else {
-                // for test
-                loader = ClassLoader.getSystemClassLoader();
+            isStaticLoad = request.getFn().isSetIsStaticLoad() && request.getFn().is_static_load;
+            long expirationTime = 360L; // default is 6 hours
+            if (request.getFn().isSetExpirationTime()) {
+                expirationTime = request.getFn().getExpirationTime();
             }
+            ClassLoader loader = getClassLoader(jarPath, request.getFn().getSignature(), expirationTime);
             Class<?> c = Class.forName(className, true, loader);
             methodAccess = MethodAccess.get(c);
             Constructor<?> ctor = c.getConstructor();
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java 
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index de749bc08be..a367c13c75b 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2891,6 +2891,12 @@ public class Config extends ConfigBase {
             "Stream_Load When importing, the maximum length of label is 
limited"})
     public static int label_regex_length = 128;
 
+    @ConfField
+    public static boolean enable_java_udf_static_load = false;
+
+    @ConfField
+    public static long java_udf_load_expiration_time_min = 360;
+
     
//==========================================================================
     //                      end of cloud config
     
//==========================================================================
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
index 3db95cb6a9f..cdfc6dc576f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/CreateFunctionStmt.java
@@ -97,6 +97,9 @@ public class CreateFunctionStmt extends DdlStmt {
     public static final String STATE_CLASS_NAME = "State";
     // add for java udf check return type nullable mode, always_nullable or 
always_not_nullable
     public static final String IS_RETURN_NULL = "always_nullable";
+    // if static load is enabled, BE caches the UDF's ClassLoader, so the jar only needs to be loaded once
+    public static final String IS_STATIC_LOAD = "static_load";
+    public static final String EXPIRATION_TIME = "expiration_time";
     private static final Logger LOG = 
LogManager.getLogger(CreateFunctionStmt.class);
 
     private SetType type = SetType.DEFAULT;
@@ -262,20 +265,25 @@ public class CreateFunctionStmt extends DdlStmt {
         if (binaryType == TFunctionBinaryType.JAVA_UDF) {
             FunctionUtil.checkEnableJavaUdf();
 
-            String returnNullModeStr = properties.get(IS_RETURN_NULL);
-            if (returnNullModeStr == null) {
-                return;
-            }
-            if (!returnNullModeStr.equalsIgnoreCase("false") && 
!returnNullModeStr.equalsIgnoreCase("true")) {
-                throw new AnalysisException("'always_nullable' in properties, 
you should set it false or true");
-            }
-
-            if (!Boolean.parseBoolean(returnNullModeStr)) {
+            // always_nullable the default value is true, equal null means true
+            Boolean isReturnNull = parseBooleanFromProperties(IS_RETURN_NULL);
+            if (isReturnNull != null && !isReturnNull) {
                 returnNullMode = NullableMode.ALWAYS_NOT_NULLABLE;
             }
         }
     }
 
+    private Boolean parseBooleanFromProperties(String propertyString) throws 
AnalysisException {
+        String valueOfString = properties.get(propertyString);
+        if (valueOfString == null) {
+            return null;
+        }
+        if (!valueOfString.equalsIgnoreCase("false") && 
!valueOfString.equalsIgnoreCase("true")) {
+            throw new AnalysisException(propertyString + " in properties, you 
should set it false or true");
+        }
+        return Boolean.parseBoolean(valueOfString);
+    }
+
     private void computeObjectChecksum() throws IOException, 
NoSuchAlgorithmException {
         if (FeConstants.runningUnitTest) {
             // skip checking checksum when running ut
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
index beeed11c870..9f3d297b6b6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropFunctionStmt.java
@@ -25,6 +25,8 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.base.Joiner;
+
 public class DropFunctionStmt extends DdlStmt {
     private final boolean ifExists;
     private final FunctionName functionName;
@@ -81,6 +83,22 @@ public class DropFunctionStmt extends DdlStmt {
         return stringBuilder.toString();
     }
 
+    /**
+     * Signature of the dropped function, {@code name(type1, type2)}, with {@code ...} before the
+     * closing parenthesis for variadic functions. BE keys its static-load UDF ClassLoader cache
+     * by TFunction.signature, so this must produce the same string.
+     * NOTE(review): assumed to mirror Function#signatureString(); confirm if either changes.
+     */
+    public String signatureString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(functionName.getFunction()).append("(").append(Joiner.on(", ").join(argsDef.getArgTypes()));
+        if (argsDef.isVariadic()) {
+            sb.append("...");
+        }
+        sb.append(")");
+        return sb.toString();
+    }
+
     @Override
     public RedirectStatus getRedirectStatus() {
         return RedirectStatus.FORWARD_WITH_SYNC;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
index 58f8c1e6a90..5aa435813f3 100755
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Env.java
@@ -267,6 +267,7 @@ import org.apache.doris.system.SystemInfoService.HostInfo;
 import org.apache.doris.task.AgentBatchTask;
 import org.apache.doris.task.AgentTaskExecutor;
 import org.apache.doris.task.CleanTrashTask;
+import org.apache.doris.task.CleanUDFCacheTask;
 import org.apache.doris.task.CompactionTask;
 import org.apache.doris.task.DropReplicaTask;
 import org.apache.doris.task.MasterTaskExecutor;
@@ -5563,6 +5564,7 @@ public class Env {
             Database db = 
getInternalCatalog().getDbOrDdlException(name.getDb());
             db.dropFunction(stmt.getFunction(), stmt.isIfExists());
         }
+        cleanUDFCacheTask(stmt); // BE will cache classload, when drop 
function, BE need clear cache
     }
 
     public void replayDropFunction(FunctionSearchDesc functionSearchDesc) 
throws MetaNotFoundException {
@@ -6087,6 +6089,20 @@ public class Env {
         AgentTaskExecutor.submit(batchTask);
     }
 
+    /**
+     * Sends a CLEAN_UDF_CACHE task to every backend so that a cached ClassLoader of the
+     * dropped function (kept by BE for static-load UDFs) is released.
+     */
+    public void cleanUDFCacheTask(DropFunctionStmt stmt) {
+        String functionSignature = stmt.signatureString();
+        AgentBatchTask batchTask = new AgentBatchTask();
+        Env.getCurrentSystemInfo().getIdToBackend().values().forEach(backend -> {
+            batchTask.addTask(new CleanUDFCacheTask(backend.getId(), functionSignature));
+            LOG.info("clean udf cache in be {}, beId {}", backend.getHost(), backend.getId());
+        });
+        AgentTaskExecutor.submit(batchTask);
+    }
+
     public void setPartitionVersion(AdminSetPartitionVersionStmt stmt) throws DdlException {
         String database = stmt.getDatabase();
         String table = stmt.getTable();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
index 7dbf3a0ec0a..54a3fd49e30 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/Function.java
@@ -22,6 +22,7 @@ import org.apache.doris.analysis.FunctionCallExpr;
 import org.apache.doris.analysis.FunctionName;
 import org.apache.doris.analysis.FunctionParams;
 import org.apache.doris.common.AnalysisException;
+import org.apache.doris.common.Config;
 import org.apache.doris.common.FeMetaVersion;
 import org.apache.doris.common.UserException;
 import org.apache.doris.common.io.IOUtils;
@@ -563,6 +564,8 @@ public class Function implements Writable {
             fn.setChecksum(checksum);
         }
         fn.setVectorized(vectorized);
+        fn.setIsStaticLoad(Config.enable_java_udf_static_load);
+        fn.setExpirationTime(Config.java_udf_load_expiration_time_min);
         return fn;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
index 3ed770e12ed..0b49e50a346 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/udf/JavaUdf.java
@@ -104,7 +104,8 @@ public class JavaUdf extends ScalarFunction implements 
ExplicitlyCastableSignatu
     public JavaUdf withChildren(List<Expression> children) {
         Preconditions.checkArgument(children.size() == this.children.size());
         return new JavaUdf(getName(), functionId, dbName, binaryType, 
signature, nullableMode,
-                objectFile, symbol, prepareFn, closeFn, checkSum, 
children.toArray(new Expression[0]));
+                objectFile, symbol, prepareFn, closeFn, checkSum,
+                children.toArray(new Expression[0]));
     }
 
     /**
diff --git a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
index ebfdb28a16b..8507f27a896 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/AgentBatchTask.java
@@ -29,6 +29,7 @@ import org.apache.doris.thrift.TAlterInvertedIndexReq;
 import org.apache.doris.thrift.TAlterTabletReqV2;
 import org.apache.doris.thrift.TCheckConsistencyReq;
 import org.apache.doris.thrift.TCleanTrashReq;
+import org.apache.doris.thrift.TCleanUDFCacheReq;
 import org.apache.doris.thrift.TClearAlterTaskRequest;
 import org.apache.doris.thrift.TClearTransactionTaskRequest;
 import org.apache.doris.thrift.TCloneReq;
@@ -441,6 +442,15 @@ public class AgentBatchTask implements Runnable {
                 tAgentTaskRequest.setVisibleVersionReq(request);
                 return tAgentTaskRequest;
             }
+            case CLEAN_UDF_CACHE: {
+                CleanUDFCacheTask cleanUDFCacheTask = (CleanUDFCacheTask) task;
+                TCleanUDFCacheReq request = cleanUDFCacheTask.toThrift();
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug(request.toString());
+                }
+                tAgentTaskRequest.setCleanUdfCacheReq(request);
+                return tAgentTaskRequest;
+            }
             default:
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("could not find task type for task [{}]", task);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java 
b/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java
new file mode 100644
index 00000000000..63ed5473bf1
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/task/CleanUDFCacheTask.java
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.task;
+
+import org.apache.doris.thrift.TCleanUDFCacheReq;
+import org.apache.doris.thrift.TTaskType;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Agent task asking one backend to evict the cached ClassLoader of a (static-load) Java UDF,
+ * identified by its function signature. Sent when a function is dropped.
+ */
+public class CleanUDFCacheTask extends AgentTask {
+    private static final Logger LOG = LogManager.getLogger(CleanUDFCacheTask.class);
+    // function_name(arg_type), used by BE as the key of its UDF ClassLoader cache
+    private final String functionSignature;
+
+    public CleanUDFCacheTask(long backendId, String signature) {
+        super(null, backendId, TTaskType.CLEAN_UDF_CACHE, -1, -1, -1, -1, -1, -1, -1);
+        this.functionSignature = signature;
+    }
+
+    /** Builds the thrift request carried in TAgentTaskRequest.clean_udf_cache_req. */
+    public TCleanUDFCacheReq toThrift() {
+        TCleanUDFCacheReq req = new TCleanUDFCacheReq();
+        req.setFunctionSignature(this.functionSignature);
+        return req;
+    }
+}
diff --git a/gensrc/thrift/AgentService.thrift 
b/gensrc/thrift/AgentService.thrift
index e879b54615d..b7f273b6e44 100644
--- a/gensrc/thrift/AgentService.thrift
+++ b/gensrc/thrift/AgentService.thrift
@@ -103,6 +103,10 @@ struct TPushStoragePolicyReq {
 
 struct TCleanTrashReq {}
 
+struct TCleanUDFCacheReq {
+    1: optional string function_signature // UDF whose cache should be cleaned, formatted as function_name(arg_type)
+}
+
 enum TCompressionType {
     UNKNOWN_COMPRESSION = 0,
     DEFAULT_COMPRESSION = 1,
@@ -503,6 +507,7 @@ struct TAgentTaskRequest {
     33: optional TGcBinlogReq gc_binlog_req
     34: optional TCleanTrashReq clean_trash_req
     35: optional TVisibleVersionReq visible_version_req
+    36: optional TCleanUDFCacheReq clean_udf_cache_req
 }
 
 struct TAgentResult {
diff --git a/gensrc/thrift/Types.thrift b/gensrc/thrift/Types.thrift
index 89a67fdba15..8639a1a1476 100644
--- a/gensrc/thrift/Types.thrift
+++ b/gensrc/thrift/Types.thrift
@@ -224,7 +224,8 @@ enum TTaskType {
     ALTER_INVERTED_INDEX,
     GC_BINLOG,
     CLEAN_TRASH,
-    UPDATE_VISIBLE_VERSION
+    UPDATE_VISIBLE_VERSION,
+    CLEAN_UDF_CACHE;
 }
 
 enum TStmtType {
@@ -380,6 +381,8 @@ struct TFunction {
   11: optional i64 id
   12: optional string checksum
   13: optional bool vectorized = false
+  15: optional bool is_static_load = false
+  16: optional i64 expiration_time //minutes
 }
 
 enum TJdbcOperation {
diff --git a/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out
new file mode 100644
index 00000000000..9f57f52d091
Binary files /dev/null and b/regression-test/data/javaudf_p0/test_javaudf_static_load_test.out differ
diff --git a/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTest.java b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTest.java
new file mode 100644
index 00000000000..e226c0e6029
--- /dev/null
+++ b/regression-test/java-udf-src/src/main/java/org/apache/doris/udf/StaticIntTest.java
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.udf;
+
+/**
+ * Regression-test UDF that exposes how long its class stays loaded.
+ *
+ * <p>{@code evaluate()} returns a counter kept in a static field. The counter (and the
+ * one-time message printed by the static initializer) is per loaded copy of the class,
+ * so successive calls keep counting up only while the same copy is reused, as with the
+ * "static_load" UDF property. The counter is not synchronized.
+ */
+public class StaticIntTest {
+    // Number of evaluate() calls served by this loaded copy of the class.
+    private static int value = 0;
+
+    static {
+        System.out.println("static load should only print once");
+    }
+
+    public Integer evaluate() {
+        value += 1;
+        return value;
+    }
+}
diff --git 
a/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy 
b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy
new file mode 100644
index 00000000000..c816ec90292
--- /dev/null
+++ b/regression-test/suites/javaudf_p0/test_javaudf_static_load_test.groovy
@@ -0,0 +1,86 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+import java.nio.charset.StandardCharsets
+import java.nio.file.Files
+import java.nio.file.Paths
+
+suite("test_javaudf_static_load_test") {
+    def tableName = "test_javaudf_static_load_test"
+    def jarPath = """${context.file.parent}/jars/java-udf-case-jar-with-dependencies.jar"""
+
+    log.info("Jar path: ${jarPath}".toString())
+    try {
+        sql """ DROP TABLE IF EXISTS ${tableName} """
+        sql """
+        CREATE TABLE IF NOT EXISTS ${tableName} (
+            `user_id`     INT         NOT NULL COMMENT "用户id",
+            `char_col`    CHAR        NOT NULL COMMENT "",
+            `varchar_col` VARCHAR(10) NOT NULL COMMENT "",
+            `string_col`  STRING      NOT NULL COMMENT ""
+            )
+            DISTRIBUTED BY HASH(user_id) PROPERTIES("replication_num" = "1");
+        """
+        // Build one multi-row INSERT: eight rows for i = 1..8, then a last row for i = 9.
+        StringBuilder sb = new StringBuilder()
+        int i = 1
+        for (; i < 9; i ++) {
+            sb.append("""
+                (${i % 3}, '${i}','abcdefg${i}','poiuytre${i}abcdefg'),
+            """)
+        }
+        sb.append("""
+                (${i}, '${i}','abcdefg${i}','poiuytre${i}abcdefg')
+            """)
+        sql """ INSERT INTO ${tableName} VALUES
+             ${sb.toString()}
+            """
+        qt_select_default """ SELECT * FROM ${tableName} t ORDER BY char_col; """
+
+        File path = new File(jarPath)
+        if (!path.exists()) {
+            throw new IllegalStateException("""${jarPath} doesn't exist! """)
+        }
+
+        // Remove a function left behind by an interrupted earlier run; dropping it also asks
+        // the backends to clean the cached static-load class, so the counter starts fresh.
+        try_sql("DROP FUNCTION IF EXISTS static_load_test();")
+        sql """ CREATE FUNCTION static_load_test() RETURNS int PROPERTIES (
+            "file"="file://${jarPath}",
+            "symbol"="org.apache.doris.udf.StaticIntTest",
+            "static_load"="true",
+            "expiration_time"="10",
+            "type"="JAVA_UDF"
+        ); """
+
+        // With static_load the cached class (and its static counter) should be reused by the
+        // queries below, so their results are expected to increase; see the .out file.
+        sql """set parallel_pipeline_task_num = 1; """
+        qt_select1 """ SELECT static_load_test(); """
+        qt_select2 """ SELECT static_load_test(); """
+        qt_select3 """ SELECT static_load_test(); """
+        qt_select4 """ SELECT static_load_test(); """
+        qt_select5 """ SELECT static_load_test(); """
+
+    } finally {
+        try_sql("DROP FUNCTION IF EXISTS static_load_test();")
+        try_sql("DROP TABLE IF EXISTS ${tableName}")
+    }
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org


Reply via email to