Yes. You need to set "pipeline.classpaths" via flink-conf.yaml or the CLI option (-C/--classpath). I do not think setting it in your main class will work. Just as you said, the user classloader is not updated after the user main class has started executing.
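As a sketch, the two routes above might look like the following (the jar and job paths are placeholders, not from this thread):

```shell
# 1) Cluster-wide, in flink-conf.yaml -- the jar is added to every
#    user-code classloader of the program:
#
#      pipeline.classpaths: file:///path/to/udf.jar

# 2) Per-job, at submission time, via -C/--classpath:
<FLINK_HOME>/bin/flink run --detached \
  -C file:///path/to/udf.jar \
  /path/to/job.jar
```

Either way, per the option's description, each entry must be a valid URL with a protocol (e.g. file://) and must be accessible from all nodes.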
Best,
Yang

Pouria Pirzadeh <pouria.pirza...@gmail.com> wrote on Sat, Dec 18, 2021 at 01:23:

> I have tried 'PipelineOptions.CLASSPATHS'; it also fails with a
> ClassNotFoundException, with the exact same error stack trace as
> PipelineOptions.JARS.
>
> FYI, the same application jar works fine if submitted via the Flink CLI
> using 'flink run' with the "-C" option to update the classpath:
> <FLINK_HOME>/bin/flink run --detached -C file:///path/to/udf.jar ....
>
> The problem seems to be that the classpath of the ClassLoader used by
> codegen in the table planner is not updated according to the Configuration
> passed to the StreamExecutionEnvironment, and I am not sure how that can
> be done.
>
> Pouria
>
> On Thu, Dec 16, 2021 at 8:46 PM Yang Wang <danrtsey...@gmail.com> wrote:
>
>> The config option "pipeline.jars" is used to specify the user jar, which
>> contains the main class.
>> I think what you need is "pipeline.classpaths":
>>
>> /**
>>  * A list of URLs that are added to the classpath of each user code
>>  * classloader of the program. Paths must specify a protocol (e.g. file://)
>>  * and be accessible on all nodes.
>>  */
>> public static final ConfigOption<List<String>> CLASSPATHS =
>>         key("pipeline.classpaths")
>>                 .stringType()
>>                 .asList()
>>                 .noDefaultValue()
>>                 .withDescription(
>>                         "A semicolon-separated list of the classpaths to package with the job jars"
>>                                 + " to be sent to the cluster. These have to be valid URLs.");
>>
>> Best,
>> Yang
>>
>> Pouria Pirzadeh <pouria.pirza...@gmail.com> wrote on Fri, Dec 17, 2021 at 03:43:
>>
>>> I am developing a Java application which uses UDFs on Flink 1.14.
>>> It uses the PipelineOptions.JARS config to add jar files containing the
>>> UDF classes dynamically to the user classpath in the main method;
>>> however, the application fails to load the UDF class from the configured
>>> jar files at job launch time and crashes with a ClassNotFoundException.
>>>
>>> Is PipelineOptions.JARS the correct option to add files to the classpath
>>> on the JobManager and all TaskManagers?
>>>
>>> Sample code snippet:
>>>
>>> final Configuration configuration = new Configuration();
>>> configuration.set(
>>>         PipelineOptions.JARS,
>>>         Collections.singletonList("file:///path/to/udf.jar"));
>>> StreamExecutionEnvironment streamEnv =
>>>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
>>> ...
>>> Class udfClass = Class.forName("demo.MyUDF", ...);
>>> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
>>> ...
>>>
>>> Error stack trace:
>>> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
>>>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>>>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
>>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
>>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
>>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
>>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
>>>     at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
>>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
>>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
>>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
>>>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>>>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
>>>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
>>>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
>>>     ...
>>
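The failure mode in the trace can be reproduced in isolation: Class.forName resolves against the caller's classloader, so a class that was never put on that classloader's classpath is simply not found, regardless of what the pipeline Configuration says. A minimal sketch, with no Flink dependency ("demo.MyUDF" is the hypothetical UDF class from the thread):

```java
// Minimal demonstration of classloader visibility: Class.forName(name)
// resolves against the caller's classloader, so a class that is not on
// that classloader's classpath cannot be found.
public class ClassLoaderDemo {

    // Returns true if the named class is visible to this class's classloader.
    static boolean isVisible(String className) {
        try {
            Class.forName(className);
            return true;
        } catch (ClassNotFoundException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // A JDK class is always visible.
        System.out.println("java.lang.String visible: " + isVisible("java.lang.String"));
        // The hypothetical UDF class is not on this JVM's classpath, so the
        // lookup fails -- the same failure as in the stack trace above.
        System.out.println("demo.MyUDF visible: " + isVisible("demo.MyUDF"));
    }
}
```

This is why adding jars via the Configuration inside main does not help: by then the classloader doing the lookup has already been constructed.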