Hi Jakub,

I am not familiar with `sbt pack`, but I assume you are doing the following (correct me if I have misunderstood):
1. The UDF and the job are packaged in the same "fatjar".
2. You instantiate ("new") a UDF object in the job.
3. You submit the "fatjar" to the local Flink environment.
In theory there should not be any problem. Could you share how you changed the code and how you submit your job to the local environment?

Best,
Guowei

On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de> wrote:

> Hi Guowei,
>
> It turned out that, for my application, I unfortunately can't have the
> UDF on the job's classpath. I am using a local Flink environment and
> `sbt pack` (similar to a fatjar) to create launch scripts; therefore, to
> my understanding, I can't add to the classpath once the project is
> packed. Are there any ways to add these UDFs from outside the classpath?
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *From:* Jakub N <jakub1...@hotmail.de>
> *Sent:* Monday, December 7, 2020 12:59
> *To:* Guowei Ma <guowei....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Guowei,
>
> Many thanks for your help. Your suggestion indeed solved the issue: I
> moved `myFunction` to the classpath where execution starts.
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *From:* Guowei Ma <guowei....@gmail.com>
> *Sent:* Monday, December 7, 2020 12:16
> *To:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
>
> I think the problem is that the `cls` which you load at runtime is not
> visible to the thread context classloader. Flink deserializes the
> `myFunction` object with the context classloader, so maybe you could put
> `myFunction` on the job's classpath.
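Guowei's point about the context classloader can be illustrated with plain JDK classes, without Flink. The sketch below is an illustration, not Flink's actual code: the class name `MyFunction`, the demo class name, and the temp directory are hypothetical. It compiles a class into a directory that is not on the application classpath, shows the system classloader cannot resolve it, then makes it resolvable by installing a `URLClassLoader` as the thread context classloader, which is the loader a framework typically consults when resolving user classes.

```java
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class ContextClassLoaderDemo {
    public static void main(String[] args) throws Exception {
        // Compile a class into a directory that is NOT on the application classpath.
        Path dir = Files.createTempDirectory("udf");
        Path src = dir.resolve("MyFunction.java");
        Files.writeString(src,
            "public class MyFunction { public String eval(String s) { return \"myFunction - \" + s; } }");
        ToolProvider.getSystemJavaCompiler().run(null, null, null, src.toString());

        // The application classloader cannot see it.
        try {
            Class.forName("MyFunction", false, ClassLoader.getSystemClassLoader());
            System.out.println("unexpectedly visible");
        } catch (ClassNotFoundException e) {
            System.out.println("not on application classpath");
        }

        // A URLClassLoader over that directory can; installing it as the thread
        // context classloader makes the class visible to any code that resolves
        // classes via Thread.currentThread().getContextClassLoader().
        URLClassLoader udfLoader = new URLClassLoader(
            new URL[] { dir.toUri().toURL() },
            ContextClassLoaderDemo.class.getClassLoader());
        Thread.currentThread().setContextClassLoader(udfLoader);
        Class<?> cls = Class.forName("MyFunction", false,
            Thread.currentThread().getContextClassLoader());
        System.out.println("resolved " + cls.getName() + " via context classloader");
    }
}
```

The same mechanism explains the suggested fix: anything already on the job's classpath is visible to the context classloader without any of this wiring.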
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote:
>
> Hi Guowei,
>
> Thanks for your help; here is the relevant code (from the QueryCommand
> class):
>
> val fsSettings: EnvironmentSettings = EnvironmentSettings
>   .newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val fsEnv: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
>
> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> fsEnv.getConfig.enableObjectReuse()
>
> val fsTableEnv: StreamTableEnvironment =
>   StreamTableEnvironment.create(fsEnv, fsSettings)
>
> val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
> compiler.run(null, null, null,
>   new File("target/custom/myFunction.java").getPath)
>
> val root = new File("target/custom")
> val classLoader: URLClassLoader = new URLClassLoader(
>   Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
> val cls = classLoader.loadClass("myFunction")
> val instance = cls.newInstance()
> val udf = instance.asInstanceOf[ScalarFunction]
> fsTableEnv.createTemporaryFunction("myFunction", udf)
>
> // creating Table...
>
> fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>
> def execute(): Unit = fsEnv.execute()
>
> myFunction.java:
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class myFunction extends ScalarFunction {
>
>     public String eval(String s) {
>         return "myFunction - " + s;
>     }
>
> }
>
> Execution works as follows: a QueryCommand instance is created, some
> properties are set, and *execute()* is invoked.
>
> Let me know if any other relevant information is missing; alternatively
> you can also have a look at the source code here
> (https://github.com/codefeedr/kafkaquery).
>
> Kind regards,
>
> Jakub
>
>
> ------------------------------
> *From:* Guowei Ma <guowei....@gmail.com>
> *Sent:* Monday, December 7, 2020 02:55
> *To:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
>
> In theory there should not be any problem, because you can register the
> function object. Would you like to share your code and the shell command
> with which you submit your job?
>
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:
>
> The current setup is: data in Kafka -> Kafka connector ->
> StreamTableEnvironment -> execute Flink SQL queries.
>
> I would like to register Flink user-defined functions from a jar or Java
> class file at runtime. What I have tried so far: using Java's
> classloader I obtain an instance of a ScalarFunction (UDF) and register
> it in the StreamTableEnvironment. When I execute a query making use of
> the UDF, I get the following exception:
>
> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:398)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>     at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>     at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     ...
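The failure mode in the stack trace (Flink's `InstantiationUtil` deserializing the registered `ScalarFunction` during code generation) can be reproduced with nothing but the JDK. The sketch below is an analogue, not Flink's actual code, and the class name `MyFunction` and temp directory are hypothetical: a serializable class compiled outside the classpath is serialized, plain deserialization fails with `ClassNotFoundException` because the default `resolveClass` consults the caller's classloader, and overriding `resolveClass` to use the loader that knows the class (what a suitable context classloader would achieve) makes it succeed.

```java
import javax.tools.ToolProvider;
import java.io.*;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

public class DeserializeDemo {
    public static void main(String[] args) throws Exception {
        // Compile a Serializable class outside the application classpath.
        Path dir = Files.createTempDirectory("udf");
        Path src = dir.resolve("MyFunction.java");
        Files.writeString(src,
            "import java.io.Serializable;\n" +
            "public class MyFunction implements Serializable {\n" +
            "    public String eval(String s) { return \"myFunction - \" + s; }\n" +
            "}");
        ToolProvider.getSystemJavaCompiler().run(null, null, null, src.toString());

        URLClassLoader udfLoader = new URLClassLoader(
            new URL[] { dir.toUri().toURL() }, DeserializeDemo.class.getClassLoader());
        Object udf = Class.forName("MyFunction", true, udfLoader)
            .getDeclaredConstructor().newInstance();

        // Serialize the instance, as a framework does when shipping user code.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(udf);
        }

        // Plain deserialization resolves classes against the caller's classloader
        // and fails, like the ClassNotFoundException in the stack trace.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray()))) {
            in.readObject();
            System.out.println("unexpected success");
        } catch (ClassNotFoundException e) {
            System.out.println("plain deserialization failed: " + e.getMessage());
        }

        // Resolving against the loader that defined the class succeeds.
        try (ObjectInputStream in = new ObjectInputStream(
                new ByteArrayInputStream(bytes.toByteArray())) {
            @Override
            protected Class<?> resolveClass(ObjectStreamClass desc)
                    throws ClassNotFoundException {
                return Class.forName(desc.getName(), false, udfLoader);
            }
        }) {
            System.out.println("resolved with UDF loader: "
                + in.readObject().getClass().getName());
        }
    }
}
```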
> I have verified that the generated instance of the UDF behaves as
> expected when invoking any of its methods.
>
> Do you have any ideas on why this is failing?
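For reference, the loading-and-casting step that does work in the QueryCommand snippet (`classLoader.loadClass` followed by `asInstanceOf[ScalarFunction]`) relies on the child loader delegating the supertype to its parent, so both sides agree on the same `ScalarFunction` class. The JDK-only sketch below illustrates that mechanism; `MyFunction`, the demo class name, and the use of `java.util.function.UnaryOperator` as a stand-in for a parent-visible supertype are all hypothetical.

```java
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.UnaryOperator;

public class LoadAndCastDemo {
    public static void main(String[] args) throws Exception {
        // Compile a class implementing a supertype the parent delegation
        // chain already knows (here, a JDK interface).
        Path dir = Files.createTempDirectory("udf");
        Path src = dir.resolve("MyFunction.java");
        Files.writeString(src,
            "import java.util.function.UnaryOperator;\n" +
            "public class MyFunction implements UnaryOperator<String> {\n" +
            "    public String apply(String s) { return \"myFunction - \" + s; }\n" +
            "}");
        ToolProvider.getSystemJavaCompiler().run(null, null, null, src.toString());

        // MyFunction itself comes from udfLoader, but its UnaryOperator
        // supertype resolves through parent delegation to the JDK's single
        // copy, which is why the cast below succeeds across loaders.
        URLClassLoader udfLoader = new URLClassLoader(
            new URL[] { dir.toUri().toURL() },
            LoadAndCastDemo.class.getClassLoader());
        Class<?> cls = udfLoader.loadClass("MyFunction");

        @SuppressWarnings("unchecked")
        UnaryOperator<String> udf =
            (UnaryOperator<String>) cls.getDeclaredConstructor().newInstance();
        System.out.println(udf.apply("hello"));
    }
}
```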