Hi Xingbo,

That would mean giving up on using Flink (table) features on the content of
the parsed JSON objects, so definitely a big loss. Let me know if I missed
something.

Thanks !

Le mar. 1 déc. 2020 à 07:26, Xingbo Huang <hxbks...@gmail.com> a écrit :

> Hi Pierre,
>
> Have you ever thought of declaring your entire json as a string field in
> `Table` and putting the parsing work in UDF?
>
> Best,
> Xingbo
>
> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 上午4:13写道:
>
>> Hi Xingbo,
>>
>> Many thanks for your follow up. Yes you got it right.
>> So using Table API and a ROW object for the nested output of my UDF, and
>> since types are mandatory, I guess this boils down to:
>> - How to nicely specify the types for the 100k fields : shall I use
>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>> - Do I have to put NULL values for all the fields that don't have a value
>> in my JSON ?
>> - Will the resulting Table be "sparse" and suffer performance limitations
>> ?
>> Let me know if Table API and ROW are the right candidates here, or if
>> other better alternatives exist.
>> As said I'd be glad to apply some downstream transformations using
>> key,value access (and possibly some Table <-> Pandas operations). Hope that
>> doesn't make it a too long wish list ;)
>>
>> Thanks a lot !
>>
>> Best regards,
>>
>> [1]
>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>> [2]
>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>
>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang <hxbks...@gmail.com> a écrit :
>>
>>> Hi Pierre,
>>>
>>> Sorry for the late reply.
>>> Your requirement is that your `Table` has a `field` in `Json` format and
>>> its key has reached 100k, and then you want to use such a `field` as the
>>> input/output of `udf`, right? As to whether there is a limit on the number
>>> of nested key, I am not quite clear. Other contributors with experience in
>>> this area may have answers. On the part of `Python UDF`, if the type of key
>>> or value of your `Map` is `Any`, we do not support it now. You need to
>>> specify a specific type. For more information, please refer to the related
>>> document[1].
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>
>>> Best,
>>> Xingbo
>>>
>>> 2020年11月28日 上午12:49,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>
>>> Hello Wei, Dian, Xingbo,
>>>
>>> Not really sure when it is appropriate to knock on the door of the
>>> community ;)
>>> I just wanted to mention that your feedback on the above topic will be
>>> highly appreciated as it will condition the choice of framework on our side
>>> for the months to come, and potentially help the community to cover sparse
>>> data with Flink.
>>>
>>> Thanks a lot !
>>>
>>> Have a great week-end
>>>
>>> Best,
>>>
>>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
>>> pierre.oberhol...@gmail.com> a écrit :
>>>
>>>> Hi Wei,
>>>>
>>>> Thanks for the hint. May I please follow up by adding more context and
>>>> ask for your guidance.
>>>>
>>>> In case the bespoken Map[String,Any] object returned by Scala:
>>>>
>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>> possible keys
>>>> - Has only some portion of the keys populated for each record
>>>> - Is convertible to JSON
>>>> - Has to undergo downstream processing in Flink and/or Python UDF with
>>>> key value access
>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>
>>>> How would you declare the types explicitly in such a case ?
>>>>
>>>> Thanks for your support !
>>>>
>>>> Pierre
>>>>
>>>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong <weizhong0...@gmail.com> a
>>>> écrit :
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> Currently there is no type hint like ‘Map[String, Any]’. The
>>>>> recommended way is declaring your type more explicitly.
>>>>>
>>>>> If you insist on doing this, you can try to declaring a RAW data type
>>>>> for java.util.HashMap [1], but you may encounter some troubles [2] related
>>>>> to the kryo serializers.
>>>>>
>>>>> Best,
>>>>> Wei
>>>>>
>>>>> [1]
>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>> [2]
>>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>
>>>>>
>>>>> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>> 写道:
>>>>>
>>>>> Hi Wei,
>>>>>
>>>>> It works ! Thanks a lot for your support.
>>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>>> syntax for option 2.
>>>>>
>>>>> So to summarize..
>>>>>
>>>>> Methods working:
>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>> - Outdated: override getResultType in UDF definition
>>>>> + t_env.register_java_function for UDF registering
>>>>>
>>>>> Type conversions working:
>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>
>>>>> Any hint for Map[String,Any] ?
>>>>>
>>>>> Best regards,
>>>>>
>>>>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a
>>>>> écrit :
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> Those 2 approaches all work in my local machine, this is my code:
>>>>>>
>>>>>> Scala UDF:
>>>>>>
>>>>>> package com.dummy
>>>>>>
>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>> import org.apache.flink.table.api.Types
>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>> import org.apache.flink.types.Row
>>>>>>
>>>>>> /**
>>>>>>   * The scala UDF.
>>>>>>   */
>>>>>> class dummyMap extends ScalarFunction {
>>>>>>
>>>>>>   // If the udf would be registered by the SQL statement, you need add 
>>>>>> this typehint
>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>   def eval(): Row = {
>>>>>>
>>>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>>>> java.lang.String.valueOf("bar"))
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   // If the udf would be registered by the method 
>>>>>> 'register_java_function', you need override this
>>>>>>   // method.
>>>>>>   override def getResultType(signature: Array[Class[_]]): 
>>>>>> TypeInformation[_] = {
>>>>>>     // The type of the return values should be TypeInformation
>>>>>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>>>>>> Types.STRING()))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Python code:
>>>>>>
>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>
>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>
>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>> # or your can also load the jar file via other approaches
>>>>>> st_env.get_config().get_configuration().set_string("pipeline.jars", "
>>>>>> file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>
>>>>>> # register the udf via
>>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
>>>>>> LANGUAGE SCALA")
>>>>>> # or register via the method
>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>
>>>>>> # prepare source and sink
>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
>>>>>> ['a', 'b', 'c'])
>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>     ) with (
>>>>>>         'connector' = 'print'
>>>>>>     )""")
>>>>>>
>>>>>> # execute query
>>>>>>
>>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>>
>>>>>> Best,
>>>>>> Wei
>>>>>>
>>>>>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>> 写道:
>>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>
>>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>>
>>>>>> *Option 1 - SQL UDF*
>>>>>>
>>>>>> *SQL UDF*
>>>>>> create_func_ddl = """
>>>>>> CREATE FUNCTION dummyMap
>>>>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>> """
>>>>>>
>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>
>>>>>> *Error*
>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>>> match requested type. Requested: Row(s: String, t: String); Actual:
>>>>>> GenericType<org.apache.flink.types.Row>
>>>>>>
>>>>>> *Option 2 *- *Overriding getResultType*
>>>>>>
>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>
>>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>>
>>>>>> *Scala UDF*
>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>
>>>>>>   def eval(): Row = {
>>>>>>
>>>>>>       Row.of(java.lang.String.valueOf("foo"),
>>>>>> java.lang.String.valueOf("bar"))
>>>>>>
>>>>>>   }
>>>>>>
>>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>> }
>>>>>>
>>>>>> *Error (on compilation)*
>>>>>>
>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>>>> alternatives:
>>>>>> [error]   (x$1:
>>>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>>>> <and>
>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>> [error]   (x$1:
>>>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>> [error]  cannot be applied to
>>>>>> (org.apache.flink.table.types.DataType,
>>>>>> org.apache.flink.table.types.DataType)
>>>>>> [error]   override def getResultType(signature: Array[Class[_]]):
>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>> [error]
>>>>>>                             ^
>>>>>> [error] one error found
>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>
>>>>>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com> a
>>>>>> écrit :
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> I guess your UDF is registered by the method
>>>>>>> 'register_java_function' which uses the old type system. In this 
>>>>>>> situation
>>>>>>> you need to override the 'getResultType' method instead of adding type
>>>>>>> hint.
>>>>>>>
>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION" sql
>>>>>>> statement, which accepts the type hint.
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>> 写道:
>>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>
>>>>>>> *Scala UDF*
>>>>>>>
>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>   def eval(): Row = {
>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com> a
>>>>>>> écrit :
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t
>>>>>>>> STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s 
>>>>>>>> STRING,t
>>>>>>>> STRING>”))'
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>>> 写道:
>>>>>>>>
>>>>>>>> Hi Dian, Community,
>>>>>>>>
>>>>>>>> (bringing the thread back to wider audience)
>>>>>>>>
>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>> of Map but also this simple case leads to a type mismatch between
>>>>>>>> UDF and Table API.
>>>>>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to 
>>>>>>>> Java
>>>>>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>
>>>>>>>> Am I doing something wrong or am I facing limitations in the
>>>>>>>> toolkit ?
>>>>>>>>
>>>>>>>> Thanks in advance for your support !
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> *Scala UDF*
>>>>>>>>
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>
>>>>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>  def eval(): Row = {
>>>>>>>>
>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> *Table DDL*
>>>>>>>>
>>>>>>>> my_sink_ddl = f"""
>>>>>>>>     create table mySink (
>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>     ) with (
>>>>>>>>         ...
>>>>>>>>     )
>>>>>>>> """
>>>>>>>>
>>>>>>>> *Error*
>>>>>>>>
>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>>> query result and registered TableSink
>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t:
>>>>>>>> String)]
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>>>>>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>>>>>
>>>>>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>>>>>
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>
>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>
>>>>>>>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com> a
>>>>>>>>> écrit :
>>>>>>>>>
>>>>>>>>>> You need to explicitly defined the result type the UDF. You could
>>>>>>>>>> refer to [1] for more details if you are using Flink 1.11. If you 
>>>>>>>>>> are using
>>>>>>>>>> other versions of Flink, you need to refer to the corresponding
>>>>>>>>>> documentation.
>>>>>>>>>>
>>>>>>>>>> [1]
>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>
>>>>>>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <
>>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>
>>>>>>>>>> ScalarFunction
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Pierre
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Pierre
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>>
>>>
>>> --
>>> Pierre
>>>
>>>
>>>
>>
>> --
>> Pierre
>>
> --
Pierre

Reply via email to