This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git

commit ac05972a74ae60b28bd378000d345af31cc523b4
Author: morningman <morning...@163.com>
AuthorDate: Mon Sep 2 18:07:45 2024 +0800

    [opt](udf-cache) cache more udf classes #40404
---
 .../doris/common/classloader/ScannerLoader.java    |   7 +-
 .../doris/common/jni/utils/UdfClassCache.java      |  19 +++
 .../java/org/apache/doris/udf/UdfExecutor.java     | 166 ++++++++++++---------
 fe/fe-common/pom.xml                               |   4 +
 4 files changed, 119 insertions(+), 77 deletions(-)

diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
index 59018da3fb4..dcf9b0747c0 100644
--- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java
@@ -19,6 +19,7 @@ package org.apache.doris.common.classloader;
 
 import org.apache.doris.common.jni.utils.ExpiringMap;
 import org.apache.doris.common.jni.utils.Log4jOutputStream;
+import org.apache.doris.common.jni.utils.UdfClassCache;
 
 import com.google.common.collect.Streams;
 import org.apache.log4j.Level;
@@ -48,7 +49,7 @@ import java.util.stream.Collectors;
 public class ScannerLoader {
     public static final Logger LOG = Logger.getLogger(ScannerLoader.class);
     private static final Map<String, Class<?>> loadedClasses = new HashMap<>();
-    private static final ExpiringMap<String, ClassLoader> udfLoadedClasses = new ExpiringMap<String, ClassLoader>();
+    private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses = new ExpiringMap<>();
     private static final String CLASS_SUFFIX = ".class";
     private static final String LOAD_PACKAGE = "org.apache.doris";
 
@@ -83,10 +84,10 @@ public class ScannerLoader {
         return udfLoadedClasses.get(functionSignature);
     }
 
-    public static synchronized void cacheClassLoader(String functionSignature, ClassLoader classLoader,
+    public static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache,
             long expirationTime) {
         LOG.info("cacheClassLoader for: " + functionSignature);
-        udfLoadedClasses.put(functionSignature, classLoader, expirationTime * 60 * 1000L);
+        udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L);
     }
 
     public synchronized void cleanUdfClassLoader(String functionSignature) {
diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
new file mode 100644
index 00000000000..a2ac1015b0b
--- /dev/null
+++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java
@@ -0,0 +1,19 @@
+package org.apache.doris.common.jni.utils;
+
+import org.apache.doris.thrift.TFunction;
+
+import com.esotericsoftware.reflectasm.MethodAccess;
+
+import java.lang.reflect.Method;
+
+public class UdfClassCache {
+    public Class<?> c;
+    public MethodAccess methodAccess;
+    public int evaluateIndex;
+    public JavaUdfDataType[] argTypes;
+    public JavaUdfDataType retType;
+    public Class[] argClass;
+    public TFunction fn;
+    public Method method;
+    public Method prepareMethod;
+}
diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
index 5a84e9185bd..822800914a3 100644
--- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
+++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java
@@ -20,8 +20,10 @@ package org.apache.doris.udf;
 import org.apache.doris.catalog.Type;
 import org.apache.doris.common.Pair;
 import org.apache.doris.common.classloader.ScannerLoader;
+import org.apache.doris.common.exception.InternalException;
 import org.apache.doris.common.exception.UdfRuntimeException;
 import org.apache.doris.common.jni.utils.JavaUdfDataType;
+import org.apache.doris.common.jni.utils.UdfClassCache;
 import org.apache.doris.common.jni.utils.UdfUtils;
 import org.apache.doris.common.jni.vec.ColumnValueConverter;
 import org.apache.doris.common.jni.vec.VectorTable;
@@ -140,26 +142,96 @@ public class UdfExecutor extends BaseExecutor {
         return null; // Method not found
     }
 
-    public ClassLoader getClassLoader(String jarPath, String signature, long expirationTime)
-            throws MalformedURLException, FileNotFoundException {
-        ClassLoader loader = null;
+    public UdfClassCache getClassCache(String className, String jarPath, String signature, long expirationTime,
+            Type funcRetType, Type... parameterTypes)
+            throws MalformedURLException, FileNotFoundException, ClassNotFoundException, InternalException,
+            UdfRuntimeException {
+        UdfClassCache cache = null;
         if (jarPath == null) {
             // for test
-            loader = ClassLoader.getSystemClassLoader();
+            // cache = ClassLoader.getSystemClassLoader();
         } else {
             if (isStaticLoad) {
-                loader = ScannerLoader.getUdfClassLoader(signature);
+                cache = ScannerLoader.getUdfClassLoader(signature);
             }
-            if (loader == null) {
+            if (cache == null) {
                 ClassLoader parent = getClass().getClassLoader();
                 classLoader = UdfUtils.getClassLoader(jarPath, parent);
-                loader = classLoader;
+                ClassLoader loader = classLoader;
+                cache = new UdfClassCache();
+                cache.c = Class.forName(className, true, loader);
+                cache.methodAccess = MethodAccess.get(cache.c);
+                checkUdfClass(className, cache, funcRetType, parameterTypes);
                 if (isStaticLoad) {
-                    ScannerLoader.cacheClassLoader(signature, loader, expirationTime);
+                    ScannerLoader.cacheClassLoader(signature, cache, expirationTime);
                 }
             }
         }
-        return loader;
+        return cache;
+    }
+
+    private void checkUdfClass(String className, UdfClassCache cache, Type funcRetType, Type... parameterTypes)
+            throws InternalException, UdfRuntimeException {
+        ArrayList<String> signatures = Lists.newArrayList();
+        Class<?> c = cache.c;
+        Method[] methods = c.getMethods();
+        Method prepareMethod = findPrepareMethod(methods);
+        if (prepareMethod != null) {
+            cache.prepareMethod = prepareMethod;
+        }
+        for (Method m : methods) {
+            // By convention, the udf must contain the function "evaluate"
+            if (!m.getName().equals(UDF_FUNCTION_NAME)) {
+                continue;
+            }
+            signatures.add(m.toGenericString());
+            cache.argClass = m.getParameterTypes();
+
+            // Try to match the arguments
+            if (cache.argClass.length != parameterTypes.length) {
+                continue;
+            }
+            cache.method = m;
+            cache.evaluateIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass);
+            Pair<Boolean, JavaUdfDataType> returnType;
+            if (cache.argClass.length == 0 && parameterTypes.length == 0) {
+                // Special case where the UDF doesn't take any input args
+                returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType());
+                if (!returnType.first) {
+                    continue;
+                } else {
+                    cache.retType = returnType.second;
+                }
+                cache.argTypes = new JavaUdfDataType[0];
+                return;
+            }
+            returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType());
+            if (!returnType.first) {
+                continue;
+            } else {
+                cache.retType = returnType.second;
+            }
+            Type keyType = cache.retType.getKeyType();
+            Type valueType = cache.retType.getValueType();
+            Pair<Boolean, JavaUdfDataType[]> inputType = UdfUtils.setArgTypes(parameterTypes, cache.argClass, false);
+            if (!inputType.first) {
+                continue;
+            } else {
+                cache.argTypes = inputType.second;
+            }
+            cache.retType.setKeyType(keyType);
+            cache.retType.setValueType(valueType);
+            return;
+        }
+        StringBuilder sb = new StringBuilder();
+        sb.append("Unable to find evaluate function with the correct signature: ")
+                         .append(className)
+                         .append(".evaluate(")
+                         .append(Joiner.on(", ").join(parameterTypes))
+                         .append(")\n")
+                         .append("UDF contains: \n    ")
+                         .append(Joiner.on("\n    ").join(signatures));
+        throw new UdfRuntimeException(sb.toString());
     }
 
     // Preallocate the input objects that will be passed to the underlying UDF.
@@ -168,7 +240,6 @@ public class UdfExecutor extends BaseExecutor {
     protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType,
             Type... parameterTypes) throws UdfRuntimeException {
         String className = request.fn.scalar_fn.symbol;
-        ArrayList<String> signatures = Lists.newArrayList();
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Loading UDF '" + className + "' from " + jarPath);
@@ -178,76 +249,22 @@ public class UdfExecutor extends BaseExecutor {
             if (request.getFn().isSetExpirationTime()) {
                 expirationTime = request.getFn().getExpirationTime();
             }
-            ClassLoader loader = getClassLoader(jarPath, request.getFn().getSignature(), expirationTime);
-            Class<?> c = Class.forName(className, true, loader);
-            methodAccess = MethodAccess.get(c);
+            UdfClassCache cache = getClassCache(className, jarPath, request.getFn().getSignature(), expirationTime,
+                    funcRetType, parameterTypes);
+            Class<?> c = cache.c;
+            methodAccess = cache.methodAccess;
             Constructor<?> ctor = c.getConstructor();
             udf = ctor.newInstance();
-            Method[] methods = c.getMethods();
-            Method prepareMethod = findPrepareMethod(methods);
+            Method prepareMethod = cache.prepareMethod;
             if (prepareMethod != null) {
                 prepareMethod.invoke(udf);
             }
-            for (Method m : methods) {
-                // By convention, the udf must contain the function "evaluate"
-                if (!m.getName().equals(UDF_FUNCTION_NAME)) {
-                    continue;
-                }
-                signatures.add(m.toGenericString());
-                argClass = m.getParameterTypes();
-
-                // Try to match the arguments
-                if (argClass.length != parameterTypes.length) {
-                    continue;
-                }
-                method = m;
-                evaluateIndex = methodAccess.getIndex(UDF_FUNCTION_NAME, argClass);
-                Pair<Boolean, JavaUdfDataType> returnType;
-                if (argClass.length == 0 && parameterTypes.length == 0) {
-                    // Special case where the UDF doesn't take any input args
-                    returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType());
-                    if (!returnType.first) {
-                        continue;
-                    } else {
-                        retType = returnType.second;
-                    }
-                    argTypes = new JavaUdfDataType[0];
-                    if (LOG.isDebugEnabled()) {
-                        LOG.debug("Loaded UDF '" + className + "' from " + jarPath);
-                    }
-                    return;
-                }
-                returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType());
-                if (!returnType.first) {
-                    continue;
-                } else {
-                    retType = returnType.second;
-                }
-                Type keyType = retType.getKeyType();
-                Type valueType = retType.getValueType();
-                Pair<Boolean, JavaUdfDataType[]> inputType = UdfUtils.setArgTypes(parameterTypes, argClass, false);
-                if (!inputType.first) {
-                    continue;
-                } else {
-                    argTypes = inputType.second;
-                }
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug("Loaded UDF '" + className + "' from " + jarPath);
-                }
-                retType.setKeyType(keyType);
-                retType.setValueType(valueType);
-                return;
-            }
 
-            StringBuilder sb = new StringBuilder();
-            sb.append("Unable to find evaluate function with the correct signature: ")
-                    .append(className)
-                    .append(".evaluate(")
-                    .append(Joiner.on(", ").join(parameterTypes))
-                    .append(")\n")
-                    .append("UDF contains: \n    ")
-                    .append(Joiner.on("\n    ").join(signatures));
-            throw new UdfRuntimeException(sb.toString());
+            argClass = cache.argClass;
+            method = cache.method;
+            evaluateIndex = cache.evaluateIndex;
+            retType = cache.retType;
+            argTypes = cache.argTypes;
         } catch (MalformedURLException e) {
             throw new UdfRuntimeException("Unable to load jar.", e);
         } catch (SecurityException e) {
@@ -265,3 +282,4 @@ public class UdfExecutor extends BaseExecutor {
         }
     }
 }
+
diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml
index 6112ec15068..eae7b690996 100644
--- a/fe/fe-common/pom.xml
+++ b/fe/fe-common/pom.xml
@@ -143,6 +143,10 @@ under the License.
             <version>${commons-collections.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>com.esotericsoftware</groupId>
+            <artifactId>kryo-shaded</artifactId>
+        </dependency>
     </dependencies>
     <build>
         <finalName>doris-fe-common</finalName>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to