Hey Rui,

For the geospatial UDFs, I've added these jars to my Flink deployment:

# Flink-Hive
RUN wget -q -O /opt/flink/lib/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.12.2/flink-sql-connector-hive-3.1.2_2.12-1.12.2.jar \
 && wget -q -O /opt/flink/lib/hive-exec-3.1.2.jar https://repo1.maven.org/maven2/org/apache/hive/hive-exec/3.1.2/hive-exec-3.1.2.jar \
 && wget -q -O /opt/flink/lib/libfb303-0.9.3.jar http://databus.dbpedia.org:8081/repository/internal/org/apache/thrift/libfb303/0.9.3/libfb303-0.9.3.jar

# Hive geospatial udf, https://github.com/Esri/spatial-framework-for-hadoop
RUN wget -q -O /opt/flink/lib/spatial-sdk-hive.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-hive-2.2.0.jar \
 && wget -q -O /opt/flink/lib/spatial-sdk-json.jar https://github.com/Esri/spatial-framework-for-hadoop/releases/download/v2.2.0/spatial-sdk-json-2.2.0.jar \
 && wget -q -O /opt/flink/lib/esri-geometry-api.jar https://repo1.maven.org/maven2/com/esri/geometry/esri-geometry-api/2.2.4/esri-geometry-api-2.2.4.jar

As I mentioned earlier, I did not register the functions explicitly because the 'CREATE FUNCTION ...' statement did not work for me. If I run such a statement, e.g., "CREATE FUNCTION ST_GeomFromText AS 'com.esri.hadoop.hive.ST_GeomFromText'", I get:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
    at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
    ... 11 more

Thanks,
Youngwoo

On Wed, Apr 28, 2021 at 3:05 PM Rui Li <lirui.fu...@gmail.com> wrote:

> Hi Youngwoo,
>
> Could you please share the function jar and DDL you used to create the
> function? I can try reproducing this issue locally.
>
> On Wed, Apr 28, 2021 at 1:33 PM Youngwoo Kim (김영우) <yw...@apache.org>
> wrote:
>
>> Thanks Shengkai and Rui for looking into this.
>>
>> A snippet from my app looks like the following:
>>
>> HiveCatalog hive = new HiveCatalog("flink-hive", "default", "/tmp/hive");
>> tableEnv.registerCatalog("flink-hive", hive);
>>
>> tableEnv.useCatalog("flink-hive");
>> tableEnv.loadModule("flink-hive", new HiveModule("3.1.2"));
>>
>> tableEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
>>
>> tableEnv.executeSql("CREATE DATABASE IF NOT EXISTS flink_gaia");
>> tableEnv.executeSql("USE flink_gaia");
>> tableEnv.executeSql("SHOW CURRENT CATALOG").print();
>> tableEnv.executeSql("SHOW CURRENT DATABASE").print();
>> tableEnv.executeSql("SHOW TABLES").print();
>> tableEnv.executeSql("SHOW FUNCTIONS").print();
>>
>> // Test Hive UDF
>> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>
>> And I got the following output and exception:
>>
>> +----------------------+
>> | current catalog name |
>> +----------------------+
>> | flink-hive           |
>> +----------------------+
>> 1 row in set
>> +-----------------------+
>> | current database name |
>> +-----------------------+
>> | flink_gaia            |
>> +-----------------------+
>> 1 row in set
>> +----------------------+
>> | table name           |
>> +----------------------+
>> | geofence             |
>> | lcap                 |
>> | lcap_temporal_fenced |
>> +----------------------+
>>
>> +--------------------------------+
>> | function name                  |
>> +--------------------------------+
>> | regr_sxy                       |
>> ......
>>
>> 380 rows in set
>>
>> (snip)
>>
>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 31: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>     at java.security.AccessController.doPrivileged(Native Method)
>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>
>> Thanks,
>> Youngwoo
>>
>> On Wed, Apr 28, 2021 at 1:44 PM Rui Li <lirui.fu...@gmail.com> wrote:
>>
>>> Hi Youngwoo,
>>>
>>> The catalog function is associated with a catalog and DB. Assuming you
>>> have created the function ST_Point in your metastore, could you verify
>>> whether the current catalog is your HiveCatalog and the current database is
>>> the database in which ST_Point is registered?
>>>
>>> On Wed, Apr 28, 2021 at 12:24 PM Shengkai Fang <fskm...@gmail.com>
>>> wrote:
>>>
>>>> Hi.
>>>>
>>>> The order of the modules may influence how the function is loaded.
>>>>
>>>> [1] https://issues.apache.org/jira/browse/FLINK-22383
>>>>
>>>> On Wed, Apr 28, 2021 at 10:50 AM Youngwoo Kim (김영우) <yw...@apache.org> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> I've configured a Hive metastore to use HiveCatalog in a streaming
>>>>> application. So far, most features of the Hive integration are
>>>>> working fine.
>>>>>
>>>>> However, I have a problem using Hive UDFs. I have already completed the
>>>>> prerequisites for the Hive geospatial UDFs [1].
>>>>>
>>>>> As a sanity check, I ran a query like the one below:
>>>>>
>>>>> tableEnv.executeSql("SELECT ST_AsText(ST_Point(1, 2))");
>>>>>
>>>>> I got an exception like this:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>>>>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>>>>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>>>> Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
>>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:189)
>>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:77)
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:660)
>>>>>     at com.skt.chiron.FlinkApp.main(FlinkApp.java:67)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>>>     ... 11 more
>>>>> Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 18 to line 1, column 63: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>>>>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:883)
>>>>>     at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:868)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5043)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.handleUnresolvedFunction(SqlValidatorImpl.java:1838)
>>>>>     at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:321)
>>>>>     at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
>>>>>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
>>>>>     at org.apache.calcite.sql.SqlOperator.constructArgTypeList(SqlOperator.java:606)
>>>>>     at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:244)
>>>>>     at org.apache.calcite.sql.SqlFunction.deriveType(SqlFunction.java:226)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5882)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl$DeriveTypeVisitor.visit(SqlValidatorImpl.java:5869)
>>>>>     at org.apache.calcite.sql.SqlCall.accept(SqlCall.java:139)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveTypeImpl(SqlValidatorImpl.java:1756)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.deriveType(SqlValidatorImpl.java:1741)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.expandSelectItem(SqlValidatorImpl.java:440)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelectList(SqlValidatorImpl.java:4205)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3474)
>>>>>     at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
>>>>>     at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1067)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1041)
>>>>>     at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1016)
>>>>>     at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:724)
>>>>>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:147)
>>>>>     ... 21 more
>>>>> Caused by: org.apache.calcite.sql.validate.SqlValidatorException: No match found for function signature ST_Point(<NUMERIC>, <NUMERIC>)
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>>>>>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>>>>>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>>>>>     at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:467)
>>>>>     at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:560)
>>>>>     ... 51 more
>>>>>
>>>>> (snip)
>>>>>
>>>>> Also, the output of 'SHOW FUNCTIONS' does not list these functions:
>>>>>
>>>>> tableEnv.executeSql("SHOW FUNCTIONS").print();
>>>>>
>>>>> ......
>>>>>
>>>>> (snip)
>>>>>
>>>>> Registering the functions explicitly does not work for me:
>>>>>
>>>>> org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:366)
>>>>>     at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:219)
>>>>>     at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
>>>>>     at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)
>>>>>     at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)
>>>>>     at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)
>>>>>     at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)
>>>>>     at java.security.AccessController.doPrivileged(Native Method)
>>>>>     at javax.security.auth.Subject.doAs(Subject.java:422)
>>>>>     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1682)
>>>>>     at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
>>>>>     at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)
>>>>> Caused by: org.apache.flink.table.api.ValidationException: Function flink_gaia.ST_GeomFromText already exists in Catalog flink-hive.
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.createCatalogFunction(TableEnvironmentImpl.java:1459)
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:1009)
>>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)
>>>>>     at com.skt.chiron.FlinkApp.main(FlinkApp.java:58)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>>>>     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>>>>     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>>>>     at java.lang.reflect.Method.invoke(Method.java:498)
>>>>>     at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:349)
>>>>>     ... 11 more
>>>>>
>>>>> (snip)
>>>>>
>>>>> I hope to find out why the functions are missing. The Flink (ver. 1.12.2)
>>>>> job cluster is running on a Kubernetes cluster via the Flink operator, and
>>>>> the standalone metastore is running only for the Flink cluster, without
>>>>> any Hive deployment.
>>>>>
>>>>> Thanks,
>>>>> Youngwoo
>>>>>
>>>>> [1] https://github.com/Esri/spatial-framework-for-hadoop
>>>>>
>>>>
>>>
>>> --
>>> Best regards!
>>> Rui Li
>>>
>>
>
> --
> Best regards!
> Rui Li
>