The current setup is: Data in Kafka -> Kafka Connector -> 
StreamTableEnvironment -> execute Flink SQL queries

I would like to register Flink's User-defined Functions from a jar or java 
class file during runtime. What I have tried so far is using Java's Classloader 
getting an instance of a ScalarFunction (UDF) and registering it in the 
StreamTableEnvironment. When I try executing a query making use of the UDF I 
get the following exception:


Exception in thread "main" java.lang.ClassNotFoundException: myFunction
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at 
org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at 
java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1943)
at 
java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1829)
at 
java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2117)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1646)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:464)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
at 
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableObjectInternal(CodeGeneratorContext.scala:626)
at 
org.apache.flink.table.planner.codegen.CodeGeneratorContext.addReusableFunction(CodeGeneratorContext.scala:648)
at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:55)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:784)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:493)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:53)
at org.apache.calcite.rex.RexCall.accept(RexCall.java:288)
at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:133)
at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$.$anonfun$generateProcessCode$5(CalcCodeGenerator.scala:152)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:285)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at scala.collection.TraversableLike.map(TraversableLike.scala:285)
at scala.collection.TraversableLike.map$(TraversableLike.scala:278)
at scala.collection.AbstractTraversable.map(Traversable.scala:108)
...

I have verified that the generated instance of the UDF behaves as expected when 
invoking any of its methods.

Do you have any ideas on why this is failing?

Reply via email to