Hi Pierre,

I wrote a PyFlink implementation; please check whether it meets your needs:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())
    t_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", '80m')

    # 10k nested columns
    num_field = 10_000
    fields = ['f%s INT' % i for i in range(num_field)]
    field_str = ','.join(fields)

    t_env.execute_sql(f"""
        CREATE TABLE source_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '2'
        )
    """)

    t_env.execute_sql(f"""
        CREATE TABLE print_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """)

    result_type = DataTypes.ROW(
        [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

    func = udf(lambda x: x, result_type=result_type)

    source = t_env.from_path("source_table")
    result = source.select(source.f0, source.f1, func(source.f2), source.f3)
    result.execute_insert("print_table")


if __name__ == '__main__':
    test()

Best,
Xingbo

On Tue, Dec 1, 2020 at 6:10 PM Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

> Hi Xingbo,
>
> That would mean giving up on using Flink (table) features on the content
> of the parsed JSON objects, so definitely a big loss. Let me know if I
> missed something.
>
> Thanks !
>
> On Tue, Dec 1, 2020 at 7:26 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Pierre,
>>
>> Have you ever thought of declaring your entire JSON object as a string
>> field in the `Table` and putting the parsing work in a UDF?
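>>
>> For example, a rough sketch of that idea as a Scala UDF ("myKey" is a
>> placeholder for one of the keys of interest, and a Jackson dependency on
>> the classpath is assumed):
>>
>> import com.fasterxml.jackson.databind.ObjectMapper
>> import org.apache.flink.table.functions.ScalarFunction
>>
>> // Keep the raw JSON as a plain STRING column and extract single keys on
>> // demand; absent keys simply yield NULL instead of a sparse ROW.
>> class extractKey extends ScalarFunction {
>>   @transient private lazy val mapper = new ObjectMapper()
>>
>>   def eval(payload: String): String = {
>>     val node = mapper.readTree(payload).get("myKey")
>>     if (node == null) null else node.asText()
>>   }
>> }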
>>
>> Best,
>> Xingbo
>>
>> On Tue, Dec 1, 2020 at 4:13 AM Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>>> Hi Xingbo,
>>>
>>> Many thanks for your follow-up. Yes, you got it right.
>>> So, using the Table API and a ROW object for the nested output of my UDF,
>>> and since types are mandatory, I guess this boils down to:
>>> - How to nicely specify the types for the 100k fields: shall I use
>>> TypeInformation [1], or better retrieve them from a Schema Registry [2] ?
>>> - Do I have to put NULL values for all the fields that don't have a
>>> value in my JSON ?
>>> - Will the resulting Table be "sparse" and suffer performance
>>> limitations ?
>>> Let me know if the Table API and ROW are the right candidates here, or
>>> if better alternatives exist.
>>> As said, I'd be glad to apply some downstream transformations using
>>> key/value access (and possibly some Table <-> Pandas operations). Hope
>>> that doesn't make it too long a wish list ;)
>>>
>>> Thanks a lot !
>>>
>>> Best regards,
>>>
>>> [1]
>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>> [2]
>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>
>>> On Sat, Nov 28, 2020 at 4:04 AM Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> Sorry for the late reply.
>>>> Your requirement is that your `Table` has a field in JSON format whose
>>>> number of keys has reached 100k, and you want to use such a field as
>>>> the input/output of a UDF, right? As to whether there is a limit on the
>>>> number of nested keys, I am not quite sure; other contributors with
>>>> experience in this area may have answers.
>>>> On the Python UDF side, if the type of the key or value of your `Map`
>>>> is `Any`, we do not support it yet; you need to specify a concrete
>>>> type. For more information, please refer to the related documentation [1].
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> Hello Wei, Dian, Xingbo,
>>>>
>>>> Not really sure when it is appropriate to knock on the door of the
>>>> community ;)
>>>> I just wanted to mention that your feedback on the above topic will be
>>>> highly appreciated, as it will condition our choice of framework for
>>>> the months to come, and potentially help the community to cover sparse
>>>> data with Flink.
>>>>
>>>> Thanks a lot !
>>>>
>>>> Have a great week-end
>>>>
>>>> Best,
>>>>
>>>> On Fri, Nov 20, 2020 at 10:11 AM Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>>> Hi Wei,
>>>>>
>>>>> Thanks for the hint. May I please follow up by adding more context
>>>>> and asking for your guidance.
>>>>>
>>>>> In case the aforementioned Map[String,Any] object returned by Scala:
>>>>>
>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>>> possible keys
>>>>> - Has only some portion of the keys populated for each record
>>>>> - Is convertible to JSON
>>>>> - Has to undergo downstream processing in Flink and/or a Python UDF
>>>>> with key/value access
>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>
>>>>> How would you declare the types explicitly in such a case ?
>>>>>
>>>>> Thanks for your support !
>>>>>
>>>>> Pierre
>>>>>
>>>>> On Thu, Nov 19, 2020 at 3:54 AM Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> Currently there is no type hint like 'Map[String, Any]'. The
>>>>>> recommended way is to declare your types more explicitly.
>>>>>>
>>>>>> If you insist on doing this, you can try declaring a RAW data type
>>>>>> for java.util.HashMap [1], but you may encounter some trouble [2]
>>>>>> related to the Kryo serializers.
>>>>>>
>>>>>> Best,
>>>>>> Wei
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>> [2]
>>>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>
>>>>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> It works ! Thanks a lot for your support.
>>>>>> I hadn't tried this last combination for option 1, and I had the
>>>>>> wrong syntax for option 2.
>>>>>>
>>>>>> So to summarize..
>>>>>>
>>>>>> Methods working:
>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>> - Outdated: override getResultType in UDF definition
>>>>>> + t_env.register_java_function for UDF registering
>>>>>>
>>>>>> Type conversions working:
>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>>
>>>>>> Any hint for Map[String,Any] ?
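>>>>>>
>>>>>> For reference, a minimal sketch of the second working conversion
>>>>>> (class name and values are illustrative):
>>>>>>
>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>
>>>>>> import scala.collection.JavaConverters._
>>>>>>
>>>>>> // A scala.collection.immutable.Map[String,String] handed to the
>>>>>> // Table API as MAP<STRING,STRING> by converting it to java.util.Map.
>>>>>> class dummyJavaMap extends ScalarFunction {
>>>>>>   @DataTypeHint("MAP<STRING, STRING>")
>>>>>>   def eval(): java.util.Map[String, String] = {
>>>>>>     Map("key1" -> "val1", "key2" -> "val2").asJava
>>>>>>   }
>>>>>> }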
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> On Wed, Nov 18, 2020 at 3:26 AM Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> Those 2 approaches both work on my local machine, this is my code:
>>>>>>>
>>>>>>> Scala UDF:
>>>>>>>
>>>>>>> package com.dummy
>>>>>>>
>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>> import org.apache.flink.table.api.Types
>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>> import org.apache.flink.types.Row
>>>>>>>
>>>>>>> /**
>>>>>>>   * The scala UDF.
>>>>>>>   */
>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>
>>>>>>>   // If the UDF is registered by a SQL statement, you need to add
>>>>>>>   // this type hint.
>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>   def eval(): Row = {
>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>   }
>>>>>>>
>>>>>>>   // If the UDF is registered by the method 'register_java_function',
>>>>>>>   // you need to override this method.
>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>     Types.ROW(Array("s", "t"),
>>>>>>>       Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Python code:
>>>>>>>
>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>
>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>
>>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>>> # or you can also load the jar file via other approaches
>>>>>>> st_env.get_config().get_configuration().set_string(
>>>>>>>     "pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>
>>>>>>> # register the udf via DDL
>>>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>> # or register via the method
>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>
>>>>>>> # prepare source and sink
>>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>> ) with (
>>>>>>>     'connector' = 'print'
>>>>>>> )""")
>>>>>>>
>>>>>>> # execute query
>>>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>
>>>>>>> Thanks for your support. This is much more tedious than I thought.
>>>>>>>
>>>>>>> *Option 1 - SQL UDF*
>>>>>>>
>>>>>>> *SQL UDF*
>>>>>>> create_func_ddl = """
>>>>>>> CREATE FUNCTION dummyMap
>>>>>>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>> """
>>>>>>>
>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>
>>>>>>> *Error*
>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>>>> match requested type.
>>>>>>> Requested: Row(s: String, t: String); Actual:
>>>>>>> GenericType<org.apache.flink.types.Row>
>>>>>>>
>>>>>>> *Option 2 - Overriding getResultType*
>>>>>>>
>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>
>>>>>>> t_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>
>>>>>>> *Scala UDF*
>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>
>>>>>>>   def eval(): Row = {
>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>   }
>>>>>>>
>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>>>>>     DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
>>>>>>> }
>>>>>>>
>>>>>>> *Error (on compilation)*
>>>>>>>
>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>>>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>> [error]  cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
>>>>>>> [error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>> [error]                                                                                          ^
>>>>>>> [error] one error found
>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>
>>>>>>> On Tue, Nov 17, 2020 at 2:01 PM Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> I guess your UDF is registered by the method 'register_java_function',
>>>>>>>> which uses the old type system. In this situation you need to
>>>>>>>> override the 'getResultType' method instead of adding a type hint.
>>>>>>>>
>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL
>>>>>>>> statement, which accepts the type hint.
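>>>>>>>>
>>>>>>>> For the first approach, a sketch of such an override for the
>>>>>>>> ROW<s STRING,t STRING> example (note it builds on TypeInformation
>>>>>>>> via org.apache.flink.table.api.Types, not on DataTypes, which
>>>>>>>> caused the compile error above):
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>> import org.apache.flink.table.api.Types
>>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>>
>>>>>>>>   def eval(): Row = Row.of("foo", "bar")
>>>>>>>>
>>>>>>>>   // The old type system expects a TypeInformation result type.
>>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>>>>>>     Types.ROW(Array("s", "t"),
>>>>>>>>       Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>> }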
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>
>>>>>>>> *Scala UDF*
>>>>>>>>
>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>   def eval(): Row = {
>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> On Tue, Nov 17, 2020 at 10:04 AM Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")'
>>>>>>>>> with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Dian, Community,
>>>>>>>>>
>>>>>>>>> (bringing the thread back to the wider audience)
>>>>>>>>>
>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>>> of Map, but this simple case also leads to a type mismatch between
>>>>>>>>> the UDF and the Table API.
>>>>>>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>>>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition
>>>>>>>>> to Java (java.util.Map) in combination with DataTypeHint, without
>>>>>>>>> success.
>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>
>>>>>>>>> Am I doing something wrong, or am I facing limitations of the
>>>>>>>>> toolkit ?
>>>>>>>>>
>>>>>>>>> Thanks in advance for your support !
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> *Scala UDF*
>>>>>>>>>
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> *Table DDL*
>>>>>>>>>
>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>     create table mySink (
>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>     ) with (
>>>>>>>>>         ...
>>>>>>>>>     )
>>>>>>>>> """
>>>>>>>>>
>>>>>>>>> *Error*
>>>>>>>>>
>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>>>> query result and registered TableSink
>>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>>> TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>>>>>>
>>>>>>>>> On Fri, Nov 13, 2020 at 11:59 AM Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Dian, but same error when using an explicit return type:
>>>>>>>>>>
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>
>>>>>>>>>>   def eval(): util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 13, 2020 at 10:34 AM Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You need to explicitly define the result type of the UDF. You
>>>>>>>>>>> could refer to [1] for more details if you are using Flink 1.11.
>>>>>>>>>>> If you are using another version of Flink, you need to refer to
>>>>>>>>>>> the corresponding documentation.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>
>>>>>>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Pierre
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Pierre
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre
>>>>>>>
>>>>>>> --
>>>>>>> Pierre
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>
>>>>> --
>>>>> Pierre
>>>>
>>>> --
>>>> Pierre
>>>
>>> --
>>> Pierre
>
> --
> Pierre