Hi Jakub, I think the problem is that the `cls`, which you load at runtime, is not in the thread context classloader. Flink deserializes the `myFunction` object with the context classloader. So maybe you could put the myFunction in the job's class path. Best, Guowei
On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote: > Hi Guowei, > > Thanks for your help, > here is the relevant code (QueryCommand class): > > val fsSettings: EnvironmentSettings = EnvironmentSettings > .newInstance() > .useBlinkPlanner() > .inStreamingMode() > .build() > > val fsEnv: StreamExecutionEnvironment = > StreamExecutionEnvironment.getExecutionEnvironment > > fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) > fsEnv.getConfig.enableObjectReuse() > > val fsTableEnv: StreamTableEnvironment = > StreamTableEnvironment.create(fsEnv, fsSettings) > > val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler > compiler.run(null, null, null, new > File("target/custom/myFunction.java").getPath) > > val root = new File("target/custom") > val classLoader: URLClassLoader = new > URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader) > val cls = classLoader.loadClass("myFunction") > val instance = cls.newInstance(); > val udf = instance.asInstanceOf[ScalarFunction] > fsTableEnv.createTemporaryFunction("myFunction", udf) > > //creating Table... > > fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic") > > def execute(): Unit = fsEnv.execute() > > myFunction.java > > import org.apache.flink.table.functions.ScalarFunction; > > public class myFunction extends ScalarFunction { > > public String eval(String s) { > return "myFunction - " + s; > } > > } > > Execution works as follows: A QueryCommand instance is created, some > properties are being set, *execute()* will be invoked > > Let me know if any other relevant information is missing, alternatively > you can also have a look at the source code here ( > https://github.com/codefeedr/kafkaquery). > > Kind regards, > > Jakub > > > ------------------------------ > *Von:* Guowei Ma <guowei....@gmail.com> > *Gesendet:* Montag, 7. Dezember 2020 02:55 > *An:* Jakub N <jakub1...@hotmail.de> > *Cc:* user@flink.apache.org <user@flink.apache.org> > *Betreff:* Re: Flink UDF registration from jar at runtime > > Hi, Jakub > In theory there should not be any problem because you could register the > function object. > So would you like to share your code and the shell command that you submit > your job? > Best, > Guowei > > > On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote: > > The current setup is: Data in Kafka -> Kafka Connector -> > StreamTableEnvironment -> execute Flink SQL queries > > I would like to register Flink's User-defined Functions from a jar or java > class file during runtime. What I have tried so far is using Java's > Classloader getting an instance of a ScalarFunction (UDF) and registering > it in the StreamTableEnvironment. When I try executing a query making use > of the UDF I get the following exception: > > > Exception in thread "main" java.lang.ClassNotFoundException: myFunction > > at > java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > > at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > > at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522) > > at java.base/java.lang.Class.forName0(Native Method) > > at java.base/java.lang.Class.forName(Class.java:398) > > at > org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) > > at > java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943) > > at > java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829) > > at > java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117) > > at > java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646) > > at > java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464) > > at > java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422) > > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576) > > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562) > > at > org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550) > > at > org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626) > > at > org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648) > > at > org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55) > > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784) > > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493) > > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53) > > at org.apache.calcite.rex.RexCall.accept(RexCall.java:288) > > at > org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133) > > at > org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152) > > at > scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285) > > at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) > > at > scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) > > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) > > at scala.collection.TraversableLike.map(TraversableLike.scala:285) > > at scala.collection.TraversableLike.map$(TraversableLike.scala:278) > > at scala.collection.AbstractTraversable.map(Traversable.scala:108) > ... > > > I have verified that the generated instance of the UDF behaves as expected > when invoking any of its methods. > > Do you have any ideas on why this is failing? > >