Hi Jakub,
I think the problem is that `cls`, the class you load at runtime, is not
visible to the thread context classloader.
Flink deserializes the `myFunction` object with the context classloader,
which does not know about your URLClassLoader.
So maybe you could put myFunction on the job's classpath.
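Alternatively, you could try pointing the context classloader at your
URLClassLoader around the registration and execution. An untested sketch,
using the variables from your snippet below:

val previous = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(classLoader)
try {
  fsTableEnv.createTemporaryFunction("myFunction", udf)
  fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
} finally {
  // always restore the previous classloader
  Thread.currentThread().setContextClassLoader(previous)
}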
Best,
Guowei


On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote:

> Hi Guowei,
>
> Thanks for your help,
> here is the relevant code (QueryCommand class):
>
> val fsSettings: EnvironmentSettings = EnvironmentSettings
>   .newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val fsEnv: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
>
> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> fsEnv.getConfig.enableObjectReuse()
>
> val fsTableEnv: StreamTableEnvironment =
>   StreamTableEnvironment.create(fsEnv, fsSettings)
>
> val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
> compiler.run(null, null, null,
>   new File("target/custom/myFunction.java").getPath)
>
> val root = new File("target/custom")
> val classLoader: URLClassLoader =
>   new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
> val cls = classLoader.loadClass("myFunction")
> val instance = cls.newInstance()
> val udf = instance.asInstanceOf[ScalarFunction]
> fsTableEnv.createTemporaryFunction("myFunction", udf)
>
> //creating Table...
>
> fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>
> def execute(): Unit = fsEnv.execute()
>
> myFunction.java
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class myFunction extends ScalarFunction {
>
>     public String eval(String s) {
>         return "myFunction - " + s;
>     }
>
> }
>
> Execution works as follows: a QueryCommand instance is created, some
> properties are set, and *execute()* is invoked.
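>
> Roughly like this (the property names here are hypothetical):
>
> val query = new QueryCommand()
> query.kafkaTopic = "myTopic"  // hypothetical property
> query.execute()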
>
> Let me know if any other relevant information is missing; alternatively,
> you can also have a look at the source code here (
> https://github.com/codefeedr/kafkaquery).
>
> Kind regards,
>
> Jakub
>
>
> ------------------------------
> *From:* Guowei Ma <guowei....@gmail.com>
> *Sent:* Monday, December 7, 2020 02:55
> *To:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi, Jakub
> In theory there should not be any problem, because you can register the
> function object directly.
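> Registering a function instance normally looks like this (minimal sketch;
> assuming MyFunction is your ScalarFunction subclass):
>
> tableEnv.createTemporaryFunction("myFunction", new MyFunction())
>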
> So would you like to share your code and the shell command that you use
> to submit your job?
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:
>
> The current setup is: Data in Kafka -> Kafka Connector ->
> StreamTableEnvironment -> execute Flink SQL queries
>
> I would like to register Flink's user-defined functions from a jar or
> Java class file at runtime. What I have tried so far: using Java's
> ClassLoader to get an instance of a ScalarFunction (UDF) and registering
> it in the StreamTableEnvironment. When I try to execute a query that uses
> the UDF, I get the following exception:
>
> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>   at java.base/java.lang.Class.forName0(Native Method)
>   at java.base/java.lang.Class.forName(Class.java:398)
>   at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>   at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>   at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>   at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>   at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>   at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>   at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>   at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>   at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>   at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>   at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>   at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> ...
>
>
> I have verified that the generated instance of the UDF behaves as
> expected when its methods are invoked directly.
>
> Do you have any idea why this is failing?
>
>
