I have tried PipelineOptions.CLASSPATHS as well; it also fails with a
ClassNotFoundException and the exact same stack trace as
PipelineOptions.JARS.

FYI, the same application jar works fine if submitted via the Flink CLI
using 'flink run' with the "-C" option to extend the classpath:
<FLINK_HOME>/bin/flink run --detached -C file:///path/to/udf.jar ....

The problem seems to be that the classpath of the ClassLoader used by the
table planner's code generation is not updated according to the
Configuration passed to the StreamExecutionEnvironment, and I am not sure
how that can be done.
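To make the failure mode concrete, here is a minimal standalone sketch (plain JDK, no Flink; the class name MyUDF and the temp directory are made up for the demo, and it needs a JDK so the system Java compiler is available). It shows that a class reachable only through a child URLClassLoader is invisible to the application classloader, which looks analogous to what happens when the planner's codegen resolves the UDF class:

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.tools.ToolProvider;

public class ClassLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Compile a tiny stand-in for the UDF class into a temp directory
        // (stands in for file:///path/to/udf.jar in the real setup).
        Path dir = Files.createTempDirectory("udf");
        Files.writeString(dir.resolve("MyUDF.java"), "public class MyUDF {}");
        ToolProvider.getSystemJavaCompiler()
                .run(null, null, null, dir.resolve("MyUDF.java").toString());

        // A child loader that was told about the extra path can load it ...
        try (URLClassLoader child =
                new URLClassLoader(new URL[] {dir.toUri().toURL()})) {
            System.out.println("child loader: "
                    + Class.forName("MyUDF", true, child).getName());

            // ... but the application classloader never saw that path, so
            // the same lookup fails there, just like the planner's codegen.
            try {
                Class.forName("MyUDF", true,
                        ClassLoader.getSystemClassLoader());
            } catch (ClassNotFoundException e) {
                System.out.println("system loader: " + e);
            }
        }
    }
}
```

If this is indeed the mechanism, then registering the function by a Class object loaded elsewhere cannot help; the planner would have to resolve classes through a loader that actually includes the configured URLs.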

Pouria


On Thu, Dec 16, 2021 at 8:46 PM Yang Wang <danrtsey...@gmail.com> wrote:

> The config option "pipeline.jars" is used to specify the user jar, which
> contains the main class.
> I think what you need is "pipeline.classpaths".
>
> /**
>  * A list of URLs that are added to the classpath of each user code
>  * classloader of the program.
>  * Paths must specify a protocol (e.g. file://) and be accessible on all nodes.
>  */
> public static final ConfigOption<List<String>> CLASSPATHS =
>         key("pipeline.classpaths")
>                 .stringType()
>                 .asList()
>                 .noDefaultValue()
>                 .withDescription(
>                         "A semicolon-separated list of the classpaths to package with the job jars to be sent to"
>                                 + " the cluster. These have to be valid URLs.");
>
>
> Best,
> Yang
>
> Pouria Pirzadeh <pouria.pirza...@gmail.com> wrote on Fri, Dec 17, 2021 at 03:43:
>
>> I am developing a Java application which uses UDFs on Flink 1.14.
>> It uses the PipelineOptions.JARS config to dynamically add jar files
>> containing the UDF classes to the user classpath in the main method;
>> however, the application fails to load the UDF class from the configured
>> jar files at job launch time and crashes with a ClassNotFoundException.
>>
>> Is PipelineOptions.JARS the correct option to add files to the classpath
>> on the JobManager and all TaskManagers?
>>
>> Sample code snippet:
>>
>> final Configuration configuration = new Configuration();
>> configuration.set(
>>         PipelineOptions.JARS,
>>         Collections.singletonList("file:///path/to/udf.jar"));
>> StreamExecutionEnvironment streamEnv =
>>         StreamExecutionEnvironment.getExecutionEnvironment(configuration);
>> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv);
>> ...
>> Class udfClass = Class.forName("demo.MyUDF", ...);
>> tableEnv.createTemporarySystemFunction("MyUDF", udfClass);
>> ...
>>
>> Error stack trace:
>> Exception in thread "main" java.lang.ClassNotFoundException: demo.MyUDF
>>     at
>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:582)
>>     at
>> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>     at
>> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>     at
>> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1886)
>>     at
>> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1772)
>>     at
>> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2060)
>>     at
>> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1594)
>>     at
>> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:430)
>>     at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
>>     at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
>>     at
>> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
>>     at
>> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:692)
>>     at
>> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:714)
>>     at
>> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:130)
>>     at
>> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCallWithDataType(BridgingFunctionGenUtil.scala:116)
>>     at
>> org.apache.flink.table.planner.codegen.calls.BridgingFunctionGenUtil$.generateFunctionAwareCall(BridgingFunctionGenUtil.scala:73)
>>     at
>> org.apache.flink.table.planner.codegen.calls.BridgingSqlFunctionCallGen.generate(BridgingSqlFunctionCallGen.scala:81)
>>     at
>> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:825)
>>     at
>> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:503)
>>     at
>> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:58)
>>     at
>> org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:70)
>>     at
>> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:185)
>>     at
>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:437)
>>     at
>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toStreamInternal(StreamTableEnvironmentImpl.java:432)
>>     at
>> org.apache.flink.table.api.bridge.java.internal.StreamTableEnvironmentImpl.toDataStream(StreamTableEnvironmentImpl.java:356)
>>     ...
>>
>
