The config option "pipeline.jars" is used to specify the user jar, which contains the main class. I think what you need is "pipeline.classpaths".
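Since entries for this option must be URLs with a protocol (e.g. file://), here is a rough JDK-only sketch of how a list of jar locations could be prepared before handing it to the configuration. The class ClasspathUrls and its methods are hypothetical helpers, not part of Flink, and the final configuration.set(PipelineOptions.CLASSPATHS, ...) call is shown only as a comment, on the assumption that it mirrors the PipelineOptions.JARS call in your snippet.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Flink) for preparing "pipeline.classpaths" entries. */
class ClasspathUrls {

    /** Returns true if the entry parses as a URI with an explicit scheme such as "file". */
    static boolean hasProtocol(String entry) {
        try {
            return new URI(entry).getScheme() != null;
        } catch (URISyntaxException e) {
            return false;
        }
    }

    /** Converts a local filesystem path into a file: URL string. */
    static String toFileUrl(String localPath) {
        return Paths.get(localPath).toAbsolutePath().toUri().toString();
    }

    /** Keeps entries that already carry a protocol and converts bare paths to file: URLs. */
    static List<String> normalize(List<String> entries) {
        List<String> urls = new ArrayList<>();
        for (String entry : entries) {
            urls.add(hasProtocol(entry) ? entry : toFileUrl(entry));
        }
        return urls;
    }

    public static void main(String[] args) {
        List<String> urls = normalize(Arrays.asList("file:///path/to/udf.jar", "/path/to/other.jar"));
        urls.forEach(System.out::println);
        // With Flink on the classpath, the list would then be set roughly like this
        // (assumption: same pattern as the PipelineOptions.JARS call in the question):
        //   configuration.set(PipelineOptions.CLASSPATHS, urls);
    }
}
```

Note that this only checks the URL form. Each URL still has to be reachable on every node, as the option's documentation says.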
/**
 * A list of URLs that are added to the classpath of each user code classloader of the program.
 * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
 */
public static final ConfigOption<List<String>> CLASSPATHS =
        key("pipeline.classpaths")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
                                + " the cluster. These have to be valid URLs.");

Best,
Yang

Pouria Pirzadeh <pouria.pirza...@gmail.com> wrote on Fri, Dec 17, 2021 at 03:43:

> I am developing a Java application which uses UDFs on Flink 1.14.
> It uses the PipelineOptions.JARS config to add jar files, containing UDF
> classes, dynamically to the user classpath in the main method; however, the
> application fails to load the UDF class from the configured jar files at job
> launch time and crashes with ClassNotFoundException.
>
> Is PipelineOptions.JARS the correct option to add files to the classpath on
> the Job manager and all task managers?
>
> Sample code snippet:
>
> final Configuration configuration = new Configuration();
> configuration.set(PipelineOptions.JARS, Collections.singletonList("file:///path/to/udf.jar"));
> StreamExecutionEnvironment streamEnv =
>     StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
> ...
> Class udfClass = Class.forName("demo.MyUDF", ...);
> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
> ...
>
> Error stack trace:
> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
>     at org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
>     at org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
>     at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
>     at org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
>     ...