Hi Pierre,

This example is written based on the syntax of release-1.12 that is about
to be released, and the test passed. In release-1.12, input_type can be
omitted and expression can be used directly. If you are using release-1.11,
you only need to modify the grammar of udf used slightly according to the
udf documentation[1].

The flink table connector supports avro format, please refer to the
document[2].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format

Best,
Xingbo

Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月3日周四 上午2:57写道:

> Hi Xingbo,
>
> Nice ! This looks a bit hacky, but shows that it can be done ;)
>
> I just got an exception preventing me running your code, apparently from
> udf.py:
>
> TypeError: Invalid input_type: input_type should be DataType but contains
> None
>
> Can you pls check again ?
> If the schema is defined is a .avsc file, do we have to parse it and
> rebuild those syntax (ddl and udf) and or is there an existing component
> that could be used ?
>
> Thanks a lot !
>
> Best,
>
>
> Le mer. 2 déc. 2020 à 04:50, Xingbo Huang <hxbks...@gmail.com> a écrit :
>
>> Hi Pierre,
>>
>> I wrote a PyFlink implementation, you can see if it meets your needs:
>>
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment, EnvironmentSettings,
>> DataTypes
>> from pyflink.table.udf import udf
>>
>>
>> def test():
>>     env = StreamExecutionEnvironment.get_execution_environment()
>>     env.set_parallelism(1)
>>     t_env = StreamTableEnvironment.create(env,
>>
>> environment_settings=EnvironmentSettings.new_instance()
>>
>> .in_streaming_mode().use_blink_planner().build())
>>
>> t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size",
>>                                                           '80m')
>>
>>     # 10k nested columns
>>     num_field = 10_000
>>     fields = ['f%s INT' % i for i in range(num_field)]
>>     field_str = ','.join(fields)
>>     t_env.execute_sql(f"""
>>             CREATE TABLE source_table (
>>             f0 BIGINT,
>>             f1 DECIMAL(32,2),
>>             f2 ROW<${field_str}>,
>>             f3 TIMESTAMP(3)
>>         ) WITH (
>>           'connector' = 'datagen',
>>           'number-of-rows' = '2'
>>         )
>>     """)
>>
>>     t_env.execute_sql(f"""
>>         CREATE TABLE print_table (
>>          f0 BIGINT,
>>          f1 DECIMAL(32,2),
>>          f2 ROW<${field_str}>,
>>          f3 TIMESTAMP(3)
>>         ) WITH (
>>          'connector' = 'print'
>>         )
>>     """)
>>     result_type = DataTypes.ROW(
>>         [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in
>> range(num_field)])
>>
>>     func = udf(lambda x: x, result_type=result_type)
>>
>>     source = t_env.from_path("source_table")
>>     result = source.select(source.f0, source.f1, func(source.f2),
>> source.f3)
>>     result.execute_insert("print_table")
>>
>>
>> if __name__ == '__main__':
>>     test()
>>
>>
>>  Best,
>>  Xingbo
>>
>> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 下午6:10写道:
>>
>>> Hi Xingbo,
>>>
>>> That would mean giving up on using Flink (table) features on the content
>>> of the parsed JSON objects, so definitely a big loss. Let me know if I
>>> missed something.
>>>
>>> Thanks !
>>>
>>> Le mar. 1 déc. 2020 à 07:26, Xingbo Huang <hxbks...@gmail.com> a écrit :
>>>
>>>> Hi Pierre,
>>>>
>>>> Have you ever thought of declaring your entire json as a string field
>>>> in `Table` and putting the parsing work in UDF?
>>>>
>>>> Best,
>>>> Xingbo
>>>>
>>>> Pierre Oberholzer <pierre.oberhol...@gmail.com> 于2020年12月1日周二 上午4:13写道:
>>>>
>>>>> Hi Xingbo,
>>>>>
>>>>> Many thanks for your follow up. Yes you got it right.
>>>>> So using Table API and a ROW object for the nested output of my UDF,
>>>>> and since types are mandatory, I guess this boils down to:
>>>>> - How to nicely specify the types for the 100k fields : shall I use
>>>>> TypeInformation [1] or better retrieve it from Schema Registry [2] ?
>>>>> - Do I have to put NULL values for all the fields that don't have a
>>>>> value in my JSON ?
>>>>> - Will the resulting Table be "sparse" and suffer performance
>>>>> limitations ?
>>>>> Let me know if Table API and ROW are the right candidates here, or if
>>>>> other better alternatives exist.
>>>>> As said I'd be glad to apply some downstream transformations using
>>>>> key,value access (and possibly some Table <-> Pandas operations). Hope 
>>>>> that
>>>>> doesn't make it a too long wish list ;)
>>>>>
>>>>> Thanks a lot !
>>>>>
>>>>> Best regards,
>>>>>
>>>>> [1]
>>>>> https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
>>>>> [2]
>>>>> https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html
>>>>>
>>>>> Le sam. 28 nov. 2020 à 04:04, Xingbo Huang <hxbks...@gmail.com> a
>>>>> écrit :
>>>>>
>>>>>> Hi Pierre,
>>>>>>
>>>>>> Sorry for the late reply.
>>>>>> Your requirement is that your `Table` has a `field` in `Json` format
>>>>>> and its key has reached 100k, and then you want to use such a `field` as
>>>>>> the input/output of `udf`, right? As to whether there is a limit on the
>>>>>> number of nested key, I am not quite clear. Other contributors with
>>>>>> experience in this area may have answers. On the part of `Python UDF`, if
>>>>>> the type of key or value of your `Map` is `Any`, we do not support it 
>>>>>> now.
>>>>>> You need to specify a specific type. For more information, please refer 
>>>>>> to
>>>>>> the related document[1].
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html
>>>>>>
>>>>>> Best,
>>>>>> Xingbo
>>>>>>
>>>>>> 2020年11月28日 上午12:49,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>> 写道:
>>>>>>
>>>>>> Hello Wei, Dian, Xingbo,
>>>>>>
>>>>>> Not really sure when it is appropriate to knock on the door of the
>>>>>> community ;)
>>>>>> I just wanted to mention that your feedback on the above topic will
>>>>>> be highly appreciated as it will condition the choice of framework on our
>>>>>> side for the months to come, and potentially help the community to cover
>>>>>> sparse data with Flink.
>>>>>>
>>>>>> Thanks a lot !
>>>>>>
>>>>>> Have a great week-end
>>>>>>
>>>>>> Best,
>>>>>>
>>>>>> Le ven. 20 nov. 2020 à 10:11, Pierre Oberholzer <
>>>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>>>
>>>>>>> Hi Wei,
>>>>>>>
>>>>>>> Thanks for the hint. May I please follow up by adding more context
>>>>>>> and ask for your guidance.
>>>>>>>
>>>>>>> In case the bespoken Map[String,Any] object returned by Scala:
>>>>>>>
>>>>>>> - Has a defined schema (incl. nested) with up to 100k (!) different
>>>>>>> possible keys
>>>>>>> - Has only some portion of the keys populated for each record
>>>>>>> - Is convertible to JSON
>>>>>>> - Has to undergo downstream processing in Flink and/or Python UDF
>>>>>>> with key value access
>>>>>>> - Has to be ultimately stored in a Kafka/AVRO sink
>>>>>>>
>>>>>>> How would you declare the types explicitly in such a case ?
>>>>>>>
>>>>>>> Thanks for your support !
>>>>>>>
>>>>>>> Pierre
>>>>>>>
>>>>>>> Le jeu. 19 nov. 2020 à 03:54, Wei Zhong <weizhong0...@gmail.com> a
>>>>>>> écrit :
>>>>>>>
>>>>>>>> Hi Pierre,
>>>>>>>>
>>>>>>>> Currently there is no type hint like ‘Map[String, Any]’. The
>>>>>>>> recommended way is declaring your type more explicitly.
>>>>>>>>
>>>>>>>> If you insist on doing this, you can try to declaring a RAW data
>>>>>>>> type for java.util.HashMap [1], but you may encounter some troubles [2]
>>>>>>>> related to the kryo serializers.
>>>>>>>>
>>>>>>>> Best,
>>>>>>>> Wei
>>>>>>>>
>>>>>>>> [1]
>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
>>>>>>>> [2]
>>>>>>>> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>>>>>>>>
>>>>>>>>
>>>>>>>> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>>> 写道:
>>>>>>>>
>>>>>>>> Hi Wei,
>>>>>>>>
>>>>>>>> It works ! Thanks a lot for your support.
>>>>>>>> I hadn't tried this last combination for option 1, and I had wrong
>>>>>>>> syntax for option 2.
>>>>>>>>
>>>>>>>> So to summarize..
>>>>>>>>
>>>>>>>> Methods working:
>>>>>>>> - Current: DataTypeHint in UDF definition + SQL for UDF registering
>>>>>>>> - Outdated: override getResultType in UDF definition
>>>>>>>> + t_env.register_java_function for UDF registering
>>>>>>>>
>>>>>>>> Type conversions working:
>>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>>> org.apache.flink.types.Row => ROW<STRING,STRING>
>>>>>>>> - scala.collection.immutable.Map[String,String] =>
>>>>>>>> java.util.Map[String,String] => MAP<STRING,STRING>
>>>>>>>>
>>>>>>>> Any hint for Map[String,Any] ?
>>>>>>>>
>>>>>>>> Best regards,
>>>>>>>>
>>>>>>>> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a
>>>>>>>> écrit :
>>>>>>>>
>>>>>>>>> Hi Pierre,
>>>>>>>>>
>>>>>>>>> Those 2 approaches all work in my local machine, this is my code:
>>>>>>>>>
>>>>>>>>> Scala UDF:
>>>>>>>>>
>>>>>>>>> package com.dummy
>>>>>>>>>
>>>>>>>>> import org.apache.flink.api.common.typeinfo.TypeInformation
>>>>>>>>> import org.apache.flink.table.annotation.DataTypeHint
>>>>>>>>> import org.apache.flink.table.api.Types
>>>>>>>>> import org.apache.flink.table.functions.ScalarFunction
>>>>>>>>> import org.apache.flink.types.Row
>>>>>>>>>
>>>>>>>>> /**
>>>>>>>>>   * The scala UDF.
>>>>>>>>>   */
>>>>>>>>> class dummyMap extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>   // If the udf would be registered by the SQL statement, you need 
>>>>>>>>> add this typehint
>>>>>>>>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>
>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"), 
>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   // If the udf would be registered by the method 
>>>>>>>>> 'register_java_function', you need override this
>>>>>>>>>   // method.
>>>>>>>>>   override def getResultType(signature: Array[Class[_]]): 
>>>>>>>>> TypeInformation[_] = {
>>>>>>>>>     // The type of the return values should be TypeInformation
>>>>>>>>>     Types.ROW(Array("s", "t"), 
>>>>>>>>> Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
>>>>>>>>>   }
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> Python code:
>>>>>>>>>
>>>>>>>>> from pyflink.datastream import StreamExecutionEnvironment
>>>>>>>>> from pyflink.table import StreamTableEnvironment
>>>>>>>>>
>>>>>>>>> s_env = StreamExecutionEnvironment.get_execution_environment()
>>>>>>>>> st_env = StreamTableEnvironment.create(s_env)
>>>>>>>>>
>>>>>>>>> # load the scala udf jar file, the path should be modified to yours
>>>>>>>>> # or your can also load the jar file via other approaches
>>>>>>>>> st_env.get_config().get_configuration().set_string("pipeline.jars",
>>>>>>>>> "file:///Users/zhongwei/the-dummy-udf.jar")
>>>>>>>>>
>>>>>>>>> # register the udf via
>>>>>>>>> st_env.execute_sql("CREATE FUNCTION dummyMap AS
>>>>>>>>> 'com.dummy.dummyMap' LANGUAGE SCALA")
>>>>>>>>> # or register via the method
>>>>>>>>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>>>>>>>>
>>>>>>>>> # prepare source and sink
>>>>>>>>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')],
>>>>>>>>> ['a', 'b', 'c'])
>>>>>>>>> st_env.execute_sql("""create table mySink (
>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>     ) with (
>>>>>>>>>         'connector' = 'print'
>>>>>>>>>     )""")
>>>>>>>>>
>>>>>>>>> # execute query
>>>>>>>>>
>>>>>>>>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>>>>>>>>
>>>>>>>>> Best,
>>>>>>>>> Wei
>>>>>>>>>
>>>>>>>>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>>>>> 写道:
>>>>>>>>>
>>>>>>>>> Hi Wei,
>>>>>>>>>
>>>>>>>>> True, I'm using the method you mention, but glad to change.
>>>>>>>>> I tried your suggestion instead, but got a similar error.
>>>>>>>>>
>>>>>>>>> Thanks for your support. That is much more tedious than I thought.
>>>>>>>>>
>>>>>>>>> *Option 1 - SQL UDF*
>>>>>>>>>
>>>>>>>>> *SQL UDF*
>>>>>>>>> create_func_ddl = """
>>>>>>>>> CREATE FUNCTION dummyMap
>>>>>>>>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>>>>>>>>> """
>>>>>>>>>
>>>>>>>>> t_env.execute_sql(create_func_ddl)
>>>>>>>>>
>>>>>>>>> *Error*
>>>>>>>>> Py4JJavaError: An error occurred while calling o672.execute.
>>>>>>>>> : org.apache.flink.table.api.TableException: Result field does not
>>>>>>>>> match requested type. Requested: Row(s: String, t: String); Actual:
>>>>>>>>> GenericType<org.apache.flink.types.Row>
>>>>>>>>>
>>>>>>>>> *Option 2 *- *Overriding getResultType*
>>>>>>>>>
>>>>>>>>> Back to the old registering method, but overriding getResultType:
>>>>>>>>>
>>>>>>>>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>>>>>>>>
>>>>>>>>> *Scala UDF*
>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>
>>>>>>>>>   def eval(): Row = {
>>>>>>>>>
>>>>>>>>>       Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>
>>>>>>>>>   }
>>>>>>>>>
>>>>>>>>>   override def getResultType(signature: Array[Class[_]]):
>>>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>> *Error (on compilation)*
>>>>>>>>>
>>>>>>>>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>>>>>>>>> alternatives:
>>>>>>>>> [error]   (x$1:
>>>>>>>>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>>>>>>>>> <and>
>>>>>>>>> [error]   ()org.apache.flink.table.types.DataType <and>
>>>>>>>>> [error]   (x$1:
>>>>>>>>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>>>>>>>>> [error]  cannot be applied to
>>>>>>>>> (org.apache.flink.table.types.DataType,
>>>>>>>>> org.apache.flink.table.types.DataType)
>>>>>>>>> [error]   override def getResultType(signature: Array[Class[_]]):
>>>>>>>>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>>>>>>>>> [error]
>>>>>>>>>                                 ^
>>>>>>>>> [error] one error found
>>>>>>>>> [error] (Compile / compileIncremental) Compilation failed
>>>>>>>>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>>>>>>>>
>>>>>>>>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com>
>>>>>>>>> a écrit :
>>>>>>>>>
>>>>>>>>>> Hi Pierre,
>>>>>>>>>>
>>>>>>>>>> I guess your UDF is registered by the method
>>>>>>>>>> 'register_java_function' which uses the old type system. In this 
>>>>>>>>>> situation
>>>>>>>>>> you need to override the 'getResultType' method instead of adding 
>>>>>>>>>> type
>>>>>>>>>> hint.
>>>>>>>>>>
>>>>>>>>>> You can also try to register your UDF via the "CREATE FUNCTION"
>>>>>>>>>> sql statement, which accepts the type hint.
>>>>>>>>>>
>>>>>>>>>> Best,
>>>>>>>>>> Wei
>>>>>>>>>>
>>>>>>>>>> 在 2020年11月17日,19:29,Pierre Oberholzer <
>>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>
>>>>>>>>>> Hi Wei,
>>>>>>>>>>
>>>>>>>>>> Thanks for your suggestion. Same error.
>>>>>>>>>>
>>>>>>>>>> *Scala UDF*
>>>>>>>>>>
>>>>>>>>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>   def eval(): Row = {
>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>>   }
>>>>>>>>>> }
>>>>>>>>>>
>>>>>>>>>> Best regards,
>>>>>>>>>>
>>>>>>>>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com>
>>>>>>>>>> a écrit :
>>>>>>>>>>
>>>>>>>>>>> Hi Pierre,
>>>>>>>>>>>
>>>>>>>>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t
>>>>>>>>>>> STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s 
>>>>>>>>>>> STRING,t
>>>>>>>>>>> STRING>”))'
>>>>>>>>>>>
>>>>>>>>>>> Best,
>>>>>>>>>>> Wei
>>>>>>>>>>>
>>>>>>>>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <
>>>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>>
>>>>>>>>>>> Hi Dian, Community,
>>>>>>>>>>>
>>>>>>>>>>> (bringing the thread back to wider audience)
>>>>>>>>>>>
>>>>>>>>>>> As you suggested, I've tried to use DataTypeHint with Row instead
>>>>>>>>>>> of Map but also this simple case leads to a type mismatch
>>>>>>>>>>> between UDF and Table API.
>>>>>>>>>>> I've also tried other Map objects from Flink
>>>>>>>>>>> (table.data.MapData, flink.types.MapValue, 
>>>>>>>>>>> flink.table.api.DataTypes.MAP)
>>>>>>>>>>> in addition to Java (java.util.Map) in combination with
>>>>>>>>>>> DataTypeHint, without success.
>>>>>>>>>>> N.B. I'm using version 1.11.
>>>>>>>>>>>
>>>>>>>>>>> Am I doing something wrong or am I facing limitations in the
>>>>>>>>>>> toolkit ?
>>>>>>>>>>>
>>>>>>>>>>> Thanks in advance for your support !
>>>>>>>>>>>
>>>>>>>>>>> Best regards,
>>>>>>>>>>>
>>>>>>>>>>> *Scala UDF*
>>>>>>>>>>>
>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>
>>>>>>>>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>>>>>>>>  def eval(): Row = {
>>>>>>>>>>>
>>>>>>>>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>>>>>>>>> java.lang.String.valueOf("bar"))
>>>>>>>>>>>
>>>>>>>>>>>   }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> *Table DDL*
>>>>>>>>>>>
>>>>>>>>>>> my_sink_ddl = f"""
>>>>>>>>>>>     create table mySink (
>>>>>>>>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>>>>>>>>     ) with (
>>>>>>>>>>>         ...
>>>>>>>>>>>     )
>>>>>>>>>>> """
>>>>>>>>>>>
>>>>>>>>>>> *Error*
>>>>>>>>>>>
>>>>>>>>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>>>>>>>>> : org.apache.flink.table.api.ValidationException: Field types of
>>>>>>>>>>> query result and registered TableSink
>>>>>>>>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>>>>>>>>> Query result schema: [output_of_my_scala_udf:
>>>>>>>>>>> GenericType<org.apache.flink.types.Row>]
>>>>>>>>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t:
>>>>>>>>>>> String)]
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>>>>>>>>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>>>>>>>>
>>>>>>>>>>>> class dummyMap() extends ScalarFunction {
>>>>>>>>>>>>
>>>>>>>>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>>>>>>>>
>>>>>>>>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>>>>>>>>
>>>>>>>>>>>> states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>>>>>>>>
>>>>>>>>>>>>   }
>>>>>>>>>>>> }
>>>>>>>>>>>>
>>>>>>>>>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com>
>>>>>>>>>>>> a écrit :
>>>>>>>>>>>>
>>>>>>>>>>>>> You need to explicitly defined the result type the UDF. You
>>>>>>>>>>>>> could refer to [1] for more details if you are using Flink 1.11. 
>>>>>>>>>>>>> If you are
>>>>>>>>>>>>> using other versions of Flink, you need to refer to the 
>>>>>>>>>>>>> corresponding
>>>>>>>>>>>>> documentation.
>>>>>>>>>>>>>
>>>>>>>>>>>>> [1]
>>>>>>>>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>>>>>>>>
>>>>>>>>>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <
>>>>>>>>>>>>> pierre.oberhol...@gmail.com> 写道:
>>>>>>>>>>>>>
>>>>>>>>>>>>> ScalarFunction
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Pierre
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Pierre
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Pierre
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Pierre
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Pierre
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Pierre
>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Pierre
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>> --
>>> Pierre
>>>
>>
>
> --
> Pierre
>

Reply via email to