Hi Xingbo,

Many thanks for your follow-up. Yes, you got it right.

So, using the Table API and a ROW object for the nested output of my UDF,
and since types are mandatory, I guess this boils down to:

- How to nicely specify the types for the 100k fields: shall I use
  TypeInformation [1], or better, retrieve them from a Schema Registry [2]?
- Do I have to put NULL values for all the fields that don't have a value
  in my JSON?
- Will the resulting Table be "sparse" and suffer performance limitations?

Let me know if the Table API and ROW are the right candidates here, or if
better alternatives exist. As said, I'd be glad to apply some downstream
transformations using key/value access (and possibly some Table <-> Pandas
operations).

I hope that doesn't make too long a wish list ;)
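To make the first two questions concrete, here is a rough sketch in plain
Python of what generating the type could look like. It is only an
illustration under my own assumptions: the helper names
(row_type_from_avro, densify), the AVRO_TO_FLINK mapping, and the toy
schema are all hypothetical; it handles only primitive fields and nested
records (no unions, arrays, or maps); and it does not call any Flink API.
It builds a ROW<...> type string from an Avro-style schema and fills the
keys missing from a JSON record with None, in case explicit NULLs turn out
to be required.

```python
import json

# Hypothetical mapping from Avro primitive type names to Flink SQL type names.
AVRO_TO_FLINK = {
    "string": "STRING",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
}


def row_type_from_avro(schema):
    """Build a ROW<...> type string from an Avro-style record schema (a dict)."""
    parts = []
    for field in schema["fields"]:
        ftype = field["type"]
        if isinstance(ftype, dict) and ftype.get("type") == "record":
            # Nested record: recurse to produce a nested ROW<...>.
            sql_type = row_type_from_avro(ftype)
        else:
            # Primitive type: look it up (raises KeyError if unsupported).
            sql_type = AVRO_TO_FLINK[ftype]
        parts.append(f"{field['name']} {sql_type}")
    return "ROW<" + ", ".join(parts) + ">"


def densify(record, schema):
    """Order a sparse dict like the schema; missing keys become None."""
    values = []
    for field in schema["fields"]:
        ftype = field["type"]
        value = record.get(field["name"])
        if (value is not None and isinstance(ftype, dict)
                and ftype.get("type") == "record"):
            # Present nested record: densify it recursively.
            value = densify(value, ftype)
        values.append(value)
    return tuple(values)


# Toy schema, standing in for whatever a schema registry would return.
schema = {
    "type": "record",
    "name": "Example",
    "fields": [
        {"name": "s", "type": "string"},
        {"name": "t", "type": "string"},
        {"name": "n", "type": {"type": "record", "name": "Nested",
                               "fields": [{"name": "x", "type": "long"}]}},
    ],
}

print(row_type_from_avro(schema))  # ROW<s STRING, t STRING, n ROW<x BIGINT>>
print(densify(json.loads('{"s": "foo"}'), schema))  # ('foo', None, None)
```

The generated string has the same shape as the ROW<s STRING,t STRING> used
earlier in this thread, so it could be dropped into a sink DDL. Whether
this remains practical with 100k fields is exactly what I'm unsure about.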
Thanks a lot!

Best regards,

[1] https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
[2] https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html

On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Pierre,
>
> Sorry for the late reply.
> Your requirement is that your `Table` has a `field` in `Json` format whose
> number of keys has reached 100k, and you want to use such a `field` as the
> input/output of a `udf`, right? As to whether there is a limit on the number
> of nested keys, I am not quite clear. Other contributors with experience in
> this area may have answers. As for `Python UDF`, if the type of the key
> or value of your `Map` is `Any`, we do not support it now. You need to
> specify a specific type. For more information, please refer to the related
> document [1].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>
> Best,
> Xingbo
>
> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
> Hello Wei, Dian, Xingbo,
>
> Not really sure when it is appropriate to knock on the door of the
> community ;)
> I just wanted to mention that your feedback on the above topic will be
> highly appreciated, as it will condition the choice of framework on our side
> for the months to come, and potentially help the community to cover sparse
> data with Flink.
>
> Thanks a lot!
>
> Have a great weekend
>
> Best,
>
> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
>> Hi Wei,
>>
>> Thanks for the hint. May I please follow up by adding more context and
>> ask for your guidance.
>>
>> In case the aforementioned Map[String,Any] object returned by Scala:
>>
>> - Has a defined schema (incl. nested) with up to 100k (!)
>> different possible keys
>> - Has only some portion of the keys populated for each record
>> - Is convertible to JSON
>> - Has to undergo downstream processing in Flink and/or Python UDF with
>> key-value access
>> - Has to be ultimately stored in a Kafka/AVRO sink
>>
>> How would you declare the types explicitly in such a case?
>>
>> Thanks for your support!
>>
>> Pierre
>>
>> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
>>
>>> Hi Pierre,
>>>
>>> Currently there is no type hint like 'Map[String, Any]'. The recommended
>>> way is to declare your type more explicitly.
>>>
>>> If you insist on doing this, you can try declaring a RAW data type
>>> for java.util.HashMap [1], but you may encounter some trouble [2] related
>>> to the Kryo serializers.
>>>
>>> Best,
>>> Wei
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>> [2]
>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>
>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>
>>> Hi Wei,
>>>
>>> It works! Thanks a lot for your support.
>>> I hadn't tried this last combination for option 1, and I had the wrong
>>> syntax for option 2.
>>>
>>> So to summarize:
>>>
>>> Methods working:
>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>> - Outdated: override getResultType in UDF definition
>>> + t_env.register_java_function for UDF registering
>>>
>>> Type conversions working:
>>> - scala.collection.immutable.Map[String,String] =>
>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>> - scala.collection.immutable.Map[String,String] =>
>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>
>>> Any hint for Map[String,Any]?
>>>
>>> Best regards,
>>>
>>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> Both approaches work on my local machine. This is my code:
>>>>
>>>> Scala UDF:
>>>>
>>>> package com.dummy
>>>>
>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>> import org.apache.flink.table.api.Types
>>>> import org.apache.flink.table.functions.ScalarFunction
>>>> import org.apache.flink.types.Row
>>>>
>>>> /**
>>>>  * The Scala UDF.
>>>>  */
>>>> class dummyMap extends ScalarFunction {
>>>>
>>>>   // If the UDF is registered via the SQL statement, you need to add
>>>>   // this type hint.
>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>   def eval(): Row = {
>>>>
>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>       java.lang.String.valueOf("bar"))
>>>>
>>>>   }
>>>>
>>>>   // If the UDF is registered via the method
>>>>   // 'register_java_function', you need to override this method.
>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>       TypeInformation[_] = {
>>>>     // The type of the return values should be TypeInformation
>>>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(),
>>>>       Types.STRING()))
>>>>   }
>>>> }
>>>>
>>>> Python code:
>>>>
>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>> from pyflink.table import StreamTableEnvironment
>>>>
>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>
>>>> # load the Scala UDF jar file; the path should be modified to yours,
>>>> # or you can also load the jar file via other approaches
>>>> st_env.get_config().get_configuration().set_string("pipeline.jars",
>>>>     "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>
>>>> # register the UDF via SQL
>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' "
>>>>                    "LANGUAGE SCALA")
>>>> # or register via the method
>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>
>>>> # prepare source and sink
>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
>>>>                          ['a', 'b', 'c'])
>>>> st_env.execute_sql("""create table mySink (
>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>> ) with (
>>>>     'connector' = 'print'
>>>> )""")
>>>>
>>>> # execute query
>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> Hi Wei,
>>>>
>>>> True, I'm using the method you mention, but I'm glad to change.
>>>> I tried your suggestion instead, but got a similar error.
>>>>
>>>> Thanks for your support. This is much more tedious than I thought.
>>>>
>>>> *Option 1 - SQL UDF*
>>>>
>>>> *SQL UDF*
>>>> create_func_ddl = """
>>>> CREATE FUNCTION dummyMap
>>>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>> """
>>>>
>>>> t_env.execute_sql(create_func_ddl)
>>>>
>>>> *Error*
>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>> match requested type.
>>>> Requested: Row(s: String, t: String); Actual:
>>>> GenericType<org.apache.flink.types.Row>
>>>>
>>>> *Option 2 - Overriding getResultType*
>>>>
>>>> Back to the old registering method, but overriding getResultType:
>>>>
>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>
>>>> *Scala UDF*
>>>> class dummyMap() extends ScalarFunction {
>>>>
>>>>   def eval(): Row = {
>>>>
>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>       java.lang.String.valueOf("bar"))
>>>>
>>>>   }
>>>>
>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>     TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>> }
>>>>
>>>> *Error (on compilation)*
>>>>
>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>> alternatives:
>>>> [error] (x$1:
>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>> <and>
>>>> [error] ()org.apache.flink.table.types.DataType <and>
>>>> [error] (x$1:
>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>> [error] cannot be applied to (org.apache.flink.table.types.DataType,
>>>> org.apache.flink.table.types.DataType)
>>>> [error] override def getResultType(signature: Array[Class[_]]):
>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>> [error]
>>>> ^
>>>> [error] one error found
>>>> [error] (Compile / compileIncremental) Compilation failed
>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>
>>>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> I guess your UDF is registered by the method 'register_java_function',
>>>>> which uses the old type system. In this situation you need to override the
>>>>> 'getResultType' method instead of adding a type hint.
>>>>>
>>>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL
>>>>> statement, which accepts the type hint.
>>>>>
>>>>> Best,
>>>>> Wei
>>>>>
>>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>
>>>>> Hi Wei,
>>>>>
>>>>> Thanks for your suggestion. Same error.
>>>>>
>>>>> *Scala UDF*
>>>>>
>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>> class dummyMap() extends ScalarFunction {
>>>>>   def eval(): Row = {
>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>       java.lang.String.valueOf("bar"))
>>>>>   }
>>>>> }
>>>>>
>>>>> Best regards,
>>>>>
>>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> You can try to replace
>>>>>> '@DataTypeHint("ROW<s STRING,t STRING>")'
>>>>>> with
>>>>>> '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>
>>>>>> Best,
>>>>>> Wei
>>>>>>
>>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Dian, Community,
>>>>>>
>>>>>> (bringing the thread back to a wider audience)
>>>>>>
>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead of
>>>>>> Map, but even this simple case leads to a type mismatch between the UDF
>>>>>> and the Table API.
>>>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>>>> N.B. I'm using version 1.11.
>>>>>>
>>>>>> Am I doing something wrong, or am I facing limitations in the toolkit?
>>>>>>
>>>>>> Thanks in advance for your support!
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> *Scala UDF*
>>>>>>
>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>
>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>   def eval(): Row = {
>>>>>>
>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>       java.lang.String.valueOf("bar"))
>>>>>>
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> *Table DDL*
>>>>>>
>>>>>> my_sink_ddl = f"""
>>>>>> create table mySink (
>>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>> ) with (
>>>>>>     ...
>>>>>> )
>>>>>> """
>>>>>>
>>>>>> *Error*
>>>>>>
>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>> query result and registered TableSink
>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t:
>>>>>> String)]
>>>>>>
>>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>>> Thanks Dian, but same error when using an explicit return type:
>>>>>>>
>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>
>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>
>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>
>>>>>>>> You need to explicitly define the result type of the UDF. You could
>>>>>>>> refer to [1] for more details if you are using Flink 1.11. If you are
>>>>>>>> using other versions of Flink, you need to refer to the corresponding
>>>>>>>> documentation.
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>
>>>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> ScalarFunction
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Pierre

--
Pierre