Hi Xingbo,

That would mean giving up on using Flink (table) features on the content of the parsed JSON objects, so definitely a big loss. Let me know if I missed something.

Thanks!

On Tue, Dec 1, 2020 at 07:26, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi Pierre,

Have you ever thought of declaring your entire JSON as a string field in the `Table` and putting the parsing work in a UDF?

Best,
Xingbo
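For illustration, a minimal sketch of this "keep the JSON as a string, parse it inside a UDF" idea, assuming Jackson is on the classpath; the class name `parseJson` and the "/total" field path are hypothetical:

package com.dummy

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.table.functions.ScalarFunction

// Sketch: the whole JSON document travels through the Table as a plain
// STRING column; only the fields actually needed are extracted in the UDF.
class parseJson extends ScalarFunction {

  // ObjectMapper is not serializable, so create it lazily per task
  @transient private lazy val mapper = new ObjectMapper()

  def eval(json: String): String = {
    val node = mapper.readTree(json).at("/total") // "/total" is a made-up field path
    if (node.isMissingNode) null else node.asText()
  }
}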
On Tue, Dec 1, 2020 at 4:13 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Xingbo,

Many thanks for your follow-up. Yes, you got it right.
So, using the Table API and a ROW object for the nested output of my UDF, and since types are mandatory, I guess this boils down to:
- How to nicely specify the types for the 100k fields: shall I use TypeInformation [1], or better, retrieve them from the Schema Registry [2]?
- Do I have to put NULL values for all the fields that don't have a value in my JSON?
- Will the resulting Table be "sparse" and suffer performance limitations?
Let me know if the Table API and ROW are the right candidates here, or if better alternatives exist.
As said, I'd be glad to apply some downstream transformations using key/value access (and possibly some Table <-> Pandas operations). Hope that doesn't make it too long a wish list ;)

Thanks a lot!

Best regards,

[1] https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
[2] https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html

On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi Pierre,

Sorry for the late reply.
Your requirement is that your `Table` has a `field` in JSON format whose keys have reached 100k, and you want to use such a `field` as the input/output of a `udf`, right? As to whether there is a limit on the number of nested keys, I am not quite sure; other contributors with experience in this area may have answers. On the `Python UDF` side, if the type of the key or value of your `Map` is `Any`, we do not support it now; you need to specify a concrete type. For more information, please refer to the related documentation [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html

Best,
Xingbo

On Sat, Nov 28, 2020 at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hello Wei, Dian, Xingbo,

Not really sure when it is appropriate to knock on the door of the community ;)
I just wanted to mention that your feedback on the above topic will be highly appreciated, as it will condition our choice of framework for the months to come, and potentially help the community cover sparse data with Flink.

Thanks a lot!

Have a great weekend.

Best,

On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

Thanks for the hint. May I please follow up by adding more context and asking for your guidance.

The Map[String,Any] object in question, returned by Scala:

- Has a defined schema (incl. nested) with up to 100k (!) different possible keys
- Has only some portion of the keys populated for each record
- Is convertible to JSON
- Has to undergo downstream processing in Flink and/or a Python UDF with key/value access
- Has to be ultimately stored in a Kafka/AVRO sink

How would you declare the types explicitly in such a case?

Thanks for your support!

Pierre
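One way to avoid spelling out 100k fields by hand would be to build the ROW type programmatically; a sketch, assuming the schema is available as a name-to-type mapping (e.g. fetched from the Schema Registry mentioned above), with `SchemaHelper` as a hypothetical helper:

import org.apache.flink.table.api.DataTypes
import org.apache.flink.table.types.DataType

object SchemaHelper {
  // Sketch: derive the ROW type from a schema map instead of writing
  // 100k DataTypes.FIELD entries by hand; `schema` would come from the
  // Schema Registry or a parsed AVRO schema (hypothetical input here).
  def buildRowType(schema: Map[String, DataType]): DataType =
    DataTypes.ROW(schema.map { case (name, dt) => DataTypes.FIELD(name, dt) }.toSeq: _*)
}

// e.g. SchemaHelper.buildRowType(Map("age" -> DataTypes.INT(), "name" -> DataTypes.STRING()))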
On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

Currently there is no type hint like 'Map[String, Any]'. The recommended way is to declare your type more explicitly.

If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some trouble [2] related to the kryo serializers.

Best,
Wei

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
[2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
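An untested sketch of that RAW escape hatch, subject to the kryo caveat from [2]; the class name `rawMap` and the values are dummies, and since the map is carried as an opaque RAW type, SQL and the Table API cannot look inside it:

package com.dummy

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

class rawMap extends ScalarFunction {

  // RAW: the value is (de)serialized by kryo and treated as a black box
  @DataTypeHint("RAW")
  def eval(): java.util.HashMap[String, Any] = {
    val m = new java.util.HashMap[String, Any]()
    m.put("key1", "val1")
    m.put("key2", 42) // heterogeneous values are the point of Any
    m
  }
}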
On Nov 19, 2020 at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

It works! Thanks a lot for your support.
I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2.

So, to summarize...

Methods working:
- Current: DataTypeHint in the UDF definition + SQL for UDF registration
- Outdated: override getResultType in the UDF definition + t_env.register_java_function for UDF registration

Type conversions working:
- scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row => ROW<STRING,STRING>
- scala.collection.immutable.Map[String,String] => java.util.Map[String,String] => MAP<STRING,STRING>

Any hint for Map[String,Any]?

Best regards,
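For reference, a minimal sketch of the second working conversion above, a Scala UDF returning a java.util.Map with a MAP type hint; the class name `dummyJavaMap` and the keys and values are dummies:

package com.dummy

import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.functions.ScalarFunction

class dummyJavaMap extends ScalarFunction {

  // The MAP type hint lets the Table API treat the result as MAP<STRING,STRING>
  @DataTypeHint("MAP<STRING,STRING>")
  def eval(): java.util.Map[String, String] = {
    val m = new java.util.HashMap[String, String]()
    m.put("key1", "val1")
    m.put("key2", "val2")
    m
  }
}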
On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

Those 2 approaches both work on my local machine; this is my code:

Scala UDF:

package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * The scala UDF.
 */
class dummyMap extends ScalarFunction {

  // If the UDF is registered by a SQL statement, you need to add this type hint
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  // If the UDF is registered by the method 'register_java_function', you need
  // to override this method; the return value should be a TypeInformation
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}

Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# load the scala udf jar file, the path should be modified to yours
# or you can also load the jar file via other approaches
st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")

# register the udf via SQL
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register via the method
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
    output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
    'connector' = 'print'
)""")

# execute query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()

Best,
Wei

On Nov 18, 2020 at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

True, I'm using the method you mention, but I'm glad to change.
I tried your suggestion instead, but got a similar error.

Thanks for your support. This is much more tedious than I thought.

Option 1 - SQL UDF

SQL UDF:

create_func_ddl = """
CREATE FUNCTION dummyMap
AS 'com.dummy.dummyMap' LANGUAGE SCALA
"""

t_env.execute_sql(create_func_ddl)

Error:

Py4JJavaError: An error occurred while calling o672.execute.
: org.apache.flink.table.api.TableException: Result field does not match requested type.
Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>

Option 2 - Overriding getResultType

Back to the old registration method, but overriding getResultType:

t_env.register_java_function("dummyMap", "com.dummy.dummyMap")

Scala UDF:

class dummyMap() extends ScalarFunction {

  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
}

Error (on compilation):

[error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
[error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
[error]   ()org.apache.flink.table.types.DataType <and>
[error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
[error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
[error] override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
[error]                                                                              ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
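To spell out the cause of that compile error: getResultType belongs to the old type system and must return a TypeInformation, while DataTypes.ROW produces a DataType and only accepts DataTypes.FIELD(...) arguments, hence the mismatch. A sketch of the corrected override, mirroring Wei's working code above (`dummyMapFixed` is a made-up name):

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

class dummyMapFixed extends ScalarFunction {

  def eval(): Row =
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))

  // Old type system: return a TypeInformation built with Types.ROW,
  // not a DataType built with DataTypes.ROW
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
}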
On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

I guess your UDF is registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding the type hint.

You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, which accepts the type hint.

Best,
Wei

On Nov 17, 2020 at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

Thanks for your suggestion. Same error.

Scala UDF:

@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
class dummyMap() extends ScalarFunction {
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
}

Best regards,

On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'.

Best,
Wei

On Nov 17, 2020 at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Dian, Community,

(bringing the thread back to a wider audience)

As you suggested, I've tried to use DataTypeHint with Row instead of Map, but this simple case also leads to a type mismatch between the UDF and the Table API.
I've also tried other Map objects from Flink (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java (java.util.Map) in combination with DataTypeHint, without success.
N.B. I'm using version 1.11.

Am I doing something wrong, or am I facing limitations of the toolkit?

Thanks in advance for your support!

Best regards,

Scala UDF:

class dummyMap() extends ScalarFunction {

  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
}

Table DDL:

my_sink_ddl = f"""
create table mySink (
    output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
    ...
)
"""

Error:

Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]

On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Thanks Dian, but same error when using an explicit return type:

class dummyMap() extends ScalarFunction {

  def eval(): util.Map[java.lang.String, java.lang.String] = {

    val states = Map("key1" -> "val1", "key2" -> "val2")

    states.asInstanceOf[util.Map[java.lang.String, java.lang.String]]
  }
}

On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:

You need to explicitly define the result type of the UDF. You can refer to [1] for more details if you are using Flink 1.11; if you are using another version of Flink, please refer to the corresponding documentation.

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide

On Nov 13, 2020 at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

ScalarFunction

--
Pierre