Hi Pierre,

Sorry for the late reply. Your requirement is that your `Table` has a `field` in `Json` format whose number of keys has reached 100k, and you want to use such a `field` as the input/output of a `udf`, right?

As to whether there is a limit on the number of nested keys, I am not sure; other contributors with experience in this area may have answers.

On the `Python UDF` side, if the type of the key or value of your `Map` is `Any`, we do not support it yet. You need to specify a concrete type. For more information, please refer to the related documentation [1].
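(Editorial aside, not part of the original thread.) Since PyFlink 1.11 has no `Any` type, one common workaround for the point above is to coerce every value to a string so the column fits a concrete `MAP<STRING, STRING>` type. The sketch below is plain Python; `to_string_map` is a hypothetical helper, not a Flink API.

```python
import json
from typing import Any, Dict

def to_string_map(record: Dict[str, Any]) -> Dict[str, str]:
    """Coerce a sparse Dict[str, Any] into Dict[str, str] by JSON-encoding
    each value, so the result fits a concrete MAP<STRING, STRING> column."""
    return {k: json.dumps(v) for k, v in record.items()}

sparse = {"age": 42, "tags": ["a", "b"], "name": "foo"}
encoded = to_string_map(sparse)
# values round-trip via json.loads on the consumer side
assert json.loads(encoded["tags"]) == ["a", "b"]
```

The consumer side (e.g. a downstream Python UDF) recovers typed values with `json.loads` per entry.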
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html

Best,
Xingbo

> On Nov 28, 2020, at 00:49, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
> Hello Wei, Dian, Xingbo,
>
> Not really sure when it is appropriate to knock on the door of the community ;)
> I just wanted to mention that your feedback on the above topic will be highly appreciated, as it will condition the choice of framework on our side for the months to come, and potentially help the community to cover sparse data with Flink.
>
> Thanks a lot!
>
> Have a great week-end
>
> Best,
>
> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
> Hi Wei,
>
> Thanks for the hint. May I please follow up by adding more context and asking for your guidance.
>
> In case the bespoken Map[String,Any] object returned by Scala:
>
> - Has a defined schema (incl. nested) with up to 100k (!) different possible keys
> - Has only some portion of the keys populated for each record
> - Is convertible to JSON
> - Has to undergo downstream processing in Flink and/or a Python UDF with key-value access
> - Has to be ultimately stored in a Kafka/AVRO sink
>
> How would you declare the types explicitly in such a case?
>
> Thanks for your support!
>
> Pierre
>
> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
> Hi Pierre,
>
> Currently there is no type hint like 'Map[String, Any]'. The recommended way is declaring your type more explicitly.
>
> If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some troubles [2] related to the Kryo serializers.
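(Editorial aside, not part of the original thread.) Given the requirements listed above, where only a small portion of the ~100k possible keys is populated per record, one pragmatic option is to carry the sparse payload as a single JSON `STRING` column rather than a 100k-field `ROW`. The sketch below is plain Python under that assumption; `FULL_SCHEMA` and `to_sparse_json` are hypothetical names, not Flink APIs.

```python
import json

# Of the (up to) 100k possible keys, only a handful are populated per record.
FULL_SCHEMA = {f"feature_{i}" for i in range(100_000)}

def to_sparse_json(populated: dict) -> str:
    """Keep only keys that belong to the schema and carry a value,
    then serialize to a JSON string destined for a STRING column."""
    kept = {k: v for k, v in populated.items() if k in FULL_SCHEMA and v is not None}
    return json.dumps(kept, sort_keys=True)

row_payload = to_sparse_json(
    {"feature_7": 1.5, "feature_99999": "x", "bogus": 2, "feature_3": None}
)
# → '{"feature_7": 1.5, "feature_99999": "x"}'
```

The string stays opaque to the planner, so no 100k-key type declaration is needed; key-value access downstream is a `json.loads` away.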
>
> Best,
> Wei
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>> Hi Wei,
>>
>> It works! Thanks a lot for your support.
>> I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2.
>>
>> So to summarize..
>>
>> Methods working:
>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>> - Outdated: override getResultType in UDF definition + t_env.register_java_function for UDF registering
>>
>> Type conversions working:
>> - scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row => ROW<STRING,STRING>
>> - scala.collection.immutable.Map[String,String] => java.util.Map[String,String] => MAP<STRING,STRING>
>>
>> Any hint for Map[String,Any]?
>>
>> Best regards,
>>
>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>> Hi Pierre,
>>
>> Those 2 approaches both work on my local machine, this is my code:
>>
>> Scala UDF:
>>
>> package com.dummy
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>>
>> /**
>>  * The scala UDF.
>>  */
>> class dummyMap extends ScalarFunction {
>>
>>   // If the udf would be registered by the SQL statement, you need to add this type hint.
>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>   def eval(): Row = {
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>   }
>>
>>   // If the udf would be registered by the method 'register_java_function',
>>   // you need to override this method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>     // The type of the return values should be TypeInformation
>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>   }
>> }
>>
>> Python code:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>>
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>>
>> # load the scala udf jar file, the path should be modified to yours
>> # or you can also load the jar file via other approaches
>> st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>
>> # register the udf via sql statement
>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>> # or register via the method
>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>
>> # prepare source and sink
>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>> st_env.execute_sql("""create table mySink (
>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>> ) with (
>>     'connector' = 'print'
>> )""")
>>
>> # execute query
>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>
>> Best,
>> Wei
>>
>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>
>>> Hi Wei,
>>>
>>> True, I'm using the method you mention, but glad to
>>> change. I tried your suggestion instead, but got a similar error.
>>>
>>> Thanks for your support. That is much more tedious than I thought.
>>>
>>> Option 1 - SQL UDF
>>>
>>> create_func_ddl = """
>>> CREATE FUNCTION dummyMap
>>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>> """
>>>
>>> t_env.execute_sql(create_func_ddl)
>>>
>>> Error:
>>> Py4JJavaError: An error occurred while calling o672.execute.
>>> : org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>
>>>
>>> Option 2 - Overriding getResultType
>>>
>>> Back to the old registering method, but overriding getResultType:
>>>
>>> t_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>
>>> Scala UDF:
>>>
>>> class dummyMap() extends ScalarFunction {
>>>
>>>   def eval(): Row = {
>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>   }
>>>
>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
>>> }
>>>
>>> Error (on compilation):
>>>
>>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>> [error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
>>> [error] override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>> [error] one error found
>>> [error] (Compile / compileIncremental) Compilation failed
>>> [error] Total time: 3 s, completed Nov 17, 2020 20:00:01
>>>
>>> On Tue, Nov 17,
>>> 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>> Hi Pierre,
>>>
>>> I guess your UDF is registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding a type hint.
>>>
>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, which accepts the type hint.
>>>
>>> Best,
>>> Wei
>>>
>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> Hi Wei,
>>>>
>>>> Thanks for your suggestion. Same error.
>>>>
>>>> Scala UDF:
>>>>
>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>> class dummyMap() extends ScalarFunction {
>>>>   def eval(): Row = {
>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>   }
>>>> }
>>>>
>>>> Best regards,
>>>>
>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>> Hi Pierre,
>>>>
>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>
>>>>> Hi Dian, Community,
>>>>>
>>>>> (bringing the thread back to a wider audience)
>>>>>
>>>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but also this simple case leads to a type mismatch between UDF and Table API.
>>>>> I've also tried other Map objects from Flink (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java (java.util.Map) in combination with DataTypeHint, without success.
>>>>> N.B. I'm using version 1.11.
>>>>>
>>>>> Am I doing something wrong, or am I facing limitations in the toolkit?
>>>>>
>>>>> Thanks in advance for your support!
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Scala UDF:
>>>>>
>>>>> class dummyMap() extends ScalarFunction {
>>>>>
>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>   def eval(): Row = {
>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>   }
>>>>> }
>>>>>
>>>>> Table DDL:
>>>>>
>>>>> my_sink_ddl = f"""
>>>>>     create table mySink (
>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>     ) with (
>>>>>         ...
>>>>>     )
>>>>> """
>>>>>
>>>>> Error:
>>>>>
>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>> : org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
>>>>> Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
>>>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>>
>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>> Thanks Dian, but same error when using an explicit return type:
>>>>>
>>>>> class dummyMap() extends ScalarFunction {
>>>>>
>>>>>   def eval(): util.Map[java.lang.String, java.lang.String] = {
>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>     states.asInstanceOf[util.Map[java.lang.String, java.lang.String]]
>>>>>   }
>>>>> }
>>>>>
>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>> You need to explicitly define the result type of the UDF. You could refer to [1] for more details if you are using Flink 1.11. If you are using other versions of Flink, you need to refer to the corresponding documentation.
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>
>>>>>> On Nov 13, 2020, at 16:56, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> ScalarFunction
>>>>>
>>>>> --
>>>>> Pierre
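(Editorial aside to close the thread, not part of the original messages.) Pierre's last requirement, a Kafka/AVRO sink, can be served without `Map[String,Any]` by flattening the nested payload into a `map<string,string>`-shaped dict, since Avro maps allow only a single value type. The sketch below is plain Python; `flatten` is a hypothetical helper, not a Flink or Avro API.

```python
import json

def flatten(obj: dict, prefix: str = "") -> dict:
    """Flatten a nested JSON-like dict into a flat {dotted.key: json-value}
    map of strings, a shape that fits an Avro map<string,string> field."""
    flat = {}
    for k, v in obj.items():
        key = f"{prefix}.{k}" if prefix else k
        if isinstance(v, dict):
            flat.update(flatten(v, key))   # recurse into nested objects
        else:
            flat[key] = json.dumps(v)      # stringify leaves uniformly
    return flat

payload = {"user": {"id": 7, "tags": ["a"]}, "score": 1.5}
flatten(payload)
# → {"user.id": "7", "user.tags": '["a"]', "score": "1.5"}
```

Only populated keys appear in the map, so the sparsity of the 100k-key schema costs nothing per record.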