Hi Dian,

I changed the UDF to:
```python
@udf(
    input_types=[DataTypes.BIGINT(), DataTypes.BIGINT()],
    result_type=DataTypes.BIGINT(),
)
def add(i, j):
    return i + j
```

But I still get the same error.

On Tue, May 18, 2021 at 5:47 PM Dian Fu <dian0511...@gmail.com> wrote:

> Hi Yik San,
>
> The expected input types for add are DataTypes.INT, however, the schema of
> aiinfra.mysource is: a bigint and b bigint.
>
> Regards,
> Dian
>
> On May 18, 2021 at 5:38 PM, Yik San Chan <evan.chanyik...@gmail.com> wrote:
>
> Hi,
>
> I have a PyFlink script that fails to use a simple UDF. The full script
> can be found below:
>
> ```python
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import (
>     DataTypes,
>     EnvironmentSettings,
>     SqlDialect,
>     StreamTableEnvironment,
> )
> from pyflink.table.udf import udf
>
>
> @udf(
>     input_types=[DataTypes.INT(), DataTypes.INT()],
>     result_type=DataTypes.BIGINT(),
> )
> def add(i, j):
>     return i + j
>
>
> TRANSFORM = """
> INSERT INTO aiinfra.mysink
> SELECT ADD(a, b)
> FROM aiinfra.mysource
> """
>
> CREATE_CATALOG = """
> CREATE CATALOG hive WITH (
>     'type' = 'hive',
>     'hive-conf-dir' = '/data/software/hive-2.1.0/conf'
> )"""
>
> USE_CATALOG = "USE CATALOG hive"
>
>
> exec_env = StreamExecutionEnvironment.get_execution_environment()
> env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
> t_env = StreamTableEnvironment.create(
>     stream_execution_environment=exec_env, environment_settings=env_settings
> )
>
> t_env.create_temporary_function("add", add)
>
> t_env.get_config().set_sql_dialect(SqlDialect.HIVE)
> t_env.execute_sql(CREATE_CATALOG)
> t_env.execute_sql(USE_CATALOG)
>
> t_env.get_config().set_sql_dialect(SqlDialect.DEFAULT)
> t_result = t_env.execute_sql(TRANSFORM)
> ```
>
> However, when I submit the Python file to my Flink cluster, it throws an
> exception:
>
> ```
> [INFO] 2021-05-18 17:27:47.758 - [taskAppId=TASK-90019-86729-380519]:[152] - ->
> Traceback (most recent call last):
>   File "aiinfra/batch_example.py", line 50, in <module>
>     t_result = t_env.execute_sql(TRANSFORM)
>   File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/table/table_environment.py", line 766, in execute_sql
>   File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
>   File "/data/apache/flink/flink-1.12.0/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 147, in deco
>   File "/data/apache/flink/flink-1.12.0/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o4.executeSql.
> : org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 3, column 8 to line 3, column 16: No match found for function signature ADD(<NUMERIC>, <NUMERIC>)
> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
> 	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:111)
> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:193)
> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(SqlToOperationConverter.java:536)
> 	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:248)
> 	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> 	at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:659)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> 	at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> 	at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> 	at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> 	at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> 	at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> 	at java.lang.Thread.run(Thread.java:748)
> ```
>
> It seems Flink cannot see that the "add" function has already been
> registered. Changing "ADD(a, b)" to "add(a, b)" doesn't help, so I don't
> think it is an upper-versus-lower-case issue.
>
> Also, if I replace "ADD(a, b)" with the simple "a + b", the script
> produces exactly what I need.
>
> Regarding aiinfra.mysource and aiinfra.mysink: aiinfra.mysource has two
> columns, a bigint and b bigint. aiinfra.mysink has one column, c bigint.
>
> Any help? Thanks!
>
> Best,
> Yik San
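
As a quick sanity check that the failure is in SQL function resolution rather than in the UDF body itself: the `@udf` decorator only wraps a plain Python callable, so the addition logic can be exercised without a Flink cluster at all. A minimal sketch:

```python
# Plain-Python check of the UDF body from the script above. No Flink
# runtime is involved; this only verifies the wrapped callable's logic.
def add(i, j):
    return i + j

print(add(1, 2))              # 3
print(add(3_000_000_000, 1))  # a value beyond INT range, within BIGINT
```

Separately, one thing that may be worth ruling out (an assumption on my part, not confirmed in this thread): `create_temporary_function` registers a name in the current catalog and database, whereas `create_temporary_system_function` registers a catalog-independent name. Since the script registers `add` before running `USE CATALOG hive`, re-registering it after the catalog switch, or registering it via `create_temporary_system_function("add", add)`, could be worth a try.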