Hi Pierre,

I wrote a PyFlink implementation; please check whether it meets your needs:


from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
        .in_streaming_mode().use_blink_planner().build())
    # reserve 80 MB of task off-heap memory
    t_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", '80m')

    # 10k nested columns
    num_field = 10_000
    fields = ['f%s INT' % i for i in range(num_field)]
    field_str = ','.join(fields)
    t_env.execute_sql(f"""
            CREATE TABLE source_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
          'connector' = 'datagen',
          'number-of-rows' = '2'
        )
    """)

    t_env.execute_sql(f"""
        CREATE TABLE print_table (
         f0 BIGINT,
         f1 DECIMAL(32,2),
         f2 ROW<{field_str}>,
         f3 TIMESTAMP(3)
        ) WITH (
         'connector' = 'print'
        )
    """)
    result_type = DataTypes.ROW(
        [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

    func = udf(lambda x: x, result_type=result_type)

    source = t_env.from_path("source_table")
    result = source.select(source.f0, source.f1, func(source.f2), source.f3)
    result.execute_insert("print_table")


if __name__ == '__main__':
    test()
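
A quick way to sanity-check the generated DDL before submitting 10k columns is to build the ROW type string for a handful of fields and look at it. The sketch below does that in plain Python. `build_row_type` is a hypothetical helper, not part of PyFlink. Note that Python f-strings interpolate with `{name}`; a `$` placed in front of the braces is emitted literally.

```python
def build_row_type(num_fields: int) -> str:
    """Return a SQL ROW type string such as 'ROW<f0 INT,f1 INT>'.

    Hypothetical helper for illustration; the field names f0..f(N-1) follow
    the naming used in the script above.
    """
    fields = ['f%s INT' % i for i in range(num_fields)]
    field_str = ','.join(fields)
    # f-strings use {name}; a leading '$' would end up in the output verbatim.
    return f"ROW<{field_str}>"


row_type = build_row_type(3)
print(row_type)
```

Printing the string for a small field count makes stray characters easy to spot before they reach the SQL parser.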


 Best,
 Xingbo

On Tue, Dec 1, 2020 at 6:10 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

> Hi Xingbo,
>
> That would mean giving up on using Flink (table) features on the content
> of the parsed JSON objects, so definitely a big loss. Let me know if I
> missed something.
>
> Thanks !
>
> On Tue, Dec 1, 2020 at 07:26, Xingbo Huang <hxbks...@gmail.com> wrote:
>
>> Hi Pierre,
>>
>> Have you ever thought of declaring your entire JSON as a string field in
>> `Table` and putting the parsing work in a UDF?
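
A minimal sketch of that idea, in plain Python: the whole JSON document travels as one STRING column, and a function pulls out a single key on demand. `get_json_field` is a hypothetical name. In PyFlink it would presumably be wrapped with `udf(...)` and a STRING result type; that step is not shown because the exact `udf` arguments differ between Flink versions.

```python
import json
from typing import Optional


def get_json_field(json_str: str, key: str) -> Optional[str]:
    """Return the value stored under `key` as a string, or None if absent.

    Hypothetical helper: the JSON payload is kept as a single string column
    and parsed only when a key is needed. Assumes the top-level JSON value
    is an object.
    """
    record = json.loads(json_str)
    value = record.get(key)
    if value is None:
        return None
    # Strings are returned unchanged; other values are re-serialized as JSON
    # so the result always fits a STRING column.
    return value if isinstance(value, str) else json.dumps(value)


print(get_json_field('{"a": 1, "b": {"c": "x"}}', "b"))
```

The trade-off is that the engine only sees an opaque string, so every access pays for a parse.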
>>
>> Best,
>> Xingbo
>>
>> On Tue, Dec 1, 2020 at 4:13 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>
>>> Hi Xingbo,
>>>
>>> Many thanks for your follow-up. Yes, you got it right.
>>> So using the Table API and a ROW object for the nested output of my UDF, and
>>> since types are mandatory, I guess this boils down to:
>>> - How to nicely specify the types for the 100k fields: shall I use
>>> TypeInformation [1] or better retrieve it from Schema Registry [2]?
>>> - Do I have to put NULL values for all the fields that don't have a
>>> value in my JSON?
>>> - Will the resulting Table be "sparse" and suffer performance
>>> limitations?
>>> Let me know if Table API and ROW are the right candidates here, or if
>>> better alternatives exist.
>>> As said, I'd be glad to apply some downstream transformations using
>>> key-value access (and possibly some Table <-> Pandas operations). I hope that
>>> doesn't make it too long a wish list ;)
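
To make the second question concrete: a ROW has a fixed set of fields, so one way to fit a sparse record into it is to emit every declared field and use None for the absent ones. The sketch below shows only that padding step, in plain Python. `to_dense_row` and the field names are made up for illustration, and whether this scales to 100k fields is exactly the open question above.

```python
from typing import Any, Dict, List, Tuple


def to_dense_row(record: Dict[str, Any], field_names: List[str]) -> Tuple[Any, ...]:
    """Project a sparse dict onto a fixed field order, padding with None."""
    return tuple(record.get(name) for name in field_names)


field_names = ["f0", "f1", "f2", "f3"]  # hypothetical schema
sparse = {"f1": 10, "f3": 30}           # only some keys are populated
print(to_dense_row(sparse, field_names))
```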
>>>
>>> Thanks a lot !
>>>
>>> Best regards,
>>>
>>> [1]
>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>> [2]
>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>
>>> On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>
>>>> Hi Pierre,
>>>>
>>>> Sorry for the late reply.
>>>> Your requirement is that your `Table` has a `field` in `Json` format
>>>> with up to 100k keys, and you want to use such a `field` as the
>>>> input/output of a `udf`, right? As to whether there is a limit on the
>>>> number of nested keys, I am not quite sure. Other contributors with
>>>> experience in this area may have answers. As for `Python UDF`, if
>>>> the type of the key or value of your `Map` is `Any`, we do not support it yet.
>>>> You need to specify a concrete type. For more information, please refer to
>>>> the related document [1].
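
One possible workaround for the missing `Any` support, sketched here as an assumption rather than a recommendation from this thread, is to serialize every value to a string so that the map fits a concrete MAP<STRING,STRING> type. `stringify_values` is a hypothetical helper.

```python
import json
from typing import Any, Dict


def stringify_values(record: Dict[str, Any]) -> Dict[str, str]:
    """Turn a str->Any mapping into a str->str mapping.

    Strings are kept as they are; every other value is JSON-encoded.
    """
    return {
        key: value if isinstance(value, str) else json.dumps(value)
        for key, value in record.items()
    }


print(stringify_values({"name": "foo", "count": 3, "tags": ["a", "b"]}))
```

The cost of this encoding is that the string "3" and the number 3 become indistinguishable downstream.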
>>>>
>>>> [1]
>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>> Hello Wei, Dian, Xingbo,
>>>>
>>>> Not really sure when it is appropriate to knock on the door of the
>>>> community ;)
>>>> I just wanted to mention that your feedback on the above topic will be
>>>> highly appreciated as it will condition the choice of framework on our side
>>>> for the months to come, and potentially help the community to cover sparse
>>>> data with Flink.
>>>>
>>>> Thanks a lot !
>>>>
>>>> Have a great week-end
>>>>
>>>> Best,
>>>>
>>>> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>
>>>>> Hi Wei,
>>>>>
>>>>> Thanks for the hint. May I please follow up by adding more context and
>>>>> asking for your guidance.
>>>>>
>>>>> Suppose the aforementioned Map[String,Any] object returned by Scala:
>>>>>
>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>>> possible keys
>>>>> - Has only some portion of the keys populated for each record
>>>>> - Is convertible to JSON
>>>>> - Has to undergo downstream processing in Flink and/or Python UDF with
>>>>> key value access
>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>
>>>>> How would you declare the types explicitly in such a case ?
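
If the schema is available as data (for example, loaded from a registry), the explicit type declaration does not have to be written by hand; it can be generated. The sketch below renders a nested schema description as a SQL ROW type string. The dict-based schema format and `schema_to_sql_type` are assumptions made for this example.

```python
from typing import Dict, Union

# A schema maps field names to either a SQL type name or a nested schema.
Schema = Dict[str, Union[str, "Schema"]]


def schema_to_sql_type(schema: Schema) -> str:
    """Render a (possibly nested) schema dict as a SQL ROW type string."""
    parts = []
    for name, field_type in schema.items():
        if isinstance(field_type, dict):
            # Nested object: recurse to build an inner ROW type.
            parts.append(f"{name} {schema_to_sql_type(field_type)}")
        else:
            parts.append(f"{name} {field_type}")
    return "ROW<" + ", ".join(parts) + ">"


schema = {"id": "BIGINT", "address": {"city": "STRING", "zip": "INT"}}
print(schema_to_sql_type(schema))
```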
>>>>>
>>>>> Thanks for your support !
>>>>>
>>>>> Pierre
>>>>>
>>>>> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> Currently there is no type hint like 'Map[String, Any]'. The
>>>>>> recommended way is to declare your type more explicitly.
>>>>>>
>>>>>> If you insist on doing this, you can try declaring a RAW data type
>>>>>> for java.util.HashMap [1], but you may encounter some trouble [2]
>>>>>> related to the Kryo serializers.
>>>>>>
>>>>>> Best,
>>>>>> Wei
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>> [2]
>>>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>
>>>>>>
>>>>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>
>>>>>> Hi Wei,
>>>>>>
>>>>>> It works ! Thanks a lot for your support.
>>>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>>>> syntax for option 2.
>>>>>>
>>>>>> So to summarize..
>>>>>>
>>>>>> Methods working:
>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>> - Outdated: override getResultType in UDF definition
>>>>>> + t_env.register_java_function for UDF registering
>>>>>>
>>>>>> Type conversions working:
>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>>
>>>>>> Any hint for Map[String,Any] ?
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> Both approaches work on my local machine. This is my code:
>>>>>>>
>>>>>>> Scala UDF:
>>>>>>>
>>>>>>> package com.dummy
>>>>>>>
>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>> import org.apache.flink.table.api.Types
>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>> import org.apache.flink.types.Row
>>>>>>>
>>>>>>> /**
>>>>>>>   * The scala UDF.
>>>>>>>   */
>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>
>>>>>>>   // If the UDF is registered via a SQL statement, you need to add this type hint.
>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>   def eval(): Row = {
>>>>>>>
>>>>>>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>>>>>>
>>>>>>>   }
>>>>>>>
>>>>>>>   // If the UDF is registered via the method 'register_java_function',
>>>>>>>   // you need to override this method.
>>>>>>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>   }
>>>>>>> }
>>>>>>>
>>>>>>> Python code:
>>>>>>>
>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>
>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>
>>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>>> # or you can also load the jar file via other approaches
>>>>>>> st_env.get_config().get_configuration().set_string(
>>>>>>>     "pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>
>>>>>>> # register the udf via a SQL statement
>>>>>>> st_env.execute_sql(
>>>>>>>     "CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>> # or register via the method
>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>
>>>>>>> # prepare source and sink
>>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
>>>>>>>                          ['a', 'b', 'c'])
>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>     ) with (
>>>>>>>         'connector' = 'print'
>>>>>>>     )""")
>>>>>>>
>>>>>>> # execute query
>>>>>>> t.select("dummyMap()").execute_insert("mySink") \
>>>>>>>     .get_job_client().get_job_execution_result().result()
>>>>>>>
>>>>>>> Best,
>>>>>>> Wei
>>>>>>>
>>>>>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>
>>>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>>>
>>>>>>> *Option 1 - SQL UDF*
>>>>>>>
>>>>>>> *SQL UDF*
>>>>>>> create_func_ddl = """
>>>>>>> CREATE FUNCTION dummyMap
>>>>>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>> """
>>>>>>>
>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>
>>>>>>> *Error*
>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>>>> match requested type. Requested: Row(s: String, t: String); Actual:
>>>>>>> GenericType<org.apache.flink.types.Row>
>>>>>>>
>>>>>>> *Option 2 *- *Overriding getResultType*
>>>>>>>
>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>
>>>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>>>
>>>>>>> *Scala UDF*
>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>
>>>>>>>   def eval(): Row = {
>>>>>>>
>>>>>>>       Row.of(java.lang.String.valueOf("foo"),
>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>
>>>>>>>   }
>>>>>>>
>>>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>> }
>>>>>>>
>>>>>>> *Error (on compilation)*
>>>>>>>
>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>>>>> alternatives:
>>>>>>> [error]   (x$1:
>>>>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>>>>> <and>
>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>> [error]   (x$1:
>>>>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>> [error]  cannot be applied to
>>>>>>> (org.apache.flink.table.types.DataType,
>>>>>>> org.apache.flink.table.types.DataType)
>>>>>>> [error]   override def getResultType(signature: Array[Class[_]]):
>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>> [error]
>>>>>>>                               ^
>>>>>>> [error] one error found
>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>
>>>>>>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> I guess your UDF is registered by the method
>>>>>>>> 'register_java_function', which uses the old type system. In this
>>>>>>>> situation you need to override the 'getResultType' method instead of
>>>>>>>> adding a type hint.
>>>>>>>>
>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION" SQL
>>>>>>>> statement, which accepts the type hint.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>
>>>>>>>> *Scala UDF*
>>>>>>>>
>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>   def eval(): Row = {
>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")'
>>>>>>>>> with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Dian, Community,
>>>>>>>>>
>>>>>>>>> (bringing the thread back to wider audience)
>>>>>>>>>
>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>>> of Map, but even this simple case leads to a type mismatch between
>>>>>>>>> the UDF and the Table API.
>>>>>>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>>>>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to
>>>>>>>>> Java (java.util.Map) in combination with DataTypeHint, without success.
>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>
>>>>>>>>> Am I doing something wrong or am I facing limitations in the
>>>>>>>>> toolkit ?
>>>>>>>>>
>>>>>>>>> Thanks in advance for your support !
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> *Scala UDF*
>>>>>>>>>
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>  def eval(): Row = {
>>>>>>>>>
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> *Table DDL*
>>>>>>>>>
>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>     create table mySink (
>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>     ) with (
>>>>>>>>>         ...
>>>>>>>>>     )
>>>>>>>>> """
>>>>>>>>>
>>>>>>>>> *Error*
>>>>>>>>>
>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>>>> query result and registered TableSink
>>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t:
>>>>>>>>> String)]
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>>>>>>
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>
>>>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>>
>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>
>>>>>>>>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> You need to explicitly define the result type of the UDF. You
>>>>>>>>>>> could refer to [1] for more details if you are using Flink 1.11. If
>>>>>>>>>>> you are using another version of Flink, you need to refer to the
>>>>>>>>>>> corresponding documentation.
>>>>>>>>>>>
>>>>>>>>>>> [1]
>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>
>>>>>>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Pierre
>>>>>>>>>>
