I have tried 'PipelineOptions.CLASSPATHS'; it also fails with a ClassNotFoundException, with exactly the same stack trace as with PipelineOptions.JARS.

FYI, the same application jar works fine if submitted via the Flink CLI using 'flink run' with the "-C" option to update the classpath:

<FLINK_HOME>/bin/flink run --detached -C file:///path/to/udf.jar ....

The problem seems to be that the classpath of the ClassLoader used by codegen in the table planner is not updated according to the Configuration passed to the StreamExecutionEnvironment, and I am not sure how that can be done.

Pouria

On Thu, Dec 16, 2021 at 8:46 PM Yang Wang <danrtsey...@gmail.com> wrote:

> The config option "pipeline.jars" is used to specify the user jar, which
> contains the main class.
> I think what you need is "pipeline.classpaths".
>
> /**
>  * A list of URLs that are added to the classpath of each user code
>  * classloader of the program.
>  * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
>  */
> public static final ConfigOption<List<String>> CLASSPATHS =
>         key("pipeline.classpaths")
>                 .stringType()
>                 .asList()
>                 .noDefaultValue()
>                 .withDescription(
>                         "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
>                                 + " the cluster. These have to be valid URLs.");
>
> Best,
> Yang
>
> Pouria Pirzadeh <pouria.pirza...@gmail.com> wrote on Fri, Dec 17, 2021 at 03:43:
>
>> I am developing a Java application which uses UDFs on Flink 1.14.
>> It uses the PipelineOptions.JARS config to add jar files containing UDF
>> classes dynamically to the user classpath in the main method. However, the
>> application fails to load the UDF class from the configured jar files at job
>> launch time and crashes with a ClassNotFoundException.
>>
>> Is PipelineOptions.JARS the correct option to add files to the classpath on
>> the Job manager and all task managers?
>>
>> Sample code snippet:
>>
>> final Configuration configuration = new Configuration();
>> configuration.set(PipelineOptions.JARS, Collections.singletonList("file:///path/to/udf.jar"));
>> StreamExecutionEnvironment streamEnv =
>>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>> StreamTableEnvironment tableEnv =
>>         StreamTableEnvironment.create(streamEnv);
>> ...
>> Class udfClass = Class.forName("demo.MyUDF", ...);
>> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
>> ...
>>
>> Error stack trace:
>> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
>>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
>>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
>>     at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
>>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
>>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
>>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
>>     ...
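
On the classloader point raised at the top of this message: one workaround that might be worth trying is to make the UDF jar visible to the client JVM before the table environment is created, by installing a URLClassLoader as the thread context class loader. This is an untested sketch, not something confirmed in this thread. It assumes that the planner's codegen resolves classes through the thread context class loader, which may not hold for every Flink version. The class name UdfClassLoaderSketch and the helpers loaderFor and install are hypothetical names used only for illustration; the code uses JDK classes only.

```java
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.List;

// Hypothetical helper class, JDK only (no Flink dependency).
class UdfClassLoaderSketch {

    /** Builds a class loader that sees the given jar URLs and delegates to parent first. */
    static URLClassLoader loaderFor(List<String> jarUrls, ClassLoader parent) {
        URL[] urls = new URL[jarUrls.size()];
        for (int i = 0; i < urls.length; i++) {
            try {
                // Entries must carry a protocol such as file://, as with pipeline.classpaths.
                urls[i] = URI.create(jarUrls.get(i)).toURL();
            } catch (MalformedURLException e) {
                throw new IllegalArgumentException("Not a valid URL: " + jarUrls.get(i), e);
            }
        }
        return new URLClassLoader(urls, parent);
    }

    /** Installs loader as the thread context class loader and returns the previous one. */
    static ClassLoader install(ClassLoader loader) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        current.setContextClassLoader(loader);
        return previous;
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        URLClassLoader udfLoader = loaderFor(List.of("file:///path/to/udf.jar"), original);
        ClassLoader previous = install(udfLoader);
        try {
            // Assumption: the StreamExecutionEnvironment / StreamTableEnvironment setup,
            // UDF registration and toDataStream(...) calls would go here.
            System.out.println(Thread.currentThread().getContextClassLoader() == udfLoader);
        } finally {
            // Restore the original loader so later code is unaffected.
            install(previous);
        }
    }
}
```

In the application, the UDF class would then presumably be loaded with Class.forName("demo.MyUDF", true, udfLoader) inside the try block, so that the same loader is used both for registration and for whatever the planner does afterwards. The jars would most likely still need to be configured via "pipeline.classpaths" so that they reach the cluster.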