Hi Cody, actually, there is currently no functionality for adding a UDF in the Table API / SQL. This feature is definitely on the roadmap, but it is not implemented yet. The related JIRA issue is FLINK-3097.
In fact, the FunctionCatalog and FrameworkConfig should not be publicly accessible from the TableEnvironment, IMO. Best, Fabian 2016-06-22 14:58 GMT+02:00 Cody Innowhere <e.neve...@gmail.com>: > Hi guys, > I'm trying to add a UDF in Flink table API, say, in DataSet table API. > My example code is as follows: > --------------------------- > object WordCountTable { > case class WC(word: String, num: Int) > > def main(args: Array[String]): Unit = { > > // set up execution environment > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env) > > val input = env.fromElements(WC("hello", 1), WC("hello", 2), WC("ciao", > 3)) > val table = input.toTable(tEnv) > > tEnv.registerTable("WC", table) > > //val funcCatalog = tblEnv.getFunctionCatalog > //funcCatalog.registerFunction("MyAdd1", classOf[MyAdds.MyAdd1]) > > val schema = tEnv.getFrameworkConfig.getDefaultSchema > schema.add("MyAdd1", ScalarFunctionImpl.create(classOf[MyAdds.MyAdd1], > "eval")) > > tEnv.sql("SELECT word, MyAdd1(num) as num FROM WC WHERE num > > 1").toDataSet[WC].print() > } > } > > And MyAdds looks like this: > --------------------------- > public class MyAdds { > public static class MyAdd1 { > public int eval(int x) { > return x + 1; > } > } > } > > But when running, it gives the error: > Exception in thread "main" org.apache.calcite.tools.ValidationException: > org.apache.calcite.runtime.CalciteContextException: From line 1, column 14 > to line 1, column 24: No match found for function signature > MyAdd1(<NUMERIC>) > at > > org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:99) > at > > org.apache.flink.api.table.BatchTableEnvironment.sql(BatchTableEnvironment.scala:130) > at > > org.apache.flink.examples.scala.WordCountTable$.main(WordCountTable.scala:54) > at > org.apache.flink.examples.scala.WordCountTable.main(WordCountTable.scala) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > > 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) > at > > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:606) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, > column 14 to line 1, column 24: No match found for function signature > MyAdd1(<NUMERIC>) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:765) > at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:753) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:3929) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1544) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:278) > at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:222) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445) > at org.apache.calcite.sql.SqlAsOperator.deriveType(SqlAsOperator.java:132) > at > > 
org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4266) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:4253) > at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:135) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1462) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1445) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:439) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:3428) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:2966) > at > > org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) > at > > org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:86) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:845) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:831) > at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:208) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:807) > at > > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:523) > at > > org.apache.flink.api.table.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:95) > ... 
8 more > Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match > found for function signature MyAdd1(<NUMERIC>) > at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) > at > > sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) > at > > sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) > at java.lang.reflect.Constructor.newInstance(Constructor.java:526) > at > org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:405) > at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:514) > ... 36 more > > > I'm not sure if it's the right way to add UDF in table API, or am I missing > something? Thanks~ >