Hey, I am not sure exactly what is going wrong in your case, but I put together an example to show you how I would do it:
@Test
public void testClassloader() throws IOException, ClassNotFoundException {
    URLClassLoader functionClassloader = ClassLoaderUtils.compileAndLoadJava(
            folder.newFolder(),
            "BoolToInt.java",
            "import org.apache.flink.table.functions.ScalarFunction;\n"
                + "public class BoolToInt extends ScalarFunction {\n"
                + "\tpublic int eval(boolean b) {\n"
                + "\t\treturn b ? 1 : 0;\n"
                + "\t}\n"
                + "}");

    TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().useBlinkPlanner().build());

    Class<ScalarFunction> boolToInt =
            (Class<ScalarFunction>) functionClassloader.loadClass("BoolToInt");

    try (TemporaryClassLoaderContext ignored =
            TemporaryClassLoaderContext.of(functionClassloader)) {
        tEnv.createFunction("BoolToInt", boolToInt);
        TableResult tableResult = tEnv.executeSql("SELECT BoolToInt(TRUE)");
        tableResult.print();
    }
}

I verified this runs on the current master. ClassLoaderUtils is a Flink test utility that writes out the provided source code, compiles it, and loads it into a classloader. As far as I can tell it mimics your situation pretty well.

Best,

Dawid

On 10/12/2020 20:16, Jakub N wrote:
> Hi Dawid,
>
> Following your suggestion (given that I spawn a LocalEnvironment), I
> tried the following:
>
>     val root = new File("custom")
>     val classLoader: URLClassLoader = new URLClassLoader(
>       Array[URL](root.toURI.toURL),
>       Thread.currentThread().getContextClassLoader)
>     val cls = classLoader.loadClass("myFunction")
>     val instance = cls.newInstance()
>     val udf = instance.asInstanceOf[ScalarFunction]
>
>     val ignored = TemporaryClassLoaderContext.of(classLoader)
>     try {
>       fsTableEnv.createTemporaryFunction("myFunction", udf)
>       fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>     }
>
> Unfortunately this still results in a ClassNotFoundException when
> executing the environment. (The class is located outside of the
> classpath and is loaded successfully; instances of it behave as
> expected.)
> Did I possibly misunderstand what you were proposing?
>
> Kind regards,
>
> Jakub
>
> ------------------------------------------------------------------------
> *From:* Dawid Wysakowicz
> *Sent:* Thursday, December 10, 2020 09:59
> *To:* Guowei Ma; Jakub N
> *Cc:* user@flink.apache.org
> *Subject:* Re: Flink UDF registration from jar at runtime
>
> Hi Jakub,
>
> As Guowei said, the UDF must be present in the user classloader. It
> must be there both when compiling the program and when executing it on
> the cluster. As of now, the TableEnvironment uses the thread context
> classloader as the "user classloader" when compiling the query.
> Therefore you can do the trick via:
>
>     ClassLoader yourClassloader = ... // create your classloader with the UDF
>
>     try (TemporaryClassLoaderContext ignored =
>             TemporaryClassLoaderContext.of(yourClassloader)) {
>         fsTableEnv.createTemporaryFunction("myFunction", udf)
>         fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>     }
>
> Take a look at TemporaryClassLoaderContext [1] for a nice way to do
> this with a cleanup at the end.
>
> To solve the second problem of having the UDF on the classpath when
> executing: if you are just spawning a LocalEnvironment, the above
> should do the trick, as it will use the context classloader. If you
> are submitting to a cluster, you can submit multiple jars as part of a
> single job, either via the RemoteEnvironment or the flink run command.
>
> That's how we submit UDFs from separate jars in the sql-client. You
> can try to go through a few classes there and see how it is done. I am
> afraid it's not the easiest task, as there are quite a few classes to
> navigate through. You could start from e.g.
> org.apache.flink.table.client.gateway.local.LocalExecutor#executeSql [2]
>
> [1] https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/util/TemporaryClassLoaderContext.java
>
> [2] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java#L305
>
> On 10/12/2020 09:15, Guowei Ma wrote:
>> Hi, Jakub
>>
>> If I understand correctly, you want the job you submitted to be able
>> to load a table function that is not in the job jar. I don't think
>> Flink supports this natively (maybe others know more). But this
>> requirement is similar to code generation: you need to submit the
>> "code" with the job. You could refer to [1].
>>
>> [1] https://github.com/apache/flink/blob/0a6e457e6b2bff9acc25e45c3083fc12a95fd717/flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/generated/GeneratedClass.java#L29
>>
>> Best,
>> Guowei
>>
>> On Tue, Dec 8, 2020 at 8:40 PM Jakub N <jakub1...@hotmail.de> wrote:
>>
>> Hi Guowei,
>>
>> 1. Unfortunately the UDF and the job are not in the same fatjar.
>>    Essentially there is only one "fatjar" containing the Flink
>>    environment plus the job; the UDF is separate.
>> 2. Yes, that is correct.
>> 3. As explained in 1., I don't submit job jars to the Flink
>>    environment; instead, the job is created and submitted within the
>>    "fatjar".
>>
>> Code-wise nothing changed except for where the location of the UDF
>> was specified. "Submitting to the environment" works as follows:
>>
>> 1. Create a StreamExecutionEnvironment -> StreamTableEnvironment
>> 2. (Register UDFs)
>> 3. Create tables
>> 4. Query the tables
>> 5. Execute the environment
>>
>> The overall process is executed as one program.
>> Apologies if any of these explanations are unclear or too vague.
>>
>> Kind regards,
>>
>> Jakub
>>
>> ------------------------------------------------------------------------
>> *From:* Guowei Ma <guowei....@gmail.com>
>> *Sent:* Tuesday, December 8, 2020 06:34
>> *To:* Jakub N <jakub1...@hotmail.de>
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi, Jakub
>>
>> I am not familiar with `sbt pack`, but I assume you are doing the
>> following (correct me if I misunderstand you):
>>
>> 1. The UDF and the job are in the same "fatjar".
>> 2. You "new" a UDF object in the job.
>> 3. You submit the "fatjar" to the local Flink environment.
>>
>> In theory there should not be any problem. Could you share how you
>> changed the code and how you submit your job to the local
>> environment?
>>
>> Best,
>> Guowei
>>
>> On Tue, Dec 8, 2020 at 2:53 AM Jakub N <jakub1...@hotmail.de> wrote:
>>
>> Hi Guowei,
>>
>> It turned out that for my application I unfortunately can't have the
>> UDF on the "job's" classpath: I am using a local Flink environment
>> and `sbt pack` (similar to a fatjar) to create launch scripts, so,
>> to my understanding, I can't add to the classpath once the project
>> is packed.
>> Are there any ways to add these UDFs from outside the classpath?
>>
>> Kind regards,
>>
>> Jakub
>>
>> ------------------------------------------------------------------------
>> *From:* Jakub N <jakub1...@hotmail.de>
>> *Sent:* Monday, December 7, 2020 12:59
>> *To:* Guowei Ma <guowei....@gmail.com>
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi Guowei,
>>
>> Thanks a lot for your help.
>> Your suggestion indeed solved the issue. I moved `myFunction` to the
>> classpath where execution starts.
>>
>> Kind regards,
>>
>> Jakub
>>
>> ------------------------------------------------------------------------
>> *From:* Guowei Ma <guowei....@gmail.com>
>> *Sent:* Monday, December 7, 2020 12:16
>> *To:* Jakub N <jakub1...@hotmail.de>
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi Jakub,
>>
>> I think the problem is that the `cls` which you load at runtime is
>> not visible to the thread context classloader. Flink deserializes the
>> `myFunction` object with the context classloader. So maybe you could
>> put `myFunction` on the job's classpath.
>>
>> Best,
>> Guowei
>>
>> On Mon, Dec 7, 2020 at 5:54 PM Jakub N <jakub1...@hotmail.de> wrote:
>>
>> Hi Guowei,
>>
>> Thanks for your help, here is the relevant code (QueryCommand class):
>>
>>     val fsSettings: EnvironmentSettings = EnvironmentSettings
>>       .newInstance()
>>       .useBlinkPlanner()
>>       .inStreamingMode()
>>       .build()
>>
>>     val fsEnv: StreamExecutionEnvironment =
>>       StreamExecutionEnvironment.getExecutionEnvironment
>>     fsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
>>     fsEnv.getConfig.enableObjectReuse()
>>
>>     val fsTableEnv: StreamTableEnvironment =
>>       StreamTableEnvironment.create(fsEnv, fsSettings)
>>
>>     val compiler: JavaCompiler = ToolProvider.getSystemJavaCompiler
>>     compiler.run(null, null, null,
>>       new File("target/custom/myFunction.java").getPath)
>>
>>     val root = new File("target/custom")
>>     val classLoader: URLClassLoader = new URLClassLoader(
>>       Array[URL](root.toURI.toURL), this.getClass.getClassLoader)
>>     val cls = classLoader.loadClass("myFunction")
>>     val instance = cls.newInstance()
>>     val udf = instance.asInstanceOf[ScalarFunction]
>>
>>     fsTableEnv.createTemporaryFunction("myFunction", udf)
>>
>>     // creating Table...
>>     fsTableEnv.executeSql("SELECT a, myFunction(a) FROM myTopic")
>>
>>     def execute(): Unit = fsEnv.execute()
>>
>> myFunction.java:
>>
>>     import org.apache.flink.table.functions.ScalarFunction;
>>
>>     public class myFunction extends ScalarFunction {
>>         public String eval(String s) {
>>             return "myFunction - " + s;
>>         }
>>     }
>>
>> Execution works as follows: a QueryCommand instance is created, some
>> properties are set, and execute() is invoked.
>>
>> Let me know if any other relevant information is missing;
>> alternatively, you can also have a look at the source code here
>> (https://github.com/codefeedr/kafkaquery).
>>
>> Kind regards,
>>
>> Jakub
>>
>> ------------------------------------------------------------------------
>> *From:* Guowei Ma <guowei....@gmail.com>
>> *Sent:* Monday, December 7, 2020 02:55
>> *To:* Jakub N <jakub1...@hotmail.de>
>> *Cc:* user@flink.apache.org
>> *Subject:* Re: Flink UDF registration from jar at runtime
>>
>> Hi, Jakub
>>
>> In theory there should not be any problem, because you can register
>> the function object. So would you like to share your code and the
>> shell command with which you submit your job?
>>
>> Best,
>> Guowei
>>
>> On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:
>>
>> The current setup is: data in Kafka -> Kafka connector ->
>> StreamTableEnvironment -> execute Flink SQL queries
>>
>> I would like to register Flink user-defined functions from a jar or
>> Java class file at runtime. What I have tried so far is using Java's
>> classloader to get an instance of a ScalarFunction (UDF) and
>> registering it in the StreamTableEnvironment.
>> When I try executing a query making use of the UDF, I get the
>> following exception:
>>
>> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>>     at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>     at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>     at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>     at java.base/java.lang.Class.forName0(Native Method)
>>     at java.base/java.lang.Class.forName(Class.java:398)
>>     at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>>     at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>>     at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>>     at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>>     at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>>     at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>>     at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>>     at org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>>     at org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>>     at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>>     at org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>>     at org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>>     at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>>     at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>>     at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>>     at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>>     at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>>     at scala.collection.AbstractTraversable.map(Traversable.scala:108)
>>     ...
>>
>> I have verified that the generated instance of the UDF behaves as
>> expected when invoking any of its methods.
>>
>> Do you have any ideas on why this is failing?
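[Editor's note] The compile-and-load step discussed throughout this thread (Flink's ClassLoaderUtils.compileAndLoadJava test utility, and Jakub's ToolProvider/URLClassLoader code) can be reproduced with plain JDK tooling. The following is a minimal, Flink-free sketch; the class and method names here (CompileAndLoadDemo, compileAndLoad) are ours for illustration, not Flink's, and the demo class deliberately does not extend ScalarFunction so it runs without Flink on the classpath:

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class CompileAndLoadDemo {

    // Writes `source` into `dir`, compiles it there, and returns a
    // classloader that can see the resulting .class file.
    static URLClassLoader compileAndLoad(Path dir, String className, String source)
            throws Exception {
        Path sourceFile = dir.resolve(className + ".java");
        Files.write(sourceFile, source.getBytes(StandardCharsets.UTF_8));

        // Requires a JDK; on a plain JRE this returns null.
        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
        int result = compiler.run(null, null, null, sourceFile.toString());
        if (result != 0) {
            throw new IllegalStateException("Compilation failed: " + className);
        }
        // Parent is the current classloader, so the loaded class can still
        // reference everything on the application classpath.
        return new URLClassLoader(new URL[] {dir.toUri().toURL()},
                CompileAndLoadDemo.class.getClassLoader());
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("udf");
        URLClassLoader loader = compileAndLoad(dir, "BoolToInt",
                "public class BoolToInt {\n"
                        + "  public int eval(boolean b) { return b ? 1 : 0; }\n"
                        + "}\n");
        Class<?> cls = loader.loadClass("BoolToInt");
        Object udf = cls.getDeclaredConstructor().newInstance();
        Method eval = cls.getMethod("eval", boolean.class);
        System.out.println(eval.invoke(udf, true)); // prints 1
        loader.close();
    }
}
```

Loading the class is the easy half; as the thread shows, the crash comes later, when Flink resolves the class again during plan compilation via a classloader that cannot see it.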
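[Editor's note] The fix Dawid recommends hinges on the thread context classloader: the stack trace above fails inside InstantiationUtil.deserializeObject, which resolves classes via that classloader. Flink's TemporaryClassLoaderContext boils down to swapping the context classloader for a scope and restoring it afterwards. A minimal re-creation of the idea (the class name ContextClassLoaderScope is ours, not Flink's):

```java
import java.net.URL;
import java.net.URLClassLoader;

// Swap the thread context classloader for a scope and restore it on close,
// even if the body throws. Mirrors what Flink's TemporaryClassLoaderContext
// does around createTemporaryFunction/executeSql in the thread above.
public class ContextClassLoaderScope implements AutoCloseable {

    private final ClassLoader previous;

    public ContextClassLoaderScope(ClassLoader temporary) {
        this.previous = Thread.currentThread().getContextClassLoader();
        Thread.currentThread().setContextClassLoader(temporary);
    }

    @Override
    public void close() {
        Thread.currentThread().setContextClassLoader(previous);
    }

    public static void main(String[] args) {
        ClassLoader original = Thread.currentThread().getContextClassLoader();
        ClassLoader custom = new URLClassLoader(new URL[0], original);

        try (ContextClassLoaderScope ignored = new ContextClassLoaderScope(custom)) {
            // Inside the scope, any code that resolves classes via the
            // context classloader (e.g. Java deserialization during query
            // compilation) sees `custom` instead of the default loader.
            System.out.println(Thread.currentThread().getContextClassLoader() == custom);
        }
        // Outside the scope the original classloader is restored.
        System.out.println(Thread.currentThread().getContextClassLoader() == original);
    }
}
```

The try-with-resources form guarantees the restore runs on every exit path, which is exactly why the utility is preferable to manually calling setContextClassLoader twice.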