Hi Jakub,
If I understand correctly, you want the job you submitted to be able to load
a table function that is not in the job jar.
I don't think Flink supports this natively (maybe others know better).
But this requirement looks like code generation: you need to ship the
"code" with the job. You could refer to [1].

[1]
https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29

Best,
Guowei


On Tue, Dec 8, 2020 at 8:40 PM Jakub N <jakub1...@hotmail.de> wrote:

> Hi Guowei,
>
>
>    1. Unfortunately, the UDF and the job are not in the same fatjar.
>    Essentially there is only one "fatjar" containing the Flink environment +
>    the job; the UDF is separate.
>    2. Yes, that is correct.
>    3. As explained in 1., I don't submit job jars to the Flink
>    environment; instead, the job is created and submitted within the "fatjar".
>
>
> Code-wise, nothing changed except where the location of the UDF is
> specified.
> "Submitting to the environment" works as follows:
>
>    1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
>    2. (Register UDF's)
>    3. Create tables
>    4. Query on the tables
>    5. Execute the environment
>
> The overall process is executed as one program.
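> For concreteness, a minimal sketch of those five steps, reusing the
> `fsSettings` and `udf` values from the code in my mail further down; the
> Kafka connector properties are hypothetical placeholders:
>
> val fsEnv = StreamExecutionEnvironment.getExecutionEnvironment     // 1. stream env
> val fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings)  //    -> table env
> fsTableEnv.createTemporaryFunction("myFunction", udf)              // 2. (register UDFs)
> fsTableEnv.executeSql(                                             // 3. create tables
>   """CREATE TABLE myTopic (a STRING) WITH (
>     |  'connector' = 'kafka',
>     |  'topic' = 'myTopic',
>     |  'properties.bootstrap.servers' = 'localhost:9092',
>     |  'format' = 'json'
>     |)""".stripMargin)
> fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")      // 4. query the tables
> fsEnv.execute()                                                    // 5. execute the environment
>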
> Apologies if any of these explanations are unclear or too vague.
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *Von:* Guowei Ma <guowei....@gmail.com>
> *Gesendet:* Dienstag, 8. Dezember 2020 06:34
> *An:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Betreff:* Re: Flink UDF registration from jar at runtime
>
> Hi, Jakub
> I am not familiar with `sbt pack`, but I assume you are doing the
> following (correct me if I misunderstand you):
> 1. The UDF and Job jar are in the same "fatjar"
> 2. You "new" a UDF object in the job().
> 3. You submit the  "fatjar" to the local Flink environment.
>
> In theory there should not be any problem. Could you share how you changed
> the code and how you submit your job to the local environment?
>
> Best,
> Guowei
>
>
> On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de> wrote:
>
> Hi Guowei,
>
> It turned out that, for my application, I unfortunately can't have the UDF
> on the job's classpath. Since I am using a local Flink environment and `sbt
> pack` (similar to a fatjar) to create launch scripts, to my understanding I
> can't change the classpath once the project is packed.
> Are there any ways to add these UDFs from outside the classpath?
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *Von:* Jakub N <jakub1...@hotmail.de>
> *Gesendet:* Montag, 7. Dezember 2020 12:59
> *An:* Guowei Ma <guowei....@gmail.com>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Betreff:* Re: Flink UDF registration from jar at runtime
>
> Hi Guowei,
>
> Many thanks for your help. Your suggestion indeed solved the issue: I
> moved `myFunction` onto the classpath from which execution starts.
>
> Kind regards,
>
> Jakub
>
> ------------------------------
> *Von:* Guowei Ma <guowei....@gmail.com>
> *Gesendet:* Montag, 7. Dezember 2020 12:16
> *An:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Betreff:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
> I think the problem is that the class `cls`, which you load at runtime, is
> not visible to the thread context classloader.
> Flink deserializes the `myFunction` object with the context classloader,
> so maybe you could put `myFunction` on the job's classpath.
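> A minimal sketch of one possible workaround (untested, and it assumes the
> deserialization happens on the calling thread): temporarily set the thread
> context classloader to your URLClassLoader around registration and
> execution, reusing the `classLoader`, `fsTableEnv`, `fsEnv` and `udf`
> names from your code:
>
> val previous = Thread.currentThread().getContextClassLoader
> Thread.currentThread().setContextClassLoader(classLoader)
> try {
>   fsTableEnv.createTemporaryFunction("myFunction", udf)
>   fsEnv.execute()
> } finally {
>   Thread.currentThread().setContextClassLoader(previous)
> }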
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote:
>
> Hi Guowei,
>
> Thanks for your help,
> here is the relevant code (QueryCommand class):
>
> import java.io.File
> import java.net.{URL, URLClassLoader}
> import javax.tools.{JavaCompiler, ToolProvider}
>
> import org.apache.flink.streaming.api.TimeCharacteristic
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.functions.ScalarFunction
>
> val fsSettings: EnvironmentSettings = EnvironmentSettings
>   .newInstance()
>   .useBlinkPlanner()
>   .inStreamingMode()
>   .build()
>
> val fsEnv: StreamExecutionEnvironment =
>   StreamExecutionEnvironment.getExecutionEnvironment
>
> fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
> fsEnv.getConfig.enableObjectReuse()
>
> val fsTableEnv: StreamTableEnvironment =
>   StreamTableEnvironment.create(fsEnv, fsSettings)
>
> // Compile the UDF source file at runtime with the system Java compiler
> val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
> compiler.run(null, null, null, new File("target/custom/myFunction.java").getPath)
>
> // Load the compiled class via a URLClassLoader and register it as a UDF
> val root = new File("target/custom")
> val classLoader: URLClassLoader =
>   new URLClassLoader(Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
> val cls = classLoader.loadClass("myFunction")
> val instance = cls.newInstance()
> val udf = instance.asInstanceOf[ScalarFunction]
> fsTableEnv.createTemporaryFunction("myFunction", udf)
>
> //creating Table...
>
> fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>
> def execute(): Unit = fsEnv.execute()
>
> myFunction.java
>
> import org.apache.flink.table.functions.ScalarFunction;
>
> public class myFunction extends ScalarFunction {
>
>     public String eval(String s) {
>         return "myFunction - " + s;
>     }
>
> }
>
> Execution works as follows: a QueryCommand instance is created, some
> properties are set, and *execute()* is invoked.
>
> Let me know if any other relevant information is missing; alternatively,
> you can also have a look at the source code here (
> https://github.com/codefeedr/kafkaquery).
>
> Kind regards,
>
> Jakub
>
>
> ------------------------------
> *Von:* Guowei Ma <guowei....@gmail.com>
> *Gesendet:* Montag, 7. Dezember 2020 02:55
> *An:* Jakub N <jakub1...@hotmail.de>
> *Cc:* user@flink.apache.org <user@flink.apache.org>
> *Betreff:* Re: Flink UDF registration from jar at runtime
>
> Hi, Jakub
> In theory there should not be any problem, because you can register the
> function object directly.
> So would you like to share your code and the shell command with which you
> submit your job?
> Best,
> Guowei
>
>
> On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:
>
> The current setup is: Data in Kafka -> Kafka Connector ->
> StreamTableEnvironment -> execute Flink SQL queries
>
> I would like to register Flink user-defined functions from a jar or Java
> class file at runtime. What I have tried so far is using Java's
> ClassLoader to get an instance of a ScalarFunction (UDF) and registering
> it in the StreamTableEnvironment. When I try to execute a query that makes
> use of the UDF, I get the following exception:
>
>
> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>
> at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>
> at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>
> at java.base/java.lang.Class.forName0(Native Method)
>
> at java.base/java.lang.Class.forName(Class.java:398)
>
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>
> at
> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>
> at
> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>
> at
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>
> at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>
> at
> org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>
> at
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>
> at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>
> at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> ...
>
>
> I have verified that the generated instance of the UDF behaves as expected
> when invoking any of its methods.
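>
> For example, calling eval reflectively on the loaded instance works,
> along these lines (the "test" argument is just an illustration):
>
> val result = cls.getMethod("eval", classOf[String]).invoke(instance, "test")
> // result: "myFunction - test"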
>
> Do you have any ideas on why this is failing?
>
>
