The config option "pipeline.jars" is used to specify the user jar, which
contains the main class.
I think what you need is "pipeline.classpaths".

/**
 * A list of URLs that are added to the classpath of each user code classloader of the program.
 * Paths must specify a protocol (e.g. file://) and be accessible on all nodes
 */
public static final ConfigOption<List<String>> CLASSPATHS =
        key("pipeline.classpaths")
                .stringType()
                .asList()
                .noDefaultValue()
                .withDescription(
                        "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
                                + " the cluster. These have to be valid URLs.");
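
Since the Javadoc above says each path must specify a protocol, a quick check before putting entries into the configuration may help. The following is only a minimal sketch using the JDK: ClasspathUrlCheck and hasProtocol are hypothetical names, not part of Flink, and the check only looks for a URI scheme; it does not reproduce whatever validation Flink itself performs.

```java
import java.net.URI;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration only; it is not part of Flink.
final class ClasspathUrlCheck {

    // Returns true if the entry parses as a URI and names a scheme (e.g. file, hdfs).
    static boolean hasProtocol(String entry) {
        try {
            return new URI(entry).getScheme() != null;
        } catch (URISyntaxException e) {
            // Not parseable as a URI at all, so it cannot carry a protocol.
            return false;
        }
    }

    public static void main(String[] args) {
        List<String> entries =
                Arrays.asList("file:///path/to/udf.jar", "/path/to/udf.jar");
        for (String entry : entries) {
            // Prints each entry followed by whether it carries a protocol.
            System.out.println(entry + " -> " + hasProtocol(entry));
        }
    }
}
```

With the two sample entries, the first (file:///path/to/udf.jar) passes the check and the bare path (/path/to/udf.jar) does not.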


Best,
Yang

Pouria Pirzadeh <pouria.pirza...@gmail.com> wrote on Fri, Dec 17, 2021 at 03:43:

> I am developing a Java application which uses UDFs on Flink 1.14.
> It uses the PipelineOptions.JARS config to dynamically add jar files
> containing UDF classes to the user classpath in the main method. However,
> the application fails to load the UDF class from the configured jar files
> at job launch time and crashes with a ClassNotFoundException.
>
> Is PipelineOptions.JARS the correct option for adding files to the
> classpath on the JobManager and all TaskManagers?
>
> Sample code snippet:
>
> final Configuration configuration = new Configuration();
>
> configuration.set(PipelineOptions.JARS,Collections.singletonList("file:///path/to/udf.jar"));
> StreamExecutionEnvironment streamEnv =
> StreamExecutionEnvironment.getExecutionEnvironment(configuration);
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
> ...
> Class udfClass = Class.forName("demo.MyUDF", ...);
> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
> ...
>
> Error stack trace:
> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
>     at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>     at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at
> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
>     at
> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
>     at
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>     at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
>     at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>     at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>     at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
>     at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
>     at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
>     at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
>     at
> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
>     at
> org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
>     at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
>     at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
>     at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
>     at
> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>     at
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>     at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
>     at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
>     at
> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
>     ...
>
