Hi Pierre,

Have you ever thought of declaring your entire json as a string field in
`Table` and putting the parsing work in UDF?

Best,
Xingbo

Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 上午4:13写道:

> Hi Xingbo,
>
> Many thanks for your follow up. Yes you got it right.
> So using Table API and a ROW object for the nested output of my UDF, and
> since types are mandatory, I guess this boils down to:
> - How to nicely specify the types for the 100k fields : shall I use
> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
> - Do I have to put NULL values for all the fields that don't have a value
> in my JSON ?
> - Will the resulting Table be "sparse" and suffer performance limitations ?
> Let me know if Table API and ROW are the right candidates here, or if
> other better alternatives exist.
> As said I'd be glad to apply some downstream transformations using
> key,value access (and possibly some Table <-> Pandas operations). Hope that
> doesn't make it a too long wish list ;)
>
> Thanks a lot !
>
> Best regards,
>
> [1]
> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
> [2]
> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>
> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang <hxbks...@gmail.com> a écrit :
>
>> Hi Pierre,
>>
>> Sorry for the late reply.
>> Your requirement is that your `Table` has a `field` in `Json` format and
>> its key has reached 100k, and then you want to use such a `field` as the
>> input/output of `udf`, right? As to whether there is a limit on the number
>> of nested key, I am not quite clear. Other contributors with experience in
>> this area may have answers. On the part of `Python UDF`, if the type of key
>> or value of your `Map` is `Any`, we do not support it now. You need to
>> specify a specific type. For more information, please refer to the related
>> document[1].
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>
>> Best,
>> Xingbo
>>
>> 2020年11月28日 上午12:49,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>
>> Hello Wei, Dian, Xingbo,
>>
>> Not really sure when it is appropriate to knock on the door of the
>> community ;)
>> I just wanted to mention that your feedback on the above topic will be
>> highly appreciated as it will condition the choice of framework on our side
>> for the months to come, and potentially help the community to cover sparse
>> data with Flink.
>>
>> Thanks a lot !
>>
>> Have a great week-end
>>
>> Best,
>>
>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
>> pierre.oberhol...@gmail.com> a écrit :
>>
>>> Hi Wei,
>>>
>>> Thanks for the hint. May I please follow up by adding more context and
>>> ask for your guidance.
>>>
>>> In case the bespoken Map[String,Any] object returned by Scala:
>>>
>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>> possible keys
>>> - Has only some portion of the keys populated for each record
>>> - Is convertible to JSON
>>> - Has to undergo downstream processing in Flink and/or Python UDF with
>>> key value access
>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>
>>> How would you declare the types explicitly in such a case ?
>>>
>>> Thanks for your support !
>>>
>>> Pierre
>>>
>>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong <weizhong0...@gmail.com> a
>>> écrit :
>>>
>>>> Hi Pierre,
>>>>
>>>> Currently there is no type hint like ‘Map[String, Any]’. The
>>>> recommended way is declaring your type more explicitly.
>>>>
>>>> If you insist on doing this, you can try to declaring a RAW data type
>>>> for java.util.HashMap [1], but you may encounter some troubles [2] related
>>>> to the kryo serializers.
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>> [2]
>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>
>>>>
>>>> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>
>>>> Hi Wei,
>>>>
>>>> It works ! Thanks a lot for your support.
>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>> syntax for option 2.
>>>>
>>>> So to summarize..
>>>>
>>>> Methods working:
>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>> - Outdated: override getResultType in UDF definition
>>>> + t_env.register_java_function for UDF registering
>>>>
>>>> Type conversions working:
>>>> - scala.collection.immutable.Map[String,String] =>
>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>> - scala.collection.immutable.Map[String,String] =>
>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>
>>>> Any hint for Map[String,Any] ?
>>>>
>>>> Best regards,
>>>>
>>>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a
>>>> écrit :
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> Those 2 approaches all work in my local machine, this is my code:
>>>>>
>>>>> Scala UDF:
>>>>>
>>>>> package com.dummy
>>>>>
>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>> import org.apache.flink.table.api.Types
>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>> import org.apache.flink.types.Row
>>>>>
>>>>> /**
>>>>>   * The scala UDF.
>>>>>   */
>>>>> class dummyMap extends ScalarFunction {
>>>>>
>>>>>   // If the udf would be registered by the SQL statement, you need add 
>>>>> this typehint
>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>   def eval(): Row = {
>>>>>
>>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>>> java.lang.String.valueOf("bar"))
>>>>>
>>>>>   }
>>>>>
>>>>>   // If the udf would be registered by the method 
>>>>> 'register_java_function', you need override this
>>>>>   // method.
>>>>>   override def getResultType(signature: Array[Class[_]]): 
>>>>> TypeInformation[_] = {
>>>>>     // The type of the return values should be TypeInformation
>>>>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>>>>> Types.STRING()))
>>>>>   }
>>>>> }
>>>>>
>>>>> Python code:
>>>>>
>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>> from pyflink.table import StreamTableEnvironment
>>>>>
>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>
>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>> # or your can also load the jar file via other approaches
>>>>> st_env.get_config().get_configuration().set_string("pipeline.jars", "
>>>>> file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>
>>>>> # register the udf via
>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
>>>>> LANGUAGE SCALA")
>>>>> # or register via the method
>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>
>>>>> # prepare source and sink
>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
>>>>> ['a', 'b', 'c'])
>>>>> st_env.execute_sql("""create table mySink (
>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>     ) with (
>>>>>         'connector' = 'print'
>>>>>     )""")
>>>>>
>>>>> # execute query
>>>>>
>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>
>>>>> Best,
>>>>> Wei
>>>>>
>>>>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>> 写道:
>>>>>
>>>>> Hi Wei,
>>>>>
>>>>> True, I'm using the method you mention, but glad to change.
>>>>> I tried your suggestion instead, but got a similar error.
>>>>>
>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>
>>>>> *Option 1 - SQL UDF*
>>>>>
>>>>> *SQL UDF*
>>>>> create_func_ddl = """
>>>>> CREATE FUNCTION dummyMap
>>>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>> """
>>>>>
>>>>> t_env.execute_sql(create_func_ddl)
>>>>>
>>>>> *Error*
>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>> match requested type. Requested: Row(s: String, t: String); Actual:
>>>>> GenericType<org.apache.flink.types.Row>
>>>>>
>>>>> *Option 2 *- *Overriding getResultType*
>>>>>
>>>>> Back to the old registering method, but overriding getResultType:
>>>>>
>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>
>>>>> *Scala UDF*
>>>>> class dummyMap() extends ScalarFunction {
>>>>>
>>>>>   def eval(): Row = {
>>>>>
>>>>>       Row.of(java.lang.String.valueOf("foo"),
>>>>> java.lang.String.valueOf("bar"))
>>>>>
>>>>>   }
>>>>>
>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>> }
>>>>>
>>>>> *Error (on compilation)*
>>>>>
>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>>> alternatives:
>>>>> [error]   (x$1:
>>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>>> <and>
>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>> [error]   (x$1:
>>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>> [error]  cannot be applied to (org.apache.flink.table.types.DataType,
>>>>> org.apache.flink.table.types.DataType)
>>>>> [error]   override def getResultType(signature: Array[Class[_]]):
>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>> [error]
>>>>>                             ^
>>>>> [error] one error found
>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>
>>>>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com> a
>>>>> écrit :
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> I guess your UDF is registered by the method 'register_java_function'
>>>>>> which uses the old type system. In this situation you need to override 
>>>>>> the
>>>>>> 'getResultType' method instead of adding type hint.
>>>>>>
>>>>>> You can also try to register your UDF via the "CREATE FUNCTION" sql
>>>>>> statement, which accepts the type hint.
>>>>>>
>>>>>> Best,
>>>>>> Wei
>>>>>>
>>>>>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>> 写道:
>>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> Thanks for your suggestion. Same error.
>>>>>>
>>>>>> *Scala UDF*
>>>>>>
>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>   def eval(): Row = {
>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>> java.lang.String.valueOf("bar"))
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com> a
>>>>>> écrit :
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t
>>>>>>> STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t
>>>>>>> STRING>”))'
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>> 写道:
>>>>>>>
>>>>>>> Hi Dian, Community,
>>>>>>>
>>>>>>> (bringing the thread back to wider audience)
>>>>>>>
>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>> of Map but also this simple case leads to a type mismatch between
>>>>>>> UDF and Table API.
>>>>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>>>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>>>>> N.B. I'm using version 1.11.
>>>>>>>
>>>>>>> Am I doing something wrong or am I facing limitations in the toolkit
>>>>>>> ?
>>>>>>>
>>>>>>> Thanks in advance for your support !
>>>>>>>
>>>>>>> Best regards,
>>>>>>>
>>>>>>> *Scala UDF*
>>>>>>>
>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>
>>>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>  def eval(): Row = {
>>>>>>>
>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> *Table DDL*
>>>>>>>
>>>>>>> my_sink_ddl = f"""
>>>>>>>     create table mySink (
>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>     ) with (
>>>>>>>         ...
>>>>>>>     )
>>>>>>> """
>>>>>>>
>>>>>>> *Error*
>>>>>>>
>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>> query result and registered TableSink
>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t:
>>>>>>> String)]
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>>>>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>>>>
>>>>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>>>>
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>
>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>
>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>
>>>>>>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com> a
>>>>>>>> écrit :
>>>>>>>>
>>>>>>>>> You need to explicitly defined the result type the UDF. You could
>>>>>>>>> refer to [1] for more details if you are using Flink 1.11. If you are 
>>>>>>>>> using
>>>>>>>>> other versions of Flink, you need to refer to the corresponding
>>>>>>>>> documentation.
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>
>>>>>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <
>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>
>>>>>>>>> ScalarFunction
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Pierre
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>>>
>>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>>>
>>>>
>>>
>>> --
>>> Pierre
>>>
>>
>>
>> --
>> Pierre
>>
>>
>>
>
> --
> Pierre
>

Reply via email to