Hi Xingbo,

Nice ! This looks a bit hacky, but it shows that it can be done ;)

I just got an exception preventing me from running your code, apparently from udf.py:

TypeError: Invalid input_type: input_type should be DataType but contains None

Can you please check again ? If the schema is defined in an .avsc file, do we have to parse it and rebuild that syntax (DDL and UDF) ourselves, or is there an existing component that could be used ?
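To make the second question concrete: since an .avsc file is plain JSON, I guess one could hack something like the sketch below to generate both the DDL fragment and the UDF result_type (naive sketch, flat schemas and primitive types only, file name made up):

import json
from pyflink.table import DataTypes

# naive Avro-primitive -> Flink mapping; records, unions, logical types
# and nesting would need more work
AVRO_TO_SQL = {'string': 'STRING', 'int': 'INT', 'long': 'BIGINT',
               'float': 'FLOAT', 'double': 'DOUBLE', 'boolean': 'BOOLEAN'}
AVRO_TO_DATATYPE = {'string': DataTypes.STRING(), 'int': DataTypes.INT(),
                    'long': DataTypes.BIGINT(), 'float': DataTypes.FLOAT(),
                    'double': DataTypes.DOUBLE(), 'boolean': DataTypes.BOOLEAN()}

with open('my_schema.avsc') as f:
    avsc = json.load(f)  # an .avsc file is just JSON

# field list for the CREATE TABLE ... ROW<...> DDL
field_str = ','.join('%s %s' % (fld['name'], AVRO_TO_SQL[fld['type']])
                     for fld in avsc['fields'])

# result_type for the Python UDF
result_type = DataTypes.ROW(
    [DataTypes.FIELD(fld['name'], AVRO_TO_DATATYPE[fld['type']])
     for fld in avsc['fields']])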
Thanks a lot !

Best,

Le mer. 2 déc. 2020 à 04:50, Xingbo Huang <hxbks...@gmail.com> a écrit :

> Hi Pierre,
>
> I wrote a PyFlink implementation, you can see if it meets your needs:
>
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
> from pyflink.table.udf import udf
>
>
> def test():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     t_env = StreamTableEnvironment.create(
>         env,
>         environment_settings=EnvironmentSettings.new_instance()
>             .in_streaming_mode().use_blink_planner().build())
>     t_env.get_config().get_configuration().set_string(
>         "taskmanager.memory.task.off-heap.size", '80m')
>
>     # 10k nested columns
>     num_field = 10_000
>     fields = ['f%s INT' % i for i in range(num_field)]
>     field_str = ','.join(fields)
>
>     t_env.execute_sql(f"""
>         CREATE TABLE source_table (
>             f0 BIGINT,
>             f1 DECIMAL(32,2),
>             f2 ROW<{field_str}>,
>             f3 TIMESTAMP(3)
>         ) WITH (
>             'connector' = 'datagen',
>             'number-of-rows' = '2'
>         )
>     """)
>
>     t_env.execute_sql(f"""
>         CREATE TABLE print_table (
>             f0 BIGINT,
>             f1 DECIMAL(32,2),
>             f2 ROW<{field_str}>,
>             f3 TIMESTAMP(3)
>         ) WITH (
>             'connector' = 'print'
>         )
>     """)
>
>     result_type = DataTypes.ROW(
>         [DataTypes.FIELD("f%s" % i, DataTypes.INT())
>          for i in range(num_field)])
>
>     func = udf(lambda x: x, result_type=result_type)
>
>     source = t_env.from_path("source_table")
>     result = source.select(source.f0, source.f1, func(source.f2), source.f3)
>     result.execute_insert("print_table")
>
>
> if __name__ == '__main__':
>     test()
>
> Best,
> Xingbo
>
> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 下午6:10写道:
>
>> Hi Xingbo,
>>
>> That would mean giving up on using Flink (table) features on the content of the parsed JSON objects, so definitely a big loss. Let me know if I missed something.
>>
>> Thanks !
>>
>> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang <hxbks...@gmail.com> a écrit :
>>
>>> Hi Pierre,
>>>
>>> Have you ever thought of declaring your entire JSON as a string field in the `Table` and putting the parsing work in a UDF?
>>>
>>> Best,
>>> Xingbo
>>>
>>> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 上午4:13写道:
>>>
>>>> Hi Xingbo,
>>>>
>>>> Many thanks for your follow-up. Yes, you got it right.
>>>> So, using the Table API and a ROW object for the nested output of my UDF, and since types are mandatory, I guess this boils down to:
>>>> - How to nicely specify the types for the 100k fields: shall I use TypeInformation [1], or better, retrieve it from a Schema Registry [2] ?
>>>> - Do I have to put NULL values for all the fields that don't have a value in my JSON (rough sketch below) ?
>>>> - Will the resulting Table be "sparse" and suffer performance limitations ?
>>>> Let me know if the Table API and ROW are the right candidates here, or if better alternatives exist.
>>>> As said, I'd be glad to apply some downstream transformations using key/value access (and possibly some Table <-> Pandas operations). Hope that doesn't make it a too long wish list ;)
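>>>>
>>>> To make the NULL question concrete, the kind of parsing UDF I have in mind looks roughly like this (sketch with made-up names, assuming Python UDFs can return Row in my version, and using STRING for all fields for brevity):
>>>>
>>>> import json
>>>> from pyflink.table import DataTypes
>>>> from pyflink.table.types import Row
>>>> from pyflink.table.udf import udf
>>>>
>>>> field_names = ['f%s' % i for i in range(100_000)]
>>>> row_type = DataTypes.ROW(
>>>>     [DataTypes.FIELD(name, DataTypes.STRING()) for name in field_names])
>>>>
>>>> @udf(input_types=[DataTypes.STRING()], result_type=row_type)
>>>> def parse_json(payload):
>>>>     d = json.loads(payload)
>>>>     # keys absent from this record become None -> NULL in the row ?
>>>>     return Row(*[d.get(name) for name in field_names])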
>>>>
>>>> Thanks a lot !
>>>>
>>>> Best regards,
>>>>
>>>> [1] https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>>> [2] https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>>
>>>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang <hxbks...@gmail.com> a écrit :
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> Sorry for the late reply.
>>>>> Your requirement is that your `Table` has a field in JSON format with up to 100k keys, and you want to use such a field as the input/output of a `udf`, right? As to whether there is a limit on the number of nested keys, I am not quite sure; other contributors with experience in this area may have answers. On the `Python UDF` side, if the type of the key or value of your `Map` is `Any`, we do not support it for now; you need to specify a concrete type. For more information, please refer to the related document [1].
>>>>>
>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>>
>>>>> Best,
>>>>> Xingbo
>>>>>
>>>>> 2020年11月28日 上午12:49,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>>
>>>>> Hello Wei, Dian, Xingbo,
>>>>>
>>>>> Not really sure when it is appropriate to knock on the door of the community ;)
>>>>> I just wanted to mention that your feedback on the above topic will be highly appreciated, as it will condition the choice of framework on our side for the months to come, and potentially help the community to cover sparse data with Flink.
>>>>>
>>>>> Thanks a lot !
>>>>>
>>>>> Have a great week-end
>>>>>
>>>>> Best,
>>>>>
>>>>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> a écrit :
>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> Thanks for the hint. May I please follow up by adding more context and ask for your guidance.
>>>>>>
>>>>>> In case the aforementioned Map[String,Any] object returned by Scala:
>>>>>>
>>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different possible keys
>>>>>> - Has only some portion of the keys populated for each record
>>>>>> - Is convertible to JSON
>>>>>> - Has to undergo downstream processing in Flink and/or Python UDFs with key-value access
>>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>>
>>>>>> How would you declare the types explicitly in such a case ?
>>>>>>
>>>>>> Thanks for your support !
>>>>>>
>>>>>> Pierre
>>>>>>
>>>>>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong <weizhong0...@gmail.com> a écrit :
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> Currently there is no type hint like 'Map[String, Any]'. The recommended way is declaring your type more explicitly.
>>>>>>>
>>>>>>> If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some troubles [2] related to the kryo serializers.
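>>>>>>>
>>>>>>> For example, something along these lines (an untested sketch based on [1], not a recommendation):
>>>>>>>
>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>
>>>>>>> class rawMap extends ScalarFunction {
>>>>>>>
>>>>>>>   // the whole map becomes an opaque RAW type, serialized by a generic
>>>>>>>   // serializer (kryo), so the planner cannot access individual keys
>>>>>>>   @DataTypeHint("RAW")
>>>>>>>   def eval(): java.util.HashMap[String, AnyRef] = {
>>>>>>>     val m = new java.util.HashMap[String, AnyRef]()
>>>>>>>     m.put("key1", "val1")
>>>>>>>     m.put("key2", java.lang.Integer.valueOf(2))
>>>>>>>     m
>>>>>>>   }
>>>>>>> }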
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>>> [2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>>
>>>>>>> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> It works ! Thanks a lot for your support.
>>>>>>> I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2.
>>>>>>>
>>>>>>> So to summarize:
>>>>>>>
>>>>>>> Methods working:
>>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>>> - Outdated: override getResultType in UDF definition + t_env.register_java_function for UDF registering
>>>>>>>
>>>>>>> Type conversions working:
>>>>>>> - scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>>> - scala.collection.immutable.Map[String,String] => java.util.Map[String,String] => MAP<STRING,STRING>
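>>>>>>>
>>>>>>> For the record, the working MAP variant looks roughly like this on my side (sketch, class name made up):
>>>>>>>
>>>>>>> import scala.collection.JavaConverters._
>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>
>>>>>>> class dummyStringMap extends ScalarFunction {
>>>>>>>   @DataTypeHint("MAP<STRING,STRING>")
>>>>>>>   def eval(): java.util.Map[String, String] = {
>>>>>>>     // immutable Scala map, converted to java.util.Map at the bridge
>>>>>>>     Map("key1" -> "val1", "key2" -> "val2").asJava
>>>>>>>   }
>>>>>>> }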
>>>>>>>
>>>>>>> Any hint for Map[String,Any] ?
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a écrit :
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> Those 2 approaches both work on my local machine, this is my code:
>>>>>>>>
>>>>>>>> Scala UDF:
>>>>>>>>
>>>>>>>> package com.dummy
>>>>>>>>
>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>>> import org.apache.flink.table.api.Types
>>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>
>>>>>>>> /**
>>>>>>>>  * The scala UDF.
>>>>>>>>  */
>>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>>
>>>>>>>>   // If the udf is registered by a SQL statement, you need to add this type hint
>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>   def eval(): Row = {
>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   // If the udf is registered by the method 'register_java_function',
>>>>>>>>   // you need to override this method.
>>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Python code:
>>>>>>>>
>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>>
>>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>>
>>>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>>>> # or you can also load the jar file via other approaches
>>>>>>>> st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>>
>>>>>>>> # register the udf via the sql statement
>>>>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>>> # or register via the method
>>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>
>>>>>>>> # prepare source and sink
>>>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>> ) with (
>>>>>>>>     'connector' = 'print'
>>>>>>>> )""")
>>>>>>>>
>>>>>>>> # execute query
>>>>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>>
>>>>>>>> Thanks for your support. This is much more tedious than I thought.
>>>>>>>>
>>>>>>>> *Option 1 - SQL UDF*
>>>>>>>>
>>>>>>>> *SQL UDF*
>>>>>>>> create_func_ddl = """
>>>>>>>> CREATE FUNCTION dummyMap
>>>>>>>> AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>>> """
>>>>>>>>
>>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>>
>>>>>>>> *Error*
>>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>>> : org.apache.flink.table.api.TableException: Result field does not match requested type.
>>>>>>>> Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>
>>>>>>>>
>>>>>>>> *Option 2 - Overriding getResultType*
>>>>>>>>
>>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>>
>>>>>>>> t_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>
>>>>>>>> *Scala UDF*
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>
>>>>>>>>   def eval(): Row = {
>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
>>>>>>>>     DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
>>>>>>>> }
>>>>>>>>
>>>>>>>> *Error (on compilation)*
>>>>>>>>
>>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
>>>>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
>>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>>> [error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>>> [error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
>>>>>>>> [error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>> [error] one error found
>>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>>
>>>>>>>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com> a écrit :
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> I guess your UDF is registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding a type hint.
>>>>>>>>>
>>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION" sql statement, which accepts the type hint.
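>>>>>>>>>
>>>>>>>>> That is, the registration method must match the way the result type is declared:
>>>>>>>>>
>>>>>>>>> # old type system: the result type comes from getResultType
>>>>>>>>> st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>>
>>>>>>>>> # new type system: the result type comes from the @DataTypeHint annotation
>>>>>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")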
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>
>>>>>>>>> Hi Wei,
>>>>>>>>>
>>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>>
>>>>>>>>> *Scala UDF*
>>>>>>>>>
>>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com> a écrit :
>>>>>>>>>
>>>>>>>>>> Hi Pierre,
>>>>>>>>>>
>>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>
>>>>>>>>>> Hi Dian, Community,
>>>>>>>>>>
>>>>>>>>>> (bringing the thread back to a wider audience)
>>>>>>>>>>
>>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead of Map, but even this simple case leads to a type mismatch between the UDF and the Table API.
>>>>>>>>>> I've also tried other Map objects from Flink (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java (java.util.Map) in combination with DataTypeHint, without success.
>>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>>
>>>>>>>>>> Am I doing something wrong, or am I facing limitations of the toolkit ?
>>>>>>>>>>
>>>>>>>>>> Thanks in advance for your support !
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>>
>>>>>>>>>> *Scala UDF*
>>>>>>>>>>
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>
>>>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> *Table DDL*
>>>>>>>>>>
>>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>> create table mySink (
>>>>>>>>>>     output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>> ) with (
>>>>>>>>>>     ...
>>>>>>>>>> )
>>>>>>>>>> """
>>>>>>>>>>
>>>>>>>>>> *Error*
>>>>>>>>>>
>>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>>> Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
>>>>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>>>>>>>
>>>>>>>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> a écrit :
>>>>>>>>>>
>>>>>>>>>>> Thanks Dian, but same error when using an explicit return type:
>>>>>>>>>>>
>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>
>>>>>>>>>>>   def eval(): util.Map[java.lang.String, java.lang.String] = {
>>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>>     states.asInstanceOf[util.Map[java.lang.String, java.lang.String]]
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>>> You need to explicitly define the result type of the UDF. You could refer to [1] for more details if you are using Flink 1.11. If you are using another version of Flink, please refer to the corresponding documentation.
>>>>>>>>>>>>
>>>>>>>>>>>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>>
>>>>>>>>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>>>
>>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Pierre
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Pierre
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Pierre
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre
>>>>>>>
>>>>>>> --
>>>>>>> Pierre
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>
>>>>> --
>>>>> Pierre
>>>>
>>>> --
>>>> Pierre
>>
>> --
>> Pierre

--
Pierre