Hey,

I am not sure exactly what is going wrong in your case, but I put
together an example to show you how I would do it:


@Test
public void testClassloader() throws IOException, ClassNotFoundException {
   URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
      folder.newFolder(),
      "BoolToInt.java",
      "import org.apache.flink.table.functions.ScalarFunction;\n"
         + "public class BoolToInt extends ScalarFunction {\n"
         + "\tpublic int eval(boolean b) {\n"
         + "\t\treturn b ? 1 : 0;\n"
         + "\t}\n"
         + "}");
   TableEnvironment tEnv = TableEnvironment.create(
      EnvironmentSettings.newInstance()
         .useBlinkPlanner()
         .build());
   Class<ScalarFunction> boolToInt =
      (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");
   try (TemporaryClassLoaderContext ignored =
         TemporaryClassLoaderContext.of(functionClassloader)) {
      tEnv.createFunction("BoolToInt", boolToInt);
      TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
      tableResult.print();
   }
}

I verified this runs on the current master. ClassLoaderUtils is a
Flink utility which writes out the provided source, compiles it, and
loads it into a classloader. As far as I can tell it mimics your
situation pretty well.
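
If you cannot depend on that utility, a minimal sketch of the same idea
using only the JDK looks roughly like this (directory, class name, and
source string are placeholders, and error handling is omitted; this is
an illustration, not the actual ClassLoaderUtils implementation):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;

public class CompileAndLoad {
    public static Class<?> compileAndLoad(File dir, String className, String source)
            throws Exception {
        // write the source file next to where the class file will land
        File sourceFile = new File(dir, className + ".java");
        Files.write(sourceFile.toPath(), source.getBytes());

        // compile with the system compiler (requires a JDK, not a bare JRE)
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        compiler.run(null, null, null, sourceFile.getPath());

        // load the freshly compiled class from that directory
        URLClassLoader loader = new URLClassLoader(
            new URL[] {dir.toURI().toURL()},
            Thread.currentThread().getContextClassLoader());
        return loader.loadClass(className);
    }
}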

Best,
Dawid

On 10/12/2020 20:16, Jakub N wrote:
> Hi Dawid,
>
> According to your suggestion, given that I spawn a LocalEnvironment,
> I tried the following:
>
> val root = new File("custom")
> val classLoader: URLClassLoader = new URLClassLoader(
>   Array[URL](root.toURI.toURL),
>   Thread.currentThread().getContextClassLoader)
> val cls = classLoader.loadClass("myFunction")
> val instance = cls.newInstance()
> val udf = instance.asInstanceOf[ScalarFunction]
>
> val ignored = TemporaryClassLoaderContext.of(classLoader)
> try {
>   fsTableEnv.createTemporaryFunction("myFunction", udf)
>   fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
> }
> Unfortunately this still results in a ClassNotFoundException when
> executing the environment. (The class is located outside of the
> classpath and is loaded successfully; instances of it behave as expected.)
> Did I possibly misunderstand what you were proposing?
>
> Kind regards,
>
> Jakub
>
>
>
> ------------------------------------------------------------------------
> *From:* Dawid Wysakowicz
> *Sent:* Thursday, 10 December 2020 09:59
> *To:* Guowei Ma; Jakub N
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
>
> As Guowei said, the UDF must be present in the user classloader. It
> must be there both when compiling the program and when executing on
> the cluster. As of now the TableEnvironment uses the thread context
> classloader as the "user classloader" when compiling the query.
> Therefore you can do the trick via:
>
> ClassLoader yourClassloader = ... // create your classloader with the UDF
>
> try (TemporaryClassLoaderContext ignored =
>         TemporaryClassLoaderContext.of(yourClassloader)) {
>     fsTableEnv.createTemporaryFunction("myFunction", udf);
>     fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic");
> }
>
> Take a look at TemporaryClassLoaderContext [1] for a nice way to do
> it with a cleanup at the end.
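>
> Under the hood it is the usual save/swap/restore pattern around the
> thread context classloader. A rough sketch of that pattern (an
> illustration only, not the actual Flink implementation):
>
> import java.util.function.Supplier;
>
> public final class WithContextClassLoader {
>     // runs 'action' with 'loader' installed as the thread context
>     // classloader and restores the previous loader afterwards, the
>     // same idea TemporaryClassLoaderContext wraps in an AutoCloseable
>     public static <T> T run(ClassLoader loader, Supplier<T> action) {
>         Thread thread = Thread.currentThread();
>         ClassLoader previous = thread.getContextClassLoader();
>         thread.setContextClassLoader(loader);
>         try {
>             return action.get();
>         } finally {
>             thread.setContextClassLoader(previous);
>         }
>     }
> }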
>
> As for the second problem of having the UDF on the classpath when
> executing: if you are just spawning a LocalEnvironment, the above
> should do the trick, as it will use the context classloader. If you
> are submitting to a cluster, you can submit multiple jars as part of
> a single job, either via the RemoteEnvironment or the flink run command.
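>
> For instance, a sketch of the RemoteEnvironment route (host, port, and
> jar paths below are placeholders; every jar listed is shipped with the
> job, so the UDF jar also lands on the cluster-side user classpath):
>
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>
> public class SubmitWithUdfJar {
>     public static void main(String[] args) throws Exception {
>         // all jars listed here are uploaded and added to the user classloader
>         StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createRemoteEnvironment(
>                 "jobmanager-host", 8081, "/path/to/job.jar", "/path/to/udfs.jar");
>         // ... build the pipeline against 'env' ...
>         env.execute("job-with-external-udf");
>     }
> }
>
> With the CLI, flink run similarly accepts extra classpath entries via
> its -C/--classpath option (URLs that must be reachable from all nodes).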
>
> That's how we submit UDFs from separate jars in the sql-client. You
> can try to go through a few classes there and see how it is done. I am
> afraid it's not the easiest task as there are quite a few classes to
> navigate through. You could start from e.g.
> org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql[2]
>
> [1]
> https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java
>
> [2]
> https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305
>
>
>
> On 10/12/2020 09:15, Guowei Ma wrote:
>> Hi Jakub,
>> If I understand correctly, you want the job you submitted to be able
>> to load a table function that is not in the job jar.
>> I don't think Flink supports this natively (maybe others know more).
>> But this requirement is similar to code generation: you need to
>> submit the "code" with the job. You could refer to [1].
>>
>> [1]
>> https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29
>>
>> Best,
>> Guowei
>>
>>
>> On Tue, Dec 8, 2020 at 8:40 PM Jakub N <jakub1...@hotmail.de
>> <mailto:jakub1...@hotmail.de>> wrote:
>>
>>     Hi Guowei,
>>
>>      1. Unfortunately the UDF and the job are not in the same fatjar.
>>         Essentially there is only one "fatjar" containing the Flink
>>         environment + the job; the UDF is separate.
>>      2. Yes, that is correct.
>>      3. As explained in 1., I don't submit job jars to the Flink
>>         environment; instead the job is created and submitted within
>>         the "fatjar".
>>
>>
>>     Code-wise nothing changed except where the location of the UDF
>>     is specified.
>>     "Submitting to the environment" works as follows:
>>
>>      1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
>>      2. (Register UDFs)
>>      3. Create tables
>>      4. Query on the tables
>>      5. Execute the environment
>>
>>     The overall process is executed as one program.
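>>
>>     A minimal sketch of steps 1-5 in Java (the table name and query
>>     are placeholders; my actual code is further down in this thread):
>>
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>     import org.apache.flink.table.api.Table;
>>     import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>     import org.apache.flink.types.Row;
>>
>>     public class Pipeline {
>>         public static void main(String[] args) throws Exception {
>>             // 1. StreamExecutionEnvironment -> StreamTableEnvironment
>>             StreamExecutionEnvironment env =
>>                 StreamExecutionEnvironment.getExecutionEnvironment();
>>             StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
>>             // 2. (register UDFs) and 3. create tables would happen here
>>             // 4. query on the tables
>>             Table result = tEnv.sqlQuery("SELECT a FROM myTopic");
>>             tEnv.toAppendStream(result, Row.class).print();
>>             // 5. execute the environment
>>             env.execute();
>>         }
>>     }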
>>     Apologies if any of these explanations are unclear or too vague.
>>
>>     Kind regards,
>>
>>     Jakub
>>
>>     ------------------------------------------------------------------------
>>     *From:* Guowei Ma <guowei....@gmail.com <mailto:guowei....@gmail.com>>
>>     *Sent:* Tuesday, 8 December 2020 06:34
>>     *To:* Jakub N <jakub1...@hotmail.de <mailto:jakub1...@hotmail.de>>
>>     *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>>     <user@flink.apache.org <mailto:user@flink.apache.org>>
>>     *Subject:* Re: Flink UDF registration from jar at runtime
>>      
>>     Hi, Jakub
>>     I am not familiar with `sbt pack`, but I assume you are doing the
>>     following (correct me if I misunderstand you):
>>     1. The UDF and the job jar are in the same "fatjar".
>>     2. You "new" a UDF object in the job.
>>     3. You submit the "fatjar" to the local Flink environment.
>>
>>     In theory there should not be any problem. Could you share how
>>     you changed the code and how you submit your job to the local
>>     environment?
>>
>>     Best,
>>     Guowei
>>
>>
>>     On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de
>>     <mailto:jakub1...@hotmail.de>> wrote:
>>
>>         Hi Guowei,
>>
>>         It turned out that for my application I unfortunately can't
>>         have the UDF on the job's classpath. I am using a local Flink
>>         environment and `sbt pack` (similar to a fatjar) to create
>>         launch scripts, so, to my understanding, I can't access the
>>         classpath once the project is packed.
>>         Are there any ways to add these UDFs from outside the classpath?
>>
>>         Kind regards,
>>
>>         Jakub
>>
>>         
>> ------------------------------------------------------------------------
>>         *From:* Jakub N <jakub1...@hotmail.de
>>         <mailto:jakub1...@hotmail.de>>
>>         *Sent:* Monday, 7 December 2020 12:59
>>         *To:* Guowei Ma <guowei....@gmail.com
>>         <mailto:guowei....@gmail.com>>
>>         *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>>         <user@flink.apache.org <mailto:user@flink.apache.org>>
>>         *Subject:* Re: Flink UDF registration from jar at runtime
>>          
>>         Hi Guowei,
>>
>>         Many thanks for your help. Your suggestion indeed solved the
>>         issue. I moved `myFunction` to the classpath where execution
>>         starts.
>>
>>         Kind regards,
>>
>>         Jakub 
>>
>>         
>> ------------------------------------------------------------------------
>>         *From:* Guowei Ma <guowei....@gmail.com
>>         <mailto:guowei....@gmail.com>>
>>         *Sent:* Monday, 7 December 2020 12:16
>>         *To:* Jakub N <jakub1...@hotmail.de
>>         <mailto:jakub1...@hotmail.de>>
>>         *Cc:* user@flink.apache.org <mailto:user@flink.apache.org>
>>         <user@flink.apache.org <mailto:user@flink.apache.org>>
>>         *Subject:* Re: Flink UDF registration from jar at runtime
>>          
>>         Hi Jakub,
>>         I think the problem is that the `cls` you load at runtime is
>>         not visible to the thread context classloader.
>>         Flink deserializes the `myFunction` object with the context
>>         classloader.
>>         So maybe you could put myFunction on the job's classpath.
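>>
>>         To illustrate what I mean, here is a rough sketch of that
>>         round-trip using Flink's InstantiationUtil (the same class
>>         that shows up in your stack trace; the class and variable
>>         names here are made up, and `udf` stands for your loaded
>>         function instance):
>>
>>         import org.apache.flink.table.functions.ScalarFunction;
>>         import org.apache.flink.util.InstantiationUtil;
>>
>>         public class DeserializationSketch {
>>             // the planner serializes the registered function and later
>>             // deserializes it with the thread context classloader; if
>>             // that loader cannot see myFunction, this throws the
>>             // ClassNotFoundException you observed
>>             static ScalarFunction roundTrip(ScalarFunction udf) throws Exception {
>>                 byte[] bytes = InstantiationUtil.serializeObject(udf);
>>                 return InstantiationUtil.deserializeObject(
>>                     bytes, Thread.currentThread().getContextClassLoader());
>>             }
>>         }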
>>         Best,
>>         Guowei
>>
>>
>>         On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de
>>         <mailto:jakub1...@hotmail.de>> wrote:
>>
>>             Hi Guowei,
>>
>>             Thanks for your help,
>>             here is the relevant code (QueryCommand class):
>>
>>             val fsSettings: EnvironmentSettings = EnvironmentSettings
>>               .newInstance()
>>               .useBlinkPlanner()
>>               .inStreamingMode()
>>               .build()
>>
>>             val fsEnv: StreamExecutionEnvironment =
>>               StreamExecutionEnvironment.getExecutionEnvironment
>>             fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>             fsEnv.getConfig.enableObjectReuse()
>>
>>             val fsTableEnv: StreamTableEnvironment =
>>               StreamTableEnvironment.create(fsEnv, fsSettings)
>>
>>             val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
>>             compiler.run(null, null, null,
>>               new File("target/custom/myFunction.java").getPath)
>>
>>             val root = new File("target/custom")
>>             val classLoader: URLClassLoader = new URLClassLoader(
>>               Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
>>             val cls = classLoader.loadClass("myFunction")
>>             val instance = cls.newInstance()
>>             val udf = instance.asInstanceOf[ScalarFunction]
>>             fsTableEnv.createTemporaryFunction("myFunction", udf)
>>
>>             // creating Table...
>>             fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>>
>>             def execute(): Unit = fsEnv.execute()
>>
>>             myFunction.java
>>
>>             import org.apache.flink.table.functions.ScalarFunction;
>>
>>             public class myFunction extends ScalarFunction {
>>
>>                 public String eval(String s) {
>>                     return "myFunction - " + s;
>>                 }
>>             }
>>
>>             Execution works as follows: a QueryCommand instance is
>>             created, some properties are set, and execute() is
>>             invoked.
>>
>>             Let me know if any other relevant information is missing,
>>             alternatively you can also have a look at the source code
>>             here (https://github.com/codefeedr/kafkaquery).
>>
>>             Kind regards,
>>
>>             Jakub
>>
>>
>>             
>> ------------------------------------------------------------------------
>>             *From:* Guowei Ma <guowei....@gmail.com
>>             <mailto:guowei....@gmail.com>>
>>             *Sent:* Monday, 7 December 2020 02:55
>>             *To:* Jakub N <jakub1...@hotmail.de
>>             <mailto:jakub1...@hotmail.de>>
>>             *Cc:* user@flink.apache.org
>>             <mailto:user@flink.apache.org> <user@flink.apache.org
>>             <mailto:user@flink.apache.org>>
>>             *Subject:* Re: Flink UDF registration from jar at runtime
>>              
>>             Hi, Jakub
>>             In theory there should not be any problem, because you
>>             register the function object itself.
>>             Would you like to share your code and the shell command
>>             you use to submit your job?
>>             Best,
>>             Guowei
>>
>>
>>             On Mon, Dec 7, 2020 at 3:19 AM Jakub N
>>             <jakub1...@hotmail.de <mailto:jakub1...@hotmail.de>> wrote:
>>
>>                 The current setup is: Data in Kafka -> Kafka
>>                 Connector -> StreamTableEnvironment -> execute Flink
>>                 SQL queries
>>
>>                 I would like to register Flink user-defined
>>                 functions from a jar or java class file at runtime.
>>                 What I have tried so far: using Java's ClassLoader, I
>>                 obtain an instance of a ScalarFunction (UDF) and
>>                 register it in the StreamTableEnvironment. When I try
>>                 executing a query making use of the UDF, I get the
>>                 following exception:
>>
>>
>>                 Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>>                   at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>                   at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>                   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>                   at java.base/java.lang.Class.forName0(Native Method)
>>                   at java.base/java.lang.Class.forName(Class.java:398)
>>                   at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>                   at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>>                   at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>>                   at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>>                   at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>>                   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>>                   at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>                   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>                   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>                   at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>                   at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>>                   at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>>                   at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>>                   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>>                   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>>                   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>>                   at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>>                   at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>>                   at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>>                   at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>>                   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>>                   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>>                   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>>                   at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>>                   at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>>                   at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>>                   ...
>>
>>
>>                 I have verified that the generated instance of the
>>                 UDF behaves as expected when invoking any of its methods.
>>
>>                 Do you have any ideas on why this is failing?
>>
