Thanks for driving this ;)
--
    Best!
    Xuyang


On 2024-04-16 10:47:56, "Xiaolong Wang" <xiaolong.w...@smartnews.com> wrote:

Reported. JIRA link: https://issues.apache.org/jira/browse/FLINK-35117?filter=-2


On Tue, Apr 16, 2024 at 10:05 AM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

By adding `org.apache.commons.text` to the OWNER_CLASSPATH list, the issue can be resolved. I'll create a JIRA ticket about it.


On Mon, Apr 15, 2024 at 12:14 PM Xiaolong Wang <xiaolong.w...@smartnews.com> wrote:

Sure, the AsyncScalarFunction's code looks like this:

import org.apache.flink.table.functions.AsyncScalarFunction;
import org.apache.flink.table.functions.FunctionContext;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncHashCodeFunction extends AsyncScalarFunction {

    private int factor = 0;
    private ExecutorService executor;

    @Override
    public void open(FunctionContext context) throws Exception {
        // access the global "hashcode_factor" parameter
        // "12" would be the default value if the parameter does not exist
        factor = Integer.parseInt(context.getJobParameter("hashcode_factor", "12"));
        executor =
                Executors.newFixedThreadPool(
                        Integer.parseInt(context.getJobParameter("in-flight-requests", "10")));
    }

    public final void eval(CompletableFuture<Integer> future, String s) {
        executor.submit(
                () -> {
                    future.complete(s.hashCode() * factor);
                });
    }
}

I tried to package `commons-text-1.10` in two different ways:

1. In the user jar.
2. Into the classpath of the Flink image.

Both failed. I checked the source code, and it seems that flink-table-planner-loader uses a URLClassLoader to load dependencies, so it first checks the `flink-table-planner-loader.jar` and skips the user's dependencies. There is an exception list in the `PlannerModule` class, which looks like this:

class PlannerModule {

    /**
     * The name of the table planner dependency jar, bundled with the flink-table-planner-loader
     * module artifact.
     */
    static final String FLINK_TABLE_PLANNER_FAT_JAR = "flink-table-planner.jar";

    private static final String HINT_USAGE =
            "mvn clean package -pl flink-table/flink-table-planner,flink-table/flink-table-planner-loader -DskipTests";

    private static final String[] OWNER_CLASSPATH =
            Stream.concat(
                            Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
                            Stream.of(
                                    // These packages are shipped either by
                                    // flink-table-runtime or flink-dist itself
                                    "org.codehaus.janino",
                                    "org.codehaus.commons",
                                    "org.apache.commons.lang3",
                                    "org.apache.commons.math3",
                                    // with hive dialect, hadoop jar should be in classpath,
                                    // also, we should make it loaded by owner classloader,
                                    // otherwise, it'll throw class not found exception
                                    // when initialize HiveParser which requires hadoop
                                    "org.apache.hadoop"))
                    .toArray(String[]::new);

I think adding `org.apache.commons.text` to that list would do.
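To make it concrete, a rough sketch of what the amended array might look like (only an illustration of the proposed extra entry, not the actual FLINK-35117 patch):

    private static final String[] OWNER_CLASSPATH =
            Stream.concat(
                            Arrays.stream(CoreOptions.PARENT_FIRST_LOGGING_PATTERNS),
                            Stream.of(
                                    "org.codehaus.janino",
                                    "org.codehaus.commons",
                                    "org.apache.commons.lang3",
                                    "org.apache.commons.math3",
                                    // delegate commons-text to the owner classloader so that
                                    // StringSubstitutor, which AsyncCodeGenerator needs, can be
                                    // resolved outside the planner fat jar
                                    "org.apache.commons.text",
                                    "org.apache.hadoop"))
                    .toArray(String[]::new);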
On Wed, Apr 10, 2024 at 2:06 PM Xuyang <xyzhong...@163.com> wrote:

Hi, Wang.

Could you provide more details for this bug, such as a minimal reproducible test case, pom dependencies, etc.? Furthermore, can you try again to package the dependency "commons-text" with version "1.10.0" manually, to check whether it works? If you can work around this bug that way, I think we should open a bug issue for it.

--
    Best!
    Xuyang


At 2024-04-09 18:11:27, "Xiaolong Wang" <xiaolong.w...@smartnews.com> wrote:

Hi,

I found a ClassNotFoundException when using Flink 1.19's AsyncScalarFunction.

Stack trace:

Caused by: java.lang.ClassNotFoundException: org.apache.commons.text.StringSubstitutor
    at java.net.URLClassLoader.findClass(Unknown Source) ~[?:?]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClassFromComponentOnly(ComponentClassLoader.java:150) ~[flink-dist-1.19.0.jar:1.19.0]
    at org.apache.flink.core.classloading.ComponentClassLoader.loadClass(ComponentClassLoader.java:113) ~[flink-dist-1.19.0.jar:1.19.0]
    at java.lang.ClassLoader.loadClass(Unknown Source) ~[?:?]
    at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateProcessCode(AsyncCodeGenerator.java:173) ~[?:?]
    at org.apache.flink.table.planner.codegen.AsyncCodeGenerator.generateFunction(AsyncCodeGenerator.java:77) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.getAsyncFunctionOperator(CommonExecAsyncCalc.java:146) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.createAsyncOneInputTransformation(CommonExecAsyncCalc.java:126) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecAsyncCalc.translateToPlanInternal(CommonExecAsyncCalc.java:89) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecCalc.translateToPlanInternal(CommonExecCalc.java:94) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecEdge.translateToPlan(ExecEdge.java:259) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecSink.translateToPlanInternal(StreamExecSink.java:177) ~[?:?]
    at org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase.translateToPlan(ExecNodeBase.java:168) ~[?:?]
    at org.apache.flink.table.planner.delegation.StreamPlanner.$anonfun$translateToPlan$1(StreamPlanner.scala:85) ~[?:?]

Environment:
flink image: flink:1.19.0-scala_2.12-java11

Tried solutions:
I tried to package the needed dependency `commons-text-1.10.0.jar` into both the user jar and the classpath, and the issue remained.

Would someone please help resolve this?
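For context, a rough sketch of the kind of job that hits this code path (the table definition and names below are illustrative, not the exact production job; the UDF is the AsyncHashCodeFunction quoted above in this thread):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class AsyncUdfRepro {

    public static void main(String[] args) {
        // planning the query below goes through AsyncCodeGenerator,
        // which is where the ClassNotFoundException is thrown
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // register the AsyncScalarFunction (assumed to be in the same package)
        tEnv.createTemporarySystemFunction("async_hash", AsyncHashCodeFunction.class);

        // illustrative datagen source; any table with a STRING column works
        tEnv.executeSql(
                "CREATE TABLE words (word STRING) WITH ("
                        + "'connector' = 'datagen', 'rows-per-second' = '1')");

        // invoking the async UDF makes the planner generate a CommonExecAsyncCalc node
        tEnv.executeSql("SELECT word, async_hash(word) AS h FROM words").print();
    }
}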