Hi Wei,

Thanks for the hint. May I follow up with some more context and ask for your guidance?
Suppose the aforementioned Map[String, Any] object returned by Scala:

- has a defined schema (incl. nested) with up to 100k (!) different possible keys
- has only some portion of those keys populated for each record
- is convertible to JSON
- has to undergo downstream processing in Flink and/or Python UDFs with key-value access
- has to be ultimately stored in a Kafka/AVRO sink

How would you declare the types explicitly in such a case?

Thanks for your support!

Pierre

On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:

> Hi Pierre,
>
> Currently there is no type hint like 'Map[String, Any]'. The recommended
> way is declaring your type more explicitly.
>
> If you insist on doing this, you can try declaring a RAW data type for
> java.util.HashMap [1], but you may encounter some trouble [2] related to
> the Kryo serializers.
>
> Best,
> Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
> Hi Wei,
>
> It works! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had the wrong
> syntax for option 2.
>
> So to summarize:
>
> Methods working:
> - Current: DataTypeHint in the UDF definition + SQL for UDF registering
> - Outdated: override getResultType in the UDF definition
>   + t_env.register_java_function for UDF registering
>
> Type conversions working:
> - scala.collection.immutable.Map[String,String]
>   => org.apache.flink.types.Row => ROW<STRING,STRING>
> - scala.collection.immutable.Map[String,String]
>   => java.util.Map[String,String] => MAP<STRING,STRING>
>
> Any hint for Map[String,Any]?
>
> Best regards,
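A minimal, untested sketch of the RAW declaration Wei suggests above, assuming Flink 1.11 (the class name rawMap and the example keys are illustrative, not from this thread, and the Kryo caveats in [2] still apply):

  import org.apache.flink.table.annotation.DataTypeHint
  import org.apache.flink.table.functions.ScalarFunction

  class rawMap extends ScalarFunction {

    // RAW makes Flink treat the value as an opaque, Kryo-serialized blob,
    // so SQL cannot address individual keys of the map downstream.
    @DataTypeHint(value = "RAW", bridgedTo = classOf[java.util.HashMap[String, Any]])
    def eval(): java.util.HashMap[String, Any] = {
      val m = new java.util.HashMap[String, Any]()
      m.put("key1", "val1") // hypothetical sparse payload
      m.put("key2", 2)
      m
    }
  }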
> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>
>> Hi Pierre,
>>
>> Both approaches work on my local machine; this is my code:
>>
>> Scala UDF:
>>
>> package com.dummy
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>>
>> /**
>>  * The Scala UDF.
>>  */
>> class dummyMap extends ScalarFunction {
>>
>>   // If the UDF is registered by a SQL statement, you need to add this
>>   // type hint.
>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>>
>>   // If the UDF is registered by the method 'register_java_function',
>>   // you need to override this method instead.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>     // The return type must be described as TypeInformation here.
>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>   }
>> }
>>
>> Python code:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>>
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>>
>> # Load the Scala UDF jar file; the path should be modified to yours,
>> # or you can load the jar file via other approaches.
>> st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>
>> # Register the UDF via SQL ...
>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>> # ... or register it via the method:
>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>
>> # Prepare source and sink.
>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>> st_env.execute_sql("""create table mySink (
>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>> ) with (
>>     'connector' = 'print'
>> )""")
>>
>> # Execute the query.
>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>
>> Best,
>> Wei
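Pierre's summary above reports scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row => ROW<STRING,STRING> as working. A minimal, untested sketch of what that conversion might look like for a fixed declared schema (names are illustrative; note how keys absent from a sparsely populated record surface as NULL fields):

  import org.apache.flink.table.annotation.DataTypeHint
  import org.apache.flink.table.functions.ScalarFunction
  import org.apache.flink.types.Row

  class mapToRow extends ScalarFunction {

    @DataTypeHint("ROW<s STRING, t STRING>")
    def eval(): Row = {
      val m: Map[String, String] = Map("s" -> "foo") // "t" left unpopulated
      // Look up each declared field by key; absent keys become NULL.
      Row.of(m.getOrElse("s", null), m.getOrElse("t", null))
    }
  }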
>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>> Hi Wei,
>>
>> True, I'm using the method you mention, but glad to change.
>> I tried your suggestion instead, but got a similar error.
>>
>> Thanks for your support. This is much more tedious than I thought.
>>
>> *Option 1 - SQL UDF*
>>
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap
>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>
>> t_env.execute_sql(create_func_ddl)
>>
>> *Error*
>>
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match
>> requested type. Requested: Row(s: String, t: String); Actual:
>> GenericType<org.apache.flink.types.Row>
>>
>> *Option 2 - Overriding getResultType*
>>
>> Back to the old registering method, but overriding getResultType:
>>
>> t_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>
>> *Scala UDF*
>>
>> class dummyMap() extends ScalarFunction {
>>
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>>
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>     DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
>> }
>>
>> *Error (on compilation)*
>>
>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
>> [error]   ()org.apache.flink.table.types.DataType <and>
>> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>> [error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
>> [error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>> [error] one error found
>> [error] (Compile / compileIncremental) Compilation failed
>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>
>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>
>>> Hi Pierre,
>>>
>>> I guess your UDF is registered by the method 'register_java_function',
>>> which uses the old type system. In this situation you need to override the
>>> 'getResultType' method instead of adding a type hint.
>>>
>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL
>>> statement, which accepts the type hint.
>>>
>>> Best,
>>> Wei
>>>
>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>
>>> Hi Wei,
>>>
>>> Thanks for your suggestion. Same error.
>>>
>>> *Scala UDF*
>>>
>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>> class dummyMap() extends ScalarFunction {
>>>   def eval(): Row = {
>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>   }
>>> }
>>>
>>> Best regards,
>>>
>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with
>>>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>
>>>> Best,
>>>> Wei
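The second conversion Pierre reports working, scala.collection.immutable.Map[String,String] => java.util.Map[String,String] => MAP<STRING,STRING>, needs an explicit Scala-to-Java conversion: scala.collection.immutable.Map does not implement java.util.Map, so the asInstanceOf cast tried further down this thread compiles but fails at runtime with a ClassCastException. A minimal, untested sketch (names are illustrative):

  import scala.collection.JavaConverters._

  import org.apache.flink.table.annotation.DataTypeHint
  import org.apache.flink.table.functions.ScalarFunction

  class mapToJavaMap extends ScalarFunction {

    @DataTypeHint("MAP<STRING, STRING>")
    def eval(): java.util.Map[String, String] = {
      val states = Map("key1" -> "val1", "key2" -> "val2")
      states.asJava // converts via a wrapper instead of an invalid cast
    }
  }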
>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> Hi Dian, Community,
>>>>
>>>> (bringing the thread back to a wider audience)
>>>>
>>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map,
>>>> but even this simple case leads to a type mismatch between the UDF and the
>>>> Table API.
>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>> N.B. I'm using version 1.11.
>>>>
>>>> Am I doing something wrong, or am I facing limitations of the toolkit?
>>>>
>>>> Thanks in advance for your support!
>>>>
>>>> Best regards,
>>>>
>>>> *Scala UDF*
>>>>
>>>> class dummyMap() extends ScalarFunction {
>>>>
>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>   def eval(): Row = {
>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>   }
>>>> }
>>>>
>>>> *Table DDL*
>>>>
>>>> my_sink_ddl = f"""
>>>> create table mySink (
>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>> ) with (
>>>>     ...
>>>> )
>>>> """
>>>>
>>>> *Error*
>>>>
>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>> : org.apache.flink.table.api.ValidationException: Field types of query
>>>> result and registered TableSink
>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>> Query result schema: [output_of_my_scala_udf:
>>>> GenericType<org.apache.flink.types.Row>]
>>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>
>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>>> Thanks Dian, but I get the same error when using an explicit return type:
>>>>>
>>>>> class dummyMap() extends ScalarFunction {
>>>>>
>>>>>   def eval(): util.Map[java.lang.String, java.lang.String] = {
>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>     states.asInstanceOf[util.Map[java.lang.String, java.lang.String]]
>>>>>   }
>>>>> }
>>>>>
>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>
>>>>>> You need to explicitly define the result type of the UDF. You could
>>>>>> refer to [1] for more details if you are using Flink 1.11. If you are
>>>>>> using another version of Flink, please refer to the corresponding
>>>>>> documentation.
>>>>>>
>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>
>>>>>> On Nov 13, 2020, at 16:56, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> ScalarFunction

-- 
Pierre