Hi Pierre,

Both approaches work on my local machine.
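By the way, about the compile error in your Option 2: as the error message shows, DataTypes.ROW takes DataTypes.FIELD(...) arguments, not bare data types. And even with the fixed arguments it would not compile, because DataTypes.ROW produces a DataType of the new type system, while getResultType must return a TypeInformation of the old one. Just to illustrate the DataTypes signature (a sketch; the val is only usable where a DataType is accepted, not in getResultType):

import org.apache.flink.table.api.DataTypes

// ROW takes FIELD(name, type) entries, which is why
// DataTypes.ROW(DataTypes.STRING, DataTypes.STRING) does not compile.
// Note: this yields a DataType (new type system), so it still cannot be
// the return value of getResultType, which expects a TypeInformation.
val rowType = DataTypes.ROW(
  DataTypes.FIELD("s", DataTypes.STRING()),
  DataTypes.FIELD("t", DataTypes.STRING()))

For getResultType you need the legacy Types.ROW instead, as in my code below.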
Here is my code:

Scala UDF:

package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * The Scala UDF.
 */
class dummyMap extends ScalarFunction {

  // If the UDF is registered via a SQL statement, you need to add this type hint.
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  // If the UDF is registered via the method 'register_java_function', you need to
  // override this method instead.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The result type must be expressed as a TypeInformation.
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}
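For completeness: the @FunctionHint variant you tried earlier should behave the same as the method-level @DataTypeHint above, but like all type hints it only takes effect when the function is registered through "CREATE FUNCTION". A sketch (the class name dummyMapHinted is just for illustration):

import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

// Function-level hint; equivalent to annotating eval() directly with
// @DataTypeHint. Only honored by the new type system ("CREATE FUNCTION").
@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
class dummyMapHinted extends ScalarFunction {
  def eval(): Row = Row.of("foo", "bar")
}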
Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# Load the Scala UDF jar file. The path should be modified to yours;
# you can also load the jar file via other approaches.
st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")

# Register the UDF via SQL:
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# Or register it via the method:
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# Prepare the source and sink.
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
        output_of_my_scala_udf ROW<s STRING,t STRING>
    ) with (
        'connector' = 'print'
    )""")

# Execute the query.
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
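And if you later want the individual fields rather than the whole row, field access on the ROW result should work in SQL through a subquery alias. A sketch, assuming the table 't' is first registered as a view named 'mySource' (a hypothetical name, e.g. via st_env.create_temporary_view("mySource", t)):

-- Access the fields of the ROW-typed UDF result via the alias 'r'.
SELECT r.s, r.t
FROM (SELECT dummyMap() AS r FROM mySource)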
Best,
Wei

> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
> Hi Wei,
>
> True, I'm using the method you mention, but glad to change.
> I tried your suggestion instead, but got a similar error.
>
> Thanks for your support. That is much more tedious than I thought.
>
> Option 1 - SQL UDF
>
> SQL UDF:
>
> create_func_ddl = """
> CREATE FUNCTION dummyMap
> AS 'com.dummy.dummyMap' LANGUAGE SCALA
> """
>
> t_env.execute_sql(create_func_ddl)
>
> Error:
>
> Py4JJavaError: An error occurred while calling o672.execute.
> : org.apache.flink.table.api.TableException: Result field does not match requested type.
> Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>
>
> Option 2 - Overriding getResultType
>
> Back to the old registration method, but overriding getResultType:
>
> t_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>
> Scala UDF:
>
> class dummyMap() extends ScalarFunction {
>
>   def eval(): Row = {
>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>   }
>
>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>     DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
> }
>
> Error (on compilation):
>
> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
> [error]   ()org.apache.flink.table.types.DataType <and>
> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
> [error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
> [error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
> [error] one error found
> [error] (Compile / compileIncremental) Compilation failed
> [error] Total time: 3 s, completed Nov 17, 2020 20:00:01
>
> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>
> Hi Pierre,
>
> I guess your UDF is registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding the type hint.
>
> You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, which accepts the type hint.
>
> Best,
> Wei
>
>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>> Hi Wei,
>>
>> Thanks for your suggestion. Same error.
>>
>> Scala UDF:
>>
>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>> class dummyMap() extends ScalarFunction {
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>> }
>>
>> Best regards,
>>
>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>
>> Hi Pierre,
>>
>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'.
>>
>> Best,
>> Wei
>>
>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>
>>> Hi Dian, Community,
>>>
>>> (bringing the thread back to a wider audience)
>>>
>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but this simple case also leads to a type mismatch between the UDF and the Table API.
>>> I've also tried other Map objects from Flink (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java (java.util.Map) in combination with DataTypeHint, without success.
>>> N.B. I'm using version 1.11.
>>>
>>> Am I doing something wrong or am I facing limitations in the toolkit?
>>>
>>> Thanks in advance for your support!
>>>
>>> Best regards,
>>>
>>> Scala UDF:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>   def eval(): Row = {
>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>   }
>>> }
>>>
>>> Table DDL:
>>>
>>> my_sink_ddl = f"""
>>>     create table mySink (
>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>     ) with (
>>>         ...
>>>     )
>>> """
>>>
>>> Error:
>>>
>>> Py4JJavaError: An error occurred while calling o2.execute.
>>> : org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
>>> Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>>>
>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>
>>> Thanks Dian, but same error when using an explicit return type:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   def eval(): util.Map[java.lang.String, java.lang.String] = {
>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>     states.asInstanceOf[util.Map[java.lang.String, java.lang.String]]
>>>   }
>>> }
>>>
>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>
>>> You need to explicitly define the result type of the UDF. You can refer to [1] for more details if you are using Flink 1.11. If you are using another version of Flink, please refer to the corresponding documentation.
>>>
>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>
>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> ScalarFunction
>>>
>>> --
>>> Pierre
>>>
>>> --
>>> Pierre
>>
>> --
>> Pierre
>
> --
> Pierre