Hi, Jakub
If I understand correctly, you want the job you submitted to be able to
load a table function that is not in the job jar. I don't think Flink
supports this natively (maybe others know better). But this requirement
looks a lot like code generation: you need to ship the "code" together
with the job. I think you could refer to [1].
[1]
https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29

Best,
Guowei

On Tue, Dec 8, 2020 at 8:40 PM Jakub N <jakub1...@hotmail.de> wrote:

> Hi Guowei,
>
>    1. Unfortunately the UDF and the job are not in the same fatjar.
>    Essentially there is only one "fatjar" containing the Flink environment
>    plus the job; the UDF is separate.
>    2. Yes, that is correct.
>    3. As explained in 1., I don't submit job jars to the Flink
>    environment; instead, the job is created and submitted within the
>    "fatjar".
>
> Codewise nothing changed except for where the UDF's location is
> specified.
> "Submitting to the environment" works as follows:
>
>    1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
>    2. (Register UDFs)
>    3. Create tables
>    4. Query the tables
>    5. Execute the environment
>
> The overall process is executed as one program.
> Apologies if any of these explanations are unclear or too vague.
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *From:* Guowei Ma <guowei....@gmail.com>
> *Sent:* Tuesday, 8 December 2020, 06:34
> *To:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi, Jakub
> I am not familiar with `sbt pack`, but I assume you are doing the
> following (correct me if I misunderstand you):
> 1. The UDF and the job are in the same "fatjar".
> 2. You "new" a UDF object in the job.
> 3. You submit the "fatjar" to the local Flink environment.
>
> In theory there should not be any problem. Could you share how you
> changed the code and how you submit your job to the local environment?
>
> Best,
> Guowei
>
>
> On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de> wrote:
>
> Hi Guowei,
>
> It turned out that for my application I unfortunately can't have the UDF
> on the "job's" classpath. As I am using a local Flink environment and
> `sbt pack` (similar to a fatjar) to create launch scripts, to my
> understanding I can't access the classpath once the project is packed.
> Are there any ways to add these UDFs from outside the classpath?
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *From:* Jakub N <jakub1...@hotmail.de>
> *Sent:* Monday, 7 December 2020, 12:59
> *To:* Guowei Ma <guowei....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Guowei,
>
> Great, thanks for your help. Your suggestion indeed solved the issue: I
> moved `myFunction` to the classpath where execution starts.
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *From:* Guowei Ma <guowei....@gmail.com>
> *Sent:* Monday, 7 December 2020, 12:16
> *To:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
> I think the problem is that the `cls` you load at runtime is not visible
> to the thread context classloader.
> Flink deserializes the `myFunction` object with the context classloader,
> so maybe you could put myFunction on the job's classpath.
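> Very roughly, and only as an untested sketch for the single-JVM case,
> making the dynamically loaded class visible through the context
> classloader could look like this (reusing the classLoader / fsTableEnv /
> fsEnv variables from the code below; whether this is enough depends on
> where deserialization actually happens):
>
> // Untested sketch: point the thread context classloader at the
> // URLClassLoader that knows "myFunction" before planning and executing.
> val previous = Thread.currentThread().getContextClassLoader
> Thread.currentThread().setContextClassLoader(classLoader)
> try {
>   fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>   fsEnv.execute()
> } finally {
>   // restore the original classloader afterwards
>   Thread.currentThread().setContextClassLoader(previous)
> }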
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote:
>
> Hi Guowei,
>
> Thanks for your help,
> here is the relevant code (QueryCommand class):
>
> val fsSettings: EnvironmentSettings = EnvironmentSettings
>   .newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val fsEnv: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
>
> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> fsEnv.getConfig.enableObjectReuse()
>
> val fsTableEnv: StreamTableEnvironment =
>   StreamTableEnvironment.create(fsEnv, fsSettings)
>
> // compile the UDF source at runtime
> val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
> compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)
>
> // load the compiled class through a child URLClassLoader and register it
> val root = new File("target/custom")
> val classLoader: URLClassLoader =
>   new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
> val cls = classLoader.loadClass("myFunction")
> val instance = cls.newInstance()
> val udf = instance.asInstanceOf[ScalarFunction]
> fsTableEnv.createTemporaryFunction("myFunction", udf)
>
> // creating Table...
>
> fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>
> def execute(): Unit = fsEnv.execute()
>
> myFunction.java:
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class myFunction extends ScalarFunction {
>
>     public String eval(String s) {
>         return "myFunction - " + s;
>     }
> }
>
> Execution works as follows: a QueryCommand instance is created, some
> properties are set, and *execute()* is invoked.
>
> Let me know if any other relevant information is missing; alternatively,
> you can also have a look at the source code here (
> https://github.com/codefeedr/kafkaquery).
>
> Kind regards,
>
> Jakub
>
>
> ------------------------------
> *From:* Guowei Ma <guowei....@gmail.com>
> *Sent:* Monday, 7 December 2020, 02:55
> *To:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi, Jakub
> In theory there should not be any problem, because you can register the
> function object.
> So would you like to share your code and the shell command with which you
> submit your job?
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:
>
> The current setup is: data in Kafka -> Kafka connector ->
> StreamTableEnvironment -> execute Flink SQL queries
>
> I would like to register Flink user-defined functions from a jar or Java
> class file at runtime. What I have tried so far is using Java's
> classloader to get an instance of a ScalarFunction (UDF) and registering
> it in the StreamTableEnvironment.
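> For the jar case, the rough idea is the same; a minimal, untested sketch
> (the jar path and class name are only placeholders, and fsTableEnv is the
> StreamTableEnvironment):
>
> import java.io.File
> import java.net.{URL, URLClassLoader}
> import org.apache.flink.table.functions.ScalarFunction
>
> // load a UDF class from an external jar instead of compiling a .java file
> val jarLoader = new URLClassLoader(
>   Array[URL](new File("udfs/my-udfs.jar").toURI.toURL),
>   getClass.getClassLoader)
> val udfFromJar = jarLoader.loadClass("myFunction")
>   .getDeclaredConstructor().newInstance().asInstanceOf[ScalarFunction]
> fsTableEnv.createTemporaryFunction("myFunction", udfFromJar)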
> When I try executing a query making use of the UDF, I get the following
> exception:
>
> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>     at java.base/java.lang.Class.forName0(Native Method)
>     at java.base/java.lang.Class.forName(Class.java:398)
>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>     at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>     at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>     at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>     at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>     ...
>
> I have verified that the generated instance of the UDF behaves as expected
> when invoking any of its methods.
>
> Do you have any ideas on why this is failing?
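> For what it's worth, the same ClassNotFoundException can presumably be
> reproduced without Flink by serializing an instance whose class only the
> child URLClassLoader knows and then deserializing it against the parent
> classloader, which is roughly what the InstantiationUtil frames above do
> (untested sketch, reusing the "target/custom" directory from the code
> above):
>
> import java.io._
> import java.net.{URL, URLClassLoader}
>
> val parent = getClass.getClassLoader
> val child  = new URLClassLoader(Array[URL](new File("target/custom").toURI.toURL), parent)
> val obj    = child.loadClass("myFunction")
>   .getDeclaredConstructor().newInstance().asInstanceOf[Serializable]
>
> val bytes = new ByteArrayOutputStream()
> new ObjectOutputStream(bytes).writeObject(obj) // fine: the instance carries its own class
>
> val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray)) {
>   override protected def resolveClass(desc: ObjectStreamClass): Class[_] =
>     Class.forName(desc.getName, false, parent) // parent cannot see "myFunction"
> }
> in.readObject() // expected to throw ClassNotFoundException: myFunction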