This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-2.1-lakehouse
in repository https://gitbox.apache.org/repos/asf/doris.git
commit ac05972a74ae60b28bd378000d345af31cc523b4 Author: morningman <morning...@163.com> AuthorDate: Mon Sep 2 18:07:45 2024 +0800 [opt](udf-cache) cache more udf classes #40404 --- .../doris/common/classloader/ScannerLoader.java | 7 +- .../doris/common/jni/utils/UdfClassCache.java | 19 +++ .../java/org/apache/doris/udf/UdfExecutor.java | 166 ++++++++++++--------- fe/fe-common/pom.xml | 4 + 4 files changed, 119 insertions(+), 77 deletions(-) diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java index 59018da3fb4..dcf9b0747c0 100644 --- a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/classloader/ScannerLoader.java @@ -19,6 +19,7 @@ package org.apache.doris.common.classloader; import org.apache.doris.common.jni.utils.ExpiringMap; import org.apache.doris.common.jni.utils.Log4jOutputStream; +import org.apache.doris.common.jni.utils.UdfClassCache; import com.google.common.collect.Streams; import org.apache.log4j.Level; @@ -48,7 +49,7 @@ import java.util.stream.Collectors; public class ScannerLoader { public static final Logger LOG = Logger.getLogger(ScannerLoader.class); private static final Map<String, Class<?>> loadedClasses = new HashMap<>(); - private static final ExpiringMap<String, ClassLoader> udfLoadedClasses = new ExpiringMap<String, ClassLoader>(); + private static final ExpiringMap<String, UdfClassCache> udfLoadedClasses = new ExpiringMap<>(); private static final String CLASS_SUFFIX = ".class"; private static final String LOAD_PACKAGE = "org.apache.doris"; @@ -83,10 +84,10 @@ public class ScannerLoader { return udfLoadedClasses.get(functionSignature); } - public static synchronized void cacheClassLoader(String functionSignature, ClassLoader classLoader, + public 
static synchronized void cacheClassLoader(String functionSignature, UdfClassCache classCache, long expirationTime) { LOG.info("cacheClassLoader for: " + functionSignature); - udfLoadedClasses.put(functionSignature, classLoader, expirationTime * 60 * 1000L); + udfLoadedClasses.put(functionSignature, classCache, expirationTime * 60 * 1000L); } public synchronized void cleanUdfClassLoader(String functionSignature) { diff --git a/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java new file mode 100644 index 00000000000..a2ac1015b0b --- /dev/null +++ b/fe/be-java-extensions/java-common/src/main/java/org/apache/doris/common/jni/utils/UdfClassCache.java @@ -0,0 +1,19 @@ +package org.apache.doris.common.jni.utils; + +import org.apache.doris.thrift.TFunction; + +import com.esotericsoftware.reflectasm.MethodAccess; + +import java.lang.reflect.Method; + +public class UdfClassCache { + public Class<?> c; + public MethodAccess methodAccess; + public int evaluateIndex; + public JavaUdfDataType[] argTypes; + public JavaUdfDataType retType; + public Class[] argClass; + public TFunction fn; + public Method method; + public Method prepareMethod; +} diff --git a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java index 5a84e9185bd..822800914a3 100644 --- a/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java +++ b/fe/be-java-extensions/java-udf/src/main/java/org/apache/doris/udf/UdfExecutor.java @@ -20,8 +20,10 @@ package org.apache.doris.udf; import org.apache.doris.catalog.Type; import org.apache.doris.common.Pair; import org.apache.doris.common.classloader.ScannerLoader; +import org.apache.doris.common.exception.InternalException; import 
org.apache.doris.common.exception.UdfRuntimeException; import org.apache.doris.common.jni.utils.JavaUdfDataType; +import org.apache.doris.common.jni.utils.UdfClassCache; import org.apache.doris.common.jni.utils.UdfUtils; import org.apache.doris.common.jni.vec.ColumnValueConverter; import org.apache.doris.common.jni.vec.VectorTable; @@ -140,26 +142,96 @@ public class UdfExecutor extends BaseExecutor { return null; // Method not found } - public ClassLoader getClassLoader(String jarPath, String signature, long expirationTime) - throws MalformedURLException, FileNotFoundException { - ClassLoader loader = null; + public UdfClassCache getClassCache(String className, String jarPath, String signature, long expirationTime, + Type funcRetType, Type... parameterTypes) + throws MalformedURLException, FileNotFoundException, ClassNotFoundException, InternalException, + UdfRuntimeException { + UdfClassCache cache = null; if (jarPath == null) { // for test - loader = ClassLoader.getSystemClassLoader(); + // cache = ClassLoader.getSystemClassLoader(); } else { if (isStaticLoad) { - loader = ScannerLoader.getUdfClassLoader(signature); + cache = ScannerLoader.getUdfClassLoader(signature); } - if (loader == null) { + if (cache == null) { ClassLoader parent = getClass().getClassLoader(); classLoader = UdfUtils.getClassLoader(jarPath, parent); - loader = classLoader; + ClassLoader loader = classLoader; + cache = new UdfClassCache(); + cache.c = Class.forName(className, true, loader); + cache.methodAccess = MethodAccess.get(cache.c); + checkUdfClass(className, cache, funcRetType, parameterTypes); if (isStaticLoad) { - ScannerLoader.cacheClassLoader(signature, loader, expirationTime); + ScannerLoader.cacheClassLoader(signature, cache, expirationTime); } } } - return loader; + return cache; + } + + private void checkUdfClass(String className, UdfClassCache cache, Type funcRetType, Type... 
parameterTypes) + throws InternalException, UdfRuntimeException { + ArrayList<String> signatures = Lists.newArrayList(); + Class<?> c = cache.c; + Method[] methods = c.getMethods(); + Method prepareMethod = findPrepareMethod(methods); + if (prepareMethod != null) { + cache.prepareMethod = prepareMethod; + } + for (Method m : methods) { + // By convention, the udf must contain the function "evaluate" + if (!m.getName().equals(UDF_FUNCTION_NAME)) { + continue; + } + signatures.add(m.toGenericString()); + cache.argClass = m.getParameterTypes(); + + // Try to match the arguments + if (cache.argClass.length != parameterTypes.length) { + continue; + } + cache.method = m; + cache.evaluateIndex = cache.methodAccess.getIndex(UDF_FUNCTION_NAME, cache.argClass); + Pair<Boolean, JavaUdfDataType> returnType; + if (cache.argClass.length == 0 && parameterTypes.length == 0) { + // Special case where the UDF doesn't take any input args + returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); + if (!returnType.first) { + continue; + } else { + cache.retType = returnType.second; + } + cache.argTypes = new JavaUdfDataType[0]; + return; + } + returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); + if (!returnType.first) { + continue; + } else { + cache.retType = returnType.second; + } + Type keyType = cache.retType.getKeyType(); + Type valueType = cache.retType.getValueType(); + Pair<Boolean, JavaUdfDataType[]> inputType = UdfUtils.setArgTypes(parameterTypes, cache.argClass, false); + if (!inputType.first) { + continue; + } else { + cache.argTypes = inputType.second; + } + cache.retType.setKeyType(keyType); + cache.retType.setValueType(valueType); + return; + } + StringBuilder sb = new StringBuilder(); + sb.append("Unable to find evaluate function with the correct signature: ") + .append(className) + .append(".evaluate(") + .append(Joiner.on(", ").join(parameterTypes)) + .append(")\n") + .append("UDF contains: \n ") + .append(Joiner.on("\n 
").join(signatures)); + throw new UdfRuntimeException(sb.toString()); } // Preallocate the input objects that will be passed to the underlying UDF. @@ -168,7 +240,6 @@ public class UdfExecutor extends BaseExecutor { protected void init(TJavaUdfExecutorCtorParams request, String jarPath, Type funcRetType, Type... parameterTypes) throws UdfRuntimeException { String className = request.fn.scalar_fn.symbol; - ArrayList<String> signatures = Lists.newArrayList(); try { if (LOG.isDebugEnabled()) { LOG.debug("Loading UDF '" + className + "' from " + jarPath); @@ -178,76 +249,22 @@ public class UdfExecutor extends BaseExecutor { if (request.getFn().isSetExpirationTime()) { expirationTime = request.getFn().getExpirationTime(); } - ClassLoader loader = getClassLoader(jarPath, request.getFn().getSignature(), expirationTime); - Class<?> c = Class.forName(className, true, loader); - methodAccess = MethodAccess.get(c); + UdfClassCache cache = getClassCache(className, jarPath, request.getFn().getSignature(), expirationTime, + funcRetType, parameterTypes); + Class<?> c = cache.c; + methodAccess = cache.methodAccess; Constructor<?> ctor = c.getConstructor(); udf = ctor.newInstance(); - Method[] methods = c.getMethods(); - Method prepareMethod = findPrepareMethod(methods); + Method prepareMethod = cache.prepareMethod; if (prepareMethod != null) { prepareMethod.invoke(udf); } - for (Method m : methods) { - // By convention, the udf must contain the function "evaluate" - if (!m.getName().equals(UDF_FUNCTION_NAME)) { - continue; - } - signatures.add(m.toGenericString()); - argClass = m.getParameterTypes(); - - // Try to match the arguments - if (argClass.length != parameterTypes.length) { - continue; - } - method = m; - evaluateIndex = methodAccess.getIndex(UDF_FUNCTION_NAME, argClass); - Pair<Boolean, JavaUdfDataType> returnType; - if (argClass.length == 0 && parameterTypes.length == 0) { - // Special case where the UDF doesn't take any input args - returnType = 
UdfUtils.setReturnType(funcRetType, m.getReturnType()); - if (!returnType.first) { - continue; - } else { - retType = returnType.second; - } - argTypes = new JavaUdfDataType[0]; - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded UDF '" + className + "' from " + jarPath); - } - return; - } - returnType = UdfUtils.setReturnType(funcRetType, m.getReturnType()); - if (!returnType.first) { - continue; - } else { - retType = returnType.second; - } - Type keyType = retType.getKeyType(); - Type valueType = retType.getValueType(); - Pair<Boolean, JavaUdfDataType[]> inputType = UdfUtils.setArgTypes(parameterTypes, argClass, false); - if (!inputType.first) { - continue; - } else { - argTypes = inputType.second; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Loaded UDF '" + className + "' from " + jarPath); - } - retType.setKeyType(keyType); - retType.setValueType(valueType); - return; - } - StringBuilder sb = new StringBuilder(); - sb.append("Unable to find evaluate function with the correct signature: ") - .append(className) - .append(".evaluate(") - .append(Joiner.on(", ").join(parameterTypes)) - .append(")\n") - .append("UDF contains: \n ") - .append(Joiner.on("\n ").join(signatures)); - throw new UdfRuntimeException(sb.toString()); + argClass = cache.argClass; + method = cache.method; + evaluateIndex = cache.evaluateIndex; + retType = cache.retType; + argTypes = cache.argTypes; } catch (MalformedURLException e) { throw new UdfRuntimeException("Unable to load jar.", e); } catch (SecurityException e) { @@ -265,3 +282,4 @@ public class UdfExecutor extends BaseExecutor { } } } + diff --git a/fe/fe-common/pom.xml b/fe/fe-common/pom.xml index 6112ec15068..eae7b690996 100644 --- a/fe/fe-common/pom.xml +++ b/fe/fe-common/pom.xml @@ -143,6 +143,10 @@ under the License. 
<version>${commons-collections.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo-shaded</artifactId> + </dependency> </dependencies> <build> <finalName>doris-fe-common</finalName> --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org