Hi Pierre, You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>”))'
Best, Wei > 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道: > > Hi Dian, Community, > > (bringing the thread back to wider audience) > > As you suggested, I've tried to use DataTypeHint with Row instead of Map but > also this simple case leads to a type mismatch between UDF and Table API. > I've also tried other Map objects from Flink (table.data.MapData, > flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java > (java.util.Map) in combination with DataTypeHint, without success. > N.B. I'm using version 1.11. > > Am I doing something wrong or am I facing limitations in the toolkit ? > > Thanks in advance for your support ! > > Best regards, > > Scala UDF > > class dummyMap() extends ScalarFunction { > > @DataTypeHint("ROW<s STRING,t STRING>") > def eval(): Row = { > > Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar")) > > } > } > > Table DDL > > my_sink_ddl = f""" > create table mySink ( > output_of_my_scala_udf ROW<s STRING,t STRING> > ) with ( > ... > ) > """ > > Error > > Py4JJavaError: An error occurred while calling o2.execute. > : org.apache.flink.table.api.ValidationException: Field types of query result > and registered TableSink `default_catalog`.`default_database`.`mySink` do not > match. > Query result schema: [output_of_my_scala_udf: > GenericType<org.apache.flink.types.Row>] > TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)] > > > > Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com > <mailto:pierre.oberhol...@gmail.com>> a écrit : > Thanks Dian, but same error when using explicit returned type: > > class dummyMap() extends ScalarFunction { > > def eval() : util.Map[java.lang.String,java.lang.String] = { > > val states = Map("key1" -> "val1", "key2" -> "val2") > states.asInstanceOf[util.Map[java.lang.String,java.lang.String]] > > } > } > > Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com > <mailto:dian0511...@gmail.com>> a écrit : > You need to explicitly defined the result type the UDF. You could refer to > [1] for more details if you are using Flink 1.11. If you are using other > versions of Flink, you need to refer to the corresponding documentation. > > [1] > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide > > <https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide> >> 在 2020年11月13日,下午4:56,Pierre Oberholzer <pierre.oberhol...@gmail.com >> <mailto:pierre.oberhol...@gmail.com>> 写道: >> >> ScalarFunction > > > > -- > Pierre > > -- > Pierre