Hi Pierre,

Have you ever thought of declaring your entire JSON payload as a single string field in the `Table` and putting the parsing work in a UDF?
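For example, a rough sketch of what that could look like with a Python UDF (just an illustration, not tested; it assumes Flink 1.11, a STRING source column named `payload`, and placeholder function/key names):

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Parse the raw JSON string inside the UDF and return only the keys that are
# actually present, as a MAP<STRING, STRING>, instead of declaring 100k fields
# in the table schema.
@udf(input_types=[DataTypes.STRING()],
     result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))
def parse_payload(payload):
    if payload is None:
        return {}
    return {k: str(v) for k, v in json.loads(payload).items()}

# registration and usage, e.g.:
# t_env.register_function("parse_payload", parse_payload)
# t.select("parse_payload(payload)")

This keeps the declared schema tiny and leaves the sparsity handling inside the UDF; you would then promote only the keys you really need downstream into typed columns.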
Best,
Xingbo

On Tue, Dec 1, 2020 at 4:13 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

> Hi Xingbo,
>
> Many thanks for your follow-up. Yes, you got it right.
> So, using the Table API and a ROW object for the nested output of my UDF, and since types are mandatory, I guess this boils down to:
> - How to nicely specify the types for the 100k fields: shall I use TypeInformation [1], or rather retrieve them from a Schema Registry [2]?
> - Do I have to put NULL values for all the fields that don't have a value in my JSON?
> - Will the resulting Table be "sparse" and suffer performance limitations?
> Let me know if the Table API and ROW are the right candidates here, or if better alternatives exist.
> As said, I'd be glad to apply some downstream transformations using key/value access (and possibly some Table <-> Pandas operations). Hope that doesn't make the wish list too long ;)
>
> Thanks a lot!
>
> Best regards,
>
> [1] https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
> [2] https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>
> On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Pierre,
>>
>> Sorry for the late reply.
>> Your requirement is that your `Table` has a field in JSON format whose number of keys has reached 100k, and you want to use such a field as the input/output of a UDF, right? As to whether there is a limit on the number of nested keys, I am not sure; other contributors with experience in this area may have answers. On the Python UDF side, if the type of the key or value of your `Map` is `Any`, that is not supported yet. You need to specify a concrete type. For more information, please refer to the related documentation [1].
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>
>> Best,
>> Xingbo
>>
>> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>> Hello Wei, Dian, Xingbo,
>>
>> Not really sure when it is appropriate to knock on the door of the community ;)
>> I just wanted to mention that your feedback on the above topic will be highly appreciated, as it will drive the choice of framework on our side for the months to come, and potentially help the community to cover sparse data with Flink.
>>
>> Thanks a lot!
>>
>> Have a great weekend.
>>
>> Best,
>>
>> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>>> Hi Wei,
>>>
>>> Thanks for the hint. May I please follow up by adding more context and asking for your guidance?
>>>
>>> In case the aforementioned Map[String,Any] object returned by Scala:
>>>
>>> - Has a defined schema (incl. nesting) with up to 100k (!) different possible keys
>>> - Has only some portion of the keys populated for each record
>>> - Is convertible to JSON
>>> - Has to undergo downstream processing in Flink and/or Python UDFs with key/value access
>>> - Ultimately has to be stored in a Kafka/Avro sink
>>>
>>> How would you declare the types explicitly in such a case?
>>>
>>> Thanks for your support!
>>>
>>> Pierre
>>>
>>> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> Currently there is no type hint like 'Map[String, Any]'. The recommended way is to declare your type more explicitly.
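>>>> For instance, a rough, untested sketch of how an explicit type could be assembled on the Python side (the field names and types below are only placeholders; in practice the list could be generated from your own schema source instead of written out by hand):
>>>>
>>>> from pyflink.table import DataTypes
>>>>
>>>> # build an explicit ROW type from a (name, type) list, e.g. derived from your JSON schema
>>>> fields = [("key1", DataTypes.STRING()), ("key2", DataTypes.INT())]
>>>> row_type = DataTypes.ROW([DataTypes.FIELD(name, dtype) for name, dtype in fields])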
>>>>
>>>> If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some trouble [2] related to the Kryo serializers.
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>> [2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>
>>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> Hi Wei,
>>>>
>>>> It works! Thanks a lot for your support.
>>>> I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2.
>>>>
>>>> So to summarize:
>>>>
>>>> Methods working:
>>>> - Current: DataTypeHint in the UDF definition + SQL for UDF registration
>>>> - Outdated: override getResultType in the UDF definition + t_env.register_java_function for UDF registration
>>>>
>>>> Type conversions working:
>>>> - scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row => ROW<STRING,STRING>
>>>> - scala.collection.immutable.Map[String,String] => java.util.Map[String,String] => MAP<STRING,STRING>
>>>>
>>>> Any hint for Map[String,Any]?
>>>>
>>>> Best regards,
>>>>
>>>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> Both approaches work on my local machine; this is my code:
>>>>>
>>>>> Scala UDF:
>>>>>
>>>>> package com.dummy
>>>>>
>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>> import org.apache.flink.table.api.Types
>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>> import org.apache.flink.types.Row
>>>>>
>>>>> /**
>>>>>   * The Scala UDF.
>>>>>   */
>>>>> class dummyMap extends ScalarFunction {
>>>>>
>>>>>   // If the UDF is registered via a SQL statement, you need to add this type hint
>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>   def eval(): Row = {
>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>   }
>>>>>
>>>>>   // If the UDF is registered via the method 'register_java_function', you need to override this
>>>>>   // method.
>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>>>>     // The type of the return values should be TypeInformation
>>>>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>   }
>>>>> }
>>>>>
>>>>> Python code:
>>>>>
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.table import StreamTableEnvironment
>>>>>
>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>
>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>> # or you can also load the jar file via other approaches
>>>>> st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>
>>>>> # register the udf via a SQL statement
>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>> # or register via the method
>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>
>>>>> # prepare source and sink
>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>>>> st_env.execute_sql("""create table mySink (
>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>> ) with (
>>>>>     'connector' = 'print'
>>>>> )""")
>>>>>
>>>>> # execute query
>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>
>>>>> Best,
>>>>> Wei
>>>>>
>>>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>
>>>>> Hi Wei,
>>>>>
>>>>> True, I'm using the method you mention, but glad to change.
>>>>> I tried your suggestion instead, but got a similar error.
>>>>>
>>>>> Thanks for your support. This is much more tedious than I thought.
>>>>>
>>>>> *Option 1 - SQL UDF*
>>>>>
>>>>> *SQL UDF*
>>>>> create_func_ddl = """
>>>>> CREATE FUNCTION dummyMap
>>>>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>> """
>>>>>
>>>>> t_env.execute_sql(create_func_ddl)
>>>>>
>>>>> *Error*
>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>> : org.apache.flink.table.api.TableException: Result field does not match requested type.
>>>>> Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>
>>>>>
>>>>> *Option 2 - Overriding getResultType*
>>>>>
>>>>> Back to the old registration method, but overriding getResultType:
>>>>>
>>>>> t_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>
>>>>> *Scala UDF*
>>>>> class dummyMap() extends ScalarFunction {
>>>>>
>>>>>   def eval(): Row = {
>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>   }
>>>>>
>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>> }
>>>>>
>>>>> *Error (on compilation)*
>>>>>
>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>> [error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
>>>>> [error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>> [error]                                                                               ^
>>>>> [error] one error found
>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>
>>>>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> I guess your UDF is registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding the type hint.
>>>>>>
>>>>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, which accepts the type hint.
>>>>>>
>>>>>> Best,
>>>>>> Wei
>>>>>>
>>>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> Thanks for your suggestion. Same error.
>>>>>>
>>>>>> *Scala UDF*
>>>>>>
>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>   def eval(): Row = {
>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Dian, Community,
>>>>>>>
>>>>>>> (bringing the thread back to a wider audience)
>>>>>>>
>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but this simple case also leads to a type mismatch between the UDF and the Table API.
>>>>>>> I've also tried other Map objects from Flink (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java (java.util.Map) in combination with DataTypeHint, without success.
>>>>>>> N.B. I'm using version 1.11.
>>>>>>>
>>>>>>> Am I doing something wrong, or am I facing limitations in the toolkit?
>>>>>>>
>>>>>>> Thanks in advance for your support!
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> *Scala UDF*
>>>>>>>
>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>
>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>   def eval(): Row = {
>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> *Table DDL*
>>>>>>>
>>>>>>> my_sink_ddl = f"""
>>>>>>> create table mySink (
>>>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>> ) with (
>>>>>>>     ...
>>>>>>> )
>>>>>>> """
>>>>>>>
>>>>>>> *Error*
>>>>>>>
>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>> Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
>>>>>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>>>>
>>>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Dian, but same error when using an explicit return type:
>>>>>>>>
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>
>>>>>>>>   def eval(): util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> You need to explicitly define the result type of the UDF. You could refer to [1] for more details if you are using Flink 1.11. If you are using other versions of Flink, you need to refer to the corresponding documentation.
>>>>>>>>>
>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>
>>>>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> ScalarFunction
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre