Hi, Jakub
In theory there should not be any problem because you could register the
function object.
So would you like to share your code and the shell command that you submit
your job?
Best,
Guowei


On Mon, Dec 7, 2020 at 3:19 AM Jakub N <jakub1...@hotmail.de> wrote:

> The current setup is: Data in Kafka -> Kafka Connector ->
> StreamTableEnvironment -> execute Flink SQL queries
>
> I would like to register Flink's User-defined Functions from a jar or java
> class file during runtime. What I have tried so far is using Java's
> Classloader getting an instance of a ScalarFunction (UDF) and registering
> it in the StreamTableEnvironment. When I try executing a query making use
> of the UDF I get the following exception:
>
>
> Exception in thread "main" java.lang.ClassNotFoundException: myFunction
>
> at
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>
> at
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>
> at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>
> at java.base/java.lang.Class.forName0(Native Method)
>
> at java.base/java.lang.Class.forName(Class.java:398)
>
> at
> org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
>
> at
> java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
>
> at
> java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
>
> at
> java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
>
> at
> java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
>
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
>
> at
> java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
>
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
>
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
>
> at
> org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
>
> at
> org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
>
> at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
>
> at
> org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
>
> at
> org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
>
> at
> scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
>
> at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
>
> at
> scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
>
> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
>
> at scala.collection.TraversableLike.map(TraversableLike.scala:285)
>
> at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
>
> at scala.collection.AbstractTraversable.map(Traversable.scala:108)
> ...
>
>
> I have verified that the generated instance of the UDF behaves as expected
> when invoking any of its methods.
>
> Do you have any ideas on why this is failing?
>

Reply via email to