Hi Xingbo, Community,

Thanks a lot for your support.
To conclude this thread, may I ask two final questions, addressed to the
wider audience as well:
- Are serious performance issues to be expected with 100k fields per ROW
(i.e. due solely to metadata overhead, independently of the query logic)?
- Is sparse population (say 99% sparsity) already optimized in the ROW
object, or are sparse types on your roadmap?
Any experience with sparse Tables from other users (including benchmarks vs.
other frameworks) is also highly welcome.

Thanks !

Best


On Thu, Dec 3, 2020 at 02:53, Xingbo Huang <hxbks...@gmail.com> wrote:

> Hi Pierre,
>
> This example is written against the syntax of release-1.12, which is about
> to be released, and it passed my test. In release-1.12, input_type can be
> omitted and expressions can be used directly. If you are using release-1.11,
> you only need to adjust the udf syntax slightly, following the
> udf documentation[1].
>
> The Flink table connector supports the Avro format; please refer to the
> documentation[2].
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
> [2]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format
>
> Best,
> Xingbo
>
> On Thu, Dec 3, 2020 at 2:57 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>
>> Hi Xingbo,
>>
>> Nice ! This looks a bit hacky, but shows that it can be done ;)
>>
>> I just got an exception preventing me running your code, apparently from
>> udf.py:
>>
>> TypeError: Invalid input_type: input_type should be DataType but contains
>> None
>>
>> Can you please check again?
>> If the schema is defined in an .avsc file, do we have to parse it and
>> rebuild those definitions (DDL and UDF) ourselves, or is there an existing
>> component that could be used?
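The question above asks whether an .avsc schema must be parsed by hand to rebuild the DDL. The thread does not name an existing component for this, so the following is only a minimal sketch of the manual route, using the standard library. The function name `avro_to_flink_type`, the type mapping table, and the sample schema are all assumptions made for illustration; the mapping covers only a few primitive types, nested records, and nullable unions.

```python
import json

# Assumed mapping from Avro primitive types to Flink SQL types
# (illustrative and not exhaustive).
AVRO_TO_FLINK = {
    "string": "STRING",
    "int": "INT",
    "long": "BIGINT",
    "float": "FLOAT",
    "double": "DOUBLE",
    "boolean": "BOOLEAN",
    "bytes": "BYTES",
}


def avro_to_flink_type(avro_type):
    """Translate a parsed Avro type into a Flink SQL type string."""
    if isinstance(avro_type, str):
        return AVRO_TO_FLINK[avro_type]
    if isinstance(avro_type, list):
        # A union such as ["null", "int"] is treated as a nullable INT.
        non_null = [t for t in avro_type if t != "null"]
        if len(non_null) != 1:
            raise ValueError("only unions of null and one type are handled")
        return avro_to_flink_type(non_null[0])
    if isinstance(avro_type, dict) and avro_type.get("type") == "record":
        fields = ", ".join(
            "%s %s" % (f["name"], avro_to_flink_type(f["type"]))
            for f in avro_type["fields"])
        return "ROW<%s>" % fields
    raise ValueError("unsupported Avro type: %r" % (avro_type,))


# Hypothetical schema, standing in for the content of an .avsc file.
avsc = json.loads("""
{"type": "record", "name": "Event", "fields": [
  {"name": "id", "type": "long"},
  {"name": "payload", "type": {"type": "record", "name": "Payload", "fields": [
    {"name": "a", "type": ["null", "int"]},
    {"name": "b", "type": "string"}]}}
]}
""")

print(avro_to_flink_type(avsc))
# ROW<id BIGINT, payload ROW<a INT, b STRING>>
```

The resulting string could be pasted into a CREATE TABLE statement in place of a hand-written ROW type. Field names that collide with SQL keywords would additionally need backtick quoting, which this sketch leaves out.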
>>
>> Thanks a lot !
>>
>> Best,
>>
>>
>> On Wed, Dec 2, 2020 at 04:50, Xingbo Huang <hxbks...@gmail.com> wrote:
>>
>>> Hi Pierre,
>>>
>>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>>
>>>
>>> from pyflink.datastream import StreamExecutionEnvironment
>>> from pyflink.table import (StreamTableEnvironment, EnvironmentSettings,
>>>                            DataTypes)
>>> from pyflink.table.udf import udf
>>>
>>>
>>> def test():
>>>     env = StreamExecutionEnvironment.get_execution_environment()
>>>     env.set_parallelism(1)
>>>     t_env = StreamTableEnvironment.create(
>>>         env,
>>>         environment_settings=EnvironmentSettings.new_instance()
>>>         .in_streaming_mode().use_blink_planner().build())
>>>     t_env.get_config().get_configuration().set_string(
>>>         "taskmanager.memory.task.off-heap.size", '80m')
>>>
>>>     # 10k nested columns
>>>     num_field = 10_000
>>>     fields = ['f%s INT' % i for i in range(num_field)]
>>>     field_str = ','.join(fields)
>>>     # f-string placeholders are {name}; a '$' in front would end up in the SQL
>>>     t_env.execute_sql(f"""
>>>         CREATE TABLE source_table (
>>>             f0 BIGINT,
>>>             f1 DECIMAL(32,2),
>>>             f2 ROW<{field_str}>,
>>>             f3 TIMESTAMP(3)
>>>         ) WITH (
>>>             'connector' = 'datagen',
>>>             'number-of-rows' = '2'
>>>         )
>>>     """)
>>>
>>>     t_env.execute_sql(f"""
>>>         CREATE TABLE print_table (
>>>             f0 BIGINT,
>>>             f1 DECIMAL(32,2),
>>>             f2 ROW<{field_str}>,
>>>             f3 TIMESTAMP(3)
>>>         ) WITH (
>>>             'connector' = 'print'
>>>         )
>>>     """)
>>>     # the UDF result type mirrors the nested ROW declared in the DDL
>>>     result_type = DataTypes.ROW(
>>>         [DataTypes.FIELD("f%s" % i, DataTypes.INT())
>>>          for i in range(num_field)])
>>>
>>>     # identity UDF: passes the nested row through unchanged
>>>     func = udf(lambda x: x, result_type=result_type)
>>>
>>>     source = t_env.from_path("source_table")
>>>     result = source.select(source.f0, source.f1, func(source.f2),
>>>                            source.f3)
>>>     result.execute_insert("print_table")
>>>
>>>
>>> if __name__ == '__main__':
>>>     test()
>>>
>>>
>>>  Best,
>>>  Xingbo
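The example above builds the nested ROW type by string interpolation. As a small standalone check of that step, the sketch below needs only the standard library (no Flink) and shows how the field list ends up in the type string. The helper name `row_type_ddl` is hypothetical. The sketch also shows that in a Python f-string only the braces are placeholder syntax, so a `$` written in front of them is copied into the SQL text.

```python
def row_type_ddl(num_field, sql_type="INT"):
    """Build the SQL type string ROW<f0 INT,f1 INT,...> for num_field columns."""
    fields = ["f%s %s" % (i, sql_type) for i in range(num_field)]
    return "ROW<%s>" % ",".join(fields)


field_str = ",".join("f%s INT" % i for i in range(3))

# Only {field_str} is substituted; the "$" is kept as a literal character.
with_dollar = f"ROW<${field_str}>"
without_dollar = f"ROW<{field_str}>"

print(with_dollar)     # ROW<$f0 INT,f1 INT,f2 INT>
print(without_dollar)  # ROW<f0 INT,f1 INT,f2 INT>

# Length of the type string alone for the 100k-field case discussed here.
print(len(row_type_ddl(100_000)))
```

The last line gives a rough feel for how much schema text is carried around per table declaration. It says nothing about runtime cost inside Flink, which this thread leaves open.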
>>>
>>> On Tue, Dec 1, 2020 at 6:10 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>
>>>> Hi Xingbo,
>>>>
>>>> That would mean giving up on using Flink (table) features on the
>>>> content of the parsed JSON objects, so definitely a big loss. Let me know
>>>> if I missed something.
>>>>
>>>> Thanks !
>>>>
>>>> On Tue, Dec 1, 2020 at 07:26, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>
>>>>> Hi Pierre,
>>>>>
>>>>> Have you considered declaring your entire JSON as a string field
>>>>> in the `Table` and doing the parsing work in a UDF?
>>>>>
>>>>> Best,
>>>>> Xingbo
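The suggestion above (keep the JSON as one string column and parse it inside a UDF) can be sketched as a plain Python function. This is only an illustration under assumptions: the name `json_get` and the dotted-path convention are hypothetical, and the PyFlink wrapping (the `udf` helper with an explicit result type) is left out so that the snippet runs with the standard library alone.

```python
import json


def json_get(json_str, path, default=None):
    """Return the value at a dotted path inside a JSON object string.

    Missing keys (or a NULL input) yield `default` instead of raising,
    which suits sparsely populated records.
    """
    if json_str is None:
        return default
    node = json.loads(json_str)
    for part in path.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node


record = '{"user": {"id": 7, "tags": ["a", "b"]}, "score": 0.5}'
print(json_get(record, "user.id"))       # 7
print(json_get(record, "user.missing"))  # None
print(json_get(record, "score"))         # 0.5
```

The trade-off raised in the reply that follows still applies: the content of the string column stays opaque to the Table API, so every access goes through such a function.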
>>>>>
>>>>> On Tue, Dec 1, 2020 at 4:13 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>
>>>>>> Hi Xingbo,
>>>>>>
>>>>>> Many thanks for your follow-up. Yes, you got it right.
>>>>>> So, using the Table API and a ROW object for the nested output of my UDF,
>>>>>> and since types are mandatory, I guess this boils down to:
>>>>>> - How to nicely specify the types for the 100k fields: shall I use
>>>>>> TypeInformation [1], or better retrieve them from a Schema Registry [2]?
>>>>>> - Do I have to put NULL values for all the fields that don't have a
>>>>>> value in my JSON?
>>>>>> - Will the resulting Table be "sparse" and suffer performance
>>>>>> limitations?
>>>>>> Let me know if the Table API and ROW are the right candidates here, or if
>>>>>> better alternatives exist.
>>>>>> As said, I'd be glad to apply some downstream transformations using
>>>>>> key/value access (and possibly some Table <-> Pandas operations). I hope
>>>>>> that doesn't make it too long a wish list ;)
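To make the NULL question above concrete, here is a minimal sketch of what filling a fixed-width row from a sparse record could look like. It assumes that absent keys are represented as None (NULL), which the thread does not confirm as a Flink requirement. The names `to_dense_row` and `sparsity` are hypothetical.

```python
def to_dense_row(record, field_names):
    """Expand a sparse dict into a list aligned with field_names.

    Keys missing from the record become None.
    """
    return [record.get(name) for name in field_names]


def sparsity(row):
    """Fraction of positions in the row that are None."""
    return sum(value is None for value in row) / len(row)


field_names = ["f%d" % i for i in range(10)]
record = {"f3": 42, "f7": 7}  # only 2 of the 10 declared fields are set

row = to_dense_row(record, field_names)
print(row)            # [None, None, None, 42, None, None, None, 7, None, None]
print(sparsity(row))  # 0.8
```

With 100k declared fields and 99% sparsity, a dense row built this way would carry about 99k None entries per record, which is the overhead the performance question is about.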
>>>>>>
>>>>>> Thanks a lot !
>>>>>>
>>>>>> Best regards,
>>>>>>
>>>>>> [1]
>>>>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>>>>> [2]
>>>>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>>>>
>>>>>> On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Pierre,
>>>>>>>
>>>>>>> Sorry for the late reply.
>>>>>>> Your requirement is that your `Table` has a `field` in `Json` format
>>>>>>> with up to 100k keys, and that you want to use such a `field` as
>>>>>>> the input/output of a `udf`, right? As to whether there is a limit on the
>>>>>>> number of nested keys, I am not sure. Other contributors with
>>>>>>> experience in this area may have answers. As for `Python UDF`: if
>>>>>>> the key or value type of your `Map` is `Any`, we do not support it yet.
>>>>>>> You need to specify a concrete type. For more information, please refer
>>>>>>> to the related documentation[1].
>>>>>>>
>>>>>>> [1]
>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>>>>
>>>>>>> Best,
>>>>>>> Xingbo
>>>>>>>
>>>>>>> On Nov 28, 2020, at 12:49 AM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>
>>>>>>> Hello Wei, Dian, Xingbo,
>>>>>>>
>>>>>>> Not really sure when it is appropriate to knock on the door of the
>>>>>>> community ;)
>>>>>>> I just wanted to mention that your feedback on the above topic would
>>>>>>> be highly appreciated, as it will determine our choice of framework
>>>>>>> for the months to come, and could help the community cover
>>>>>>> sparse data with Flink.
>>>>>>>
>>>>>>> Thanks a lot !
>>>>>>>
>>>>>>> Have a great week-end
>>>>>>>
>>>>>>> Best,
>>>>>>>
>>>>>>> On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> Thanks for the hint. May I follow up by adding more context
>>>>>>>> and asking for your guidance?
>>>>>>>>
>>>>>>>> Suppose the aforementioned Map[String,Any] object returned by Scala:
>>>>>>>>
>>>>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>>>>>> possible keys
>>>>>>>> - Has only some portion of the keys populated for each record
>>>>>>>> - Is convertible to JSON
>>>>>>>> - Has to undergo downstream processing in Flink and/or Python UDF
>>>>>>>> with key value access
>>>>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>>>>
>>>>>>>> How would you declare the types explicitly in such a case ?
>>>>>>>>
>>>>>>>> Thanks for your support !
>>>>>>>>
>>>>>>>> Pierre
>>>>>>>>
>>>>>>>> On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> Currently there is no type hint like 'Map[String, Any]'. The
>>>>>>>>> recommended way is to declare your types more explicitly.
>>>>>>>>>
>>>>>>>>> If you insist on doing this, you can try declaring a RAW data
>>>>>>>>> type for java.util.HashMap [1], but you may run into some trouble [2]
>>>>>>>>> related to the Kryo serializers.
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> [1]
>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>>>>> [2]
>>>>>>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>> Hi Wei,
>>>>>>>>>
>>>>>>>>> It works ! Thanks a lot for your support.
>>>>>>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>>>>>>> syntax for option 2.
>>>>>>>>>
>>>>>>>>> So to summarize..
>>>>>>>>>
>>>>>>>>> Methods working:
>>>>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>>>>> - Outdated: override getResultType in UDF definition
>>>>>>>>> + t_env.register_java_function for UDF registering
>>>>>>>>>
>>>>>>>>> Type conversions working:
>>>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>>>>>
>>>>>>>>> Any hint for Map[String,Any] ?
>>>>>>>>>
>>>>>>>>> Best regards,
>>>>>>>>>
>>>>>>>>> On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Pierre,
>>>>>>>>>>
>>>>>>>>>> Both approaches work on my local machine. This is my code:
>>>>>>>>>>
>>>>>>>>>> Scala UDF:
>>>>>>>>>>
>>>>>>>>>> package com.dummy
>>>>>>>>>>
>>>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>>>>> import org.apache.flink.table.api.Types
>>>>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>>>
>>>>>>>>>> /**
>>>>>>>>>>   * The scala UDF.
>>>>>>>>>>   */
>>>>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>>>>
>>>>>>>>>>   // If the udf is registered via the SQL statement, you need to
>>>>>>>>>>   // add this type hint.
>>>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>
>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>>       java.lang.String.valueOf("bar"))
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   // If the udf is registered via the method
>>>>>>>>>>   // 'register_java_function', you need to override this method.
>>>>>>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>>>>>>>       TypeInformation[_] = {
>>>>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>>>>     Types.ROW(Array("s", "t"),
>>>>>>>>>>       Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Python code:
>>>>>>>>>>
>>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>>>>
>>>>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>>>>
>>>>>>>>>> # load the Scala UDF jar file; change the path to yours,
>>>>>>>>>> # or load the jar file via another approach
>>>>>>>>>> st_env.get_config().get_configuration().set_string(
>>>>>>>>>>     "pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>>>>
>>>>>>>>>> # register the udf via the SQL statement
>>>>>>>>>> st_env.execute_sql(
>>>>>>>>>>     "CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>>>>> # or register via the method
>>>>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>>>
>>>>>>>>>> # prepare source and sink
>>>>>>>>>> t = st_env.from_elements(
>>>>>>>>>>     [(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
>>>>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>>     ) with (
>>>>>>>>>>         'connector' = 'print'
>>>>>>>>>>     )""")
>>>>>>>>>>
>>>>>>>>>> # execute query and wait for the job to finish
>>>>>>>>>> t.select("dummyMap()").execute_insert("mySink") \
>>>>>>>>>>     .get_job_client().get_job_execution_result().result()
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>> On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi Wei,
>>>>>>>>>>
>>>>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>>>>
>>>>>>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>>>>>>
>>>>>>>>>> *Option 1 - SQL UDF*
>>>>>>>>>>
>>>>>>>>>> *SQL UDF*
>>>>>>>>>> create_func_ddl = """
>>>>>>>>>> CREATE FUNCTION dummyMap
>>>>>>>>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>>>>> """
>>>>>>>>>>
>>>>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>>>>
>>>>>>>>>> *Error*
>>>>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>>>>> : org.apache.flink.table.api.TableException: Result field does
>>>>>>>>>> not match requested type. Requested: Row(s: String, t: String);
>>>>>>>>>> Actual: GenericType<org.apache.flink.types.Row>
>>>>>>>>>>
>>>>>>>>>> *Option 2 *- *Overriding getResultType*
>>>>>>>>>>
>>>>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>>>>
>>>>>>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>>>>>>
>>>>>>>>>> *Scala UDF*
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>
>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>
>>>>>>>>>>       Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>>
>>>>>>>>>>   }
>>>>>>>>>>
>>>>>>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> *Error (on compilation)*
>>>>>>>>>>
>>>>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>>>>>>>> alternatives:
>>>>>>>>>> [error]   (x$1:
>>>>>>>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>>>>>>>> <and>
>>>>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>>>>> [error]   (x$1:
>>>>>>>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>>>>> [error]  cannot be applied to
>>>>>>>>>> (org.apache.flink.table.types.DataType,
>>>>>>>>>> org.apache.flink.table.types.DataType)
>>>>>>>>>> [error]   override def getResultType(signature:
>>>>>>>>>> Array[Class[_]]): TypeInformation[_] =
>>>>>>>>>> DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>>>> [error]
>>>>>>>>>>                                 ^
>>>>>>>>>> [error] one error found
>>>>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>>>>
>>>>>>>>>> On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Pierre,
>>>>>>>>>>>
>>>>>>>>>>> I guess your UDF is registered via the method
>>>>>>>>>>> 'register_java_function', which uses the old type system. In this
>>>>>>>>>>> situation you need to override the 'getResultType' method instead of
>>>>>>>>>>> adding a type hint.
>>>>>>>>>>>
>>>>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION"
>>>>>>>>>>> SQL statement, which accepts the type hint.
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Wei
>>>>>>>>>>>
>>>>>>>>>>> On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi Wei,
>>>>>>>>>>>
>>>>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>>>>
>>>>>>>>>>> *Scala UDF*
>>>>>>>>>>>
>>>>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t
>>>>>>>>>>> STRING>"))
>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Pierre,
>>>>>>>>>>>>
>>>>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")'
>>>>>>>>>>>> with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Wei
>>>>>>>>>>>>
>>>>>>>>>>>> On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> Hi Dian, Community,
>>>>>>>>>>>>
>>>>>>>>>>>> (bringing the thread back to wider audience)
>>>>>>>>>>>>
>>>>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>>>>>> of Map, but even this simple case leads to a type mismatch
>>>>>>>>>>>> between the UDF and the Table API.
>>>>>>>>>>>> I've also tried other Map objects from Flink
>>>>>>>>>>>> (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP)
>>>>>>>>>>>> in addition to Java (java.util.Map) in combination with
>>>>>>>>>>>> DataTypeHint, without success.
>>>>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>>>>
>>>>>>>>>>>> Am I doing something wrong or am I facing limitations in the
>>>>>>>>>>>> toolkit ?
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks in advance for your support !
>>>>>>>>>>>>
>>>>>>>>>>>> Best regards,
>>>>>>>>>>>>
>>>>>>>>>>>> *Scala UDF*
>>>>>>>>>>>>
>>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>>
>>>>>>>>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>>>>  def eval(): Row = {
>>>>>>>>>>>>
>>>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>>>>
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> *Table DDL*
>>>>>>>>>>>>
>>>>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>>>>     create table mySink (
>>>>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>>>>     ) with (
>>>>>>>>>>>>         ...
>>>>>>>>>>>>     )
>>>>>>>>>>>> """
>>>>>>>>>>>>
>>>>>>>>>>>> *Error*
>>>>>>>>>>>>
>>>>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types
>>>>>>>>>>>> of query result and registered TableSink
>>>>>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t:
>>>>>>>>>>>> String)]
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>>>>>>>>>
>>>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>>>
>>>>>>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>>>>>
>>>>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>>>>
>>>>>>>>>>>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>>>>>
>>>>>>>>>>>>>   }
>>>>>>>>>>>>> }
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> You need to explicitly define the result type of the UDF. You
>>>>>>>>>>>>>> can refer to [1] for more details if you are using Flink 1.11.
>>>>>>>>>>>>>> If you are using another version of Flink, refer to the
>>>>>>>>>>>>>> corresponding documentation.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> [1]
>>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Nov 13, 2020, at 4:56 PM, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Pierre
>>>>>>>>>>>>>
> --
Pierre
