Hi Xingbo, Community,

Thanks a lot for your support. May I finally ask, to conclude this thread and including the wider audience:

- Are serious performance issues to be expected with 100k fields per ROW (i.e. due solely to metadata overhead, independently of the query logic)?
- Is sparse population (say 99% sparsity) already optimized in the ROW object, or are sparse types on your roadmap?

Any experience with sparse Tables from other users (including benchmarks against other frameworks) is also highly welcome.

Thanks!

Best,
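One way to sidestep the wide-ROW question, which comes up further down this thread, is to keep the sparse payload in a MAP column, so that only the keys actually populated are stored per record instead of 100k ROW fields that are mostly NULL. A minimal PyFlink sketch (table and column names are made up; whether this outperforms a wide ROW for a given workload would need benchmarking):

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)

# Sparse payload as MAP<STRING, STRING>: absent keys simply do not appear,
# so 99% sparsity costs nothing beyond the keys that are present.
t_env.execute_sql("""
    CREATE TABLE sparse_events (
        id BIGINT,
        payload MAP<STRING, STRING>
    ) WITH (
        'connector' = 'print'
    )
""")

# Individual keys remain accessible in SQL via payload['some_key'].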
On Thu, Dec 3, 2020 at 02:53, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi Pierre,

This example is written against the syntax of release-1.12, which is about to be released, and the test passes. In release-1.12, input_type can be omitted and expressions can be used directly. If you are using release-1.11, you only need to adapt the UDF syntax slightly, following the UDF documentation [1].

The Flink table connectors support the Avro format; please refer to the documentation [2].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/formats/avro.html#avro-format

Best,
Xingbo

On Thu, Dec 3, 2020 at 02:57, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Xingbo,

Nice! This looks a bit hacky, but it shows that it can be done ;)

I just got an exception preventing me from running your code, apparently from udf.py:

TypeError: Invalid input_type: input_type should be DataType but contains None

Can you please check again?
If the schema is defined in a .avsc file, do we have to parse it and rebuild that syntax (DDL and UDF) ourselves, or is there an existing component that could be used for this?

Thanks a lot!

Best,

On Wed, Dec 2, 2020 at 04:50, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi Pierre,

I wrote a PyFlink implementation; you can check whether it meets your needs:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, DataTypes
from pyflink.table.udf import udf


def test():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance()
            .in_streaming_mode().use_blink_planner().build())
    t_env.get_config().get_configuration().set_string(
        "taskmanager.memory.task.off-heap.size", '80m')

    # 10k nested columns
    num_field = 10_000
    fields = ['f%s INT' % i for i in range(num_field)]
    field_str = ','.join(fields)
    t_env.execute_sql(f"""
        CREATE TABLE source_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'datagen',
            'number-of-rows' = '2'
        )
    """)

    t_env.execute_sql(f"""
        CREATE TABLE print_table (
            f0 BIGINT,
            f1 DECIMAL(32,2),
            f2 ROW<{field_str}>,
            f3 TIMESTAMP(3)
        ) WITH (
            'connector' = 'print'
        )
    """)

    result_type = DataTypes.ROW(
        [DataTypes.FIELD("f%s" % i, DataTypes.INT()) for i in range(num_field)])

    # Identity UDF over the 10k-field nested row
    func = udf(lambda x: x, result_type=result_type)

    source = t_env.from_path("source_table")
    result = source.select(source.f0, source.f1, func(source.f2), source.f3)
    result.execute_insert("print_table")


if __name__ == '__main__':
    test()

Best,
Xingbo
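On Pierre's .avsc question above: I am not aware of a ready-made component that turns an Avro schema into the DDL string and result_type used in Xingbo's example, but since an .avsc file is plain JSON, both can be generated from it in a few lines. A hedged sketch, assuming a flat record schema of primitive types only (the file name schema.avsc is made up; unions, nested records and logical types are not handled):

import json

from pyflink.table import DataTypes

# Assumed mapping from Avro primitive types to Flink SQL types / DataTypes;
# extend as needed for your schema.
AVRO_TO_SQL = {'string': 'STRING', 'int': 'INT', 'long': 'BIGINT',
               'float': 'FLOAT', 'double': 'DOUBLE', 'boolean': 'BOOLEAN'}
AVRO_TO_DATATYPE = {'string': DataTypes.STRING(), 'int': DataTypes.INT(),
                    'long': DataTypes.BIGINT(), 'float': DataTypes.FLOAT(),
                    'double': DataTypes.DOUBLE(), 'boolean': DataTypes.BOOLEAN()}

with open('schema.avsc') as f:
    schema = json.load(f)

# Field list for the ROW<...> part of the DDL, e.g. "f0 INT,f1 STRING"
field_str = ','.join('%s %s' % (fld['name'], AVRO_TO_SQL[fld['type']])
                     for fld in schema['fields'])

# Matching result_type for the Python UDF
result_type = DataTypes.ROW(
    [DataTypes.FIELD(fld['name'], AVRO_TO_DATATYPE[fld['type']])
     for fld in schema['fields']])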
On Tue, Dec 1, 2020 at 18:10, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Xingbo,

That would mean giving up the Flink (Table) features on the content of the parsed JSON objects, so definitely a big loss. Let me know if I missed something.

Thanks!

On Tue, Dec 1, 2020 at 07:26, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi Pierre,

Have you ever thought of declaring your entire JSON as a string field in the `Table` and putting the parsing work into a UDF?

Best,
Xingbo

On Tue, Dec 1, 2020 at 04:13, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Xingbo,

Many thanks for your follow-up. Yes, you got it right. So, using the Table API and a ROW object for the nested output of my UDF, and since types are mandatory, I guess this boils down to:

- How to nicely specify the types for the 100k fields: shall I use TypeInformation [1], or better, retrieve it from a Schema Registry [2]?
- Do I have to put NULL values for all the fields that don't have a value in my JSON?
- Will the resulting Table be "sparse" and suffer performance limitations?

Let me know if the Table API and ROW are the right candidates here, or if better alternatives exist. As said, I'd be glad to apply some downstream transformations using key/value access (and possibly some Table <-> Pandas operations). I hope that doesn't make it too long a wish list ;)

Thanks a lot!

Best regards,

[1] https://stackoverflow.com/questions/48696875/how-to-attach-schema-to-a-flink-datastream-on-the-fly
[2] https://docs.cloudera.com/csa/1.2.0/datastream-connectors/topics/csa-schema-registry.html

On Sat, Nov 28, 2020 at 04:04, Xingbo Huang <hxbks...@gmail.com> wrote:

Hi Pierre,

Sorry for the late reply. Your requirement is that your `Table` has a `field` in JSON format whose number of keys has reached 100k, and you want to use such a `field` as the input/output of a `udf`, right? As to whether there is a limit on the number of nested keys, I am not quite sure; other contributors with experience in this area may have answers. On the `Python UDF` side, if the type of the key or value of your `Map` is `Any`, we do not support it for now; you need to specify a concrete type. For more information, please refer to the related documentation [1].

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/python/table-api-users-guide/python_types.html

Best,
Xingbo
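To make Xingbo's earlier suggestion concrete (keep the whole JSON as a STRING column and do the parsing in a UDF), here is a minimal sketch with an explicit MAP<STRING, STRING> result type, since `Any`-typed keys or values are not supported. It uses the release-1.12 udf syntax from Xingbo's example above; the column name raw_json is made up, and all values are stringified:

import json

from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Parse the raw JSON column into a map: only the keys present in each record
# appear in the result, which keeps 99%-sparse data compact.
parse_json = udf(
    lambda s: {k: str(v) for k, v in json.loads(s).items()},
    result_type=DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()))

# usage, e.g.: table.select(parse_json(table.raw_json))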
On Nov 28, 2020, at 00:49, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hello Wei, Dian, Xingbo,

I'm not really sure when it is appropriate to knock on the door of the community ;) I just wanted to mention that your feedback on the above topic would be highly appreciated, as it will condition our choice of framework for the months to come, and potentially help the community cover sparse data with Flink.

Thanks a lot!

Have a great weekend.

Best,

On Fri, Nov 20, 2020 at 10:11, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

Thanks for the hint. May I follow up by adding more context and ask for your guidance?

In case the aforementioned Map[String,Any] object returned by Scala:

- Has a defined schema (incl. nesting) with up to 100k (!) different possible keys
- Has only some portion of the keys populated for each record
- Is convertible to JSON
- Has to undergo downstream processing in Flink and/or Python UDFs with key/value access
- Has to be ultimately stored in a Kafka/Avro sink

How would you declare the types explicitly in such a case?

Thanks for your support!

Pierre

On Thu, Nov 19, 2020 at 03:54, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

Currently there is no type hint like 'Map[String, Any]'. The recommended way is to declare your type more explicitly.

If you insist on doing this, you can try declaring a RAW data type for java.util.HashMap [1], but you may encounter some trouble [2] related to the Kryo serializers.

Best,
Wei

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
[2] https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class

On Nov 19, 2020, at 04:31, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

It works! Thanks a lot for your support. I hadn't tried this last combination for option 1, and I had the wrong syntax for option 2.

So, to summarize:

Methods working:
- Current: DataTypeHint in the UDF definition + SQL for UDF registration
- Outdated: override getResultType in the UDF definition + t_env.register_java_function for UDF registration

Type conversions working:
- scala.collection.immutable.Map[String,String] => org.apache.flink.types.Row => ROW<STRING,STRING>
- scala.collection.immutable.Map[String,String] => java.util.Map[String,String] => MAP<STRING,STRING>

Any hint for Map[String,Any]?

Best regards,
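On the question of declaring the types explicitly for such a schema: a hedged sketch of one way to express it with DataTypes on the Python side, using a fixed ROW for the known part of the structure and a MAP<STRING, STRING> for the sparse part (field names are made up; since Map[String, Any] is not supported, per Xingbo's note earlier in the thread, heterogeneous values would have to be stringified, e.g. JSON-encoded):

from pyflink.table import DataTypes

# Hypothetical nested schema: fixed metadata fields plus a sparse payload.
record_type = DataTypes.ROW([
    DataTypes.FIELD("meta", DataTypes.ROW([
        DataTypes.FIELD("id", DataTypes.STRING()),
        DataTypes.FIELD("ts", DataTypes.BIGINT()),
    ])),
    # Values kept as STRING because Any-typed maps are not supported.
    DataTypes.FIELD("payload",
                    DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
])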
On Wed, Nov 18, 2020 at 03:26, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

Both approaches work on my local machine. This is my code:

Scala UDF:

package com.dummy

import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.table.api.Types
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row

/**
 * The Scala UDF.
 */
class dummyMap extends ScalarFunction {

  // If the UDF is registered by a SQL statement, you need to add this type hint.
  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  // If the UDF is registered by the method 'register_java_function',
  // you need to override this method.
  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = {
    // The type of the return values should be TypeInformation.
    Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), Types.STRING()))
  }
}

Python code:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

s_env = StreamExecutionEnvironment.get_execution_environment()
st_env = StreamTableEnvironment.create(s_env)

# Load the Scala UDF jar file; the path should be modified to yours,
# or you can also load the jar file via other approaches.
st_env.get_config().get_configuration().set_string("pipeline.jars", "file:///Users/zhongwei/the-dummy-udf.jar")

# Register the UDF via SQL
st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap' LANGUAGE SCALA")
# or register it via the method:
# st_env.register_java_function("dummyMap", "com.dummy.dummyMap")

# Prepare source and sink
t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a', 'b', 'c'])
st_env.execute_sql("""create table mySink (
    output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
    'connector' = 'print'
)""")

# Execute the query
t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()

Best,
Wei
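For comparison, the same dummy row could presumably also be produced by a pure Python UDF, with no Scala jar involved — a hedged sketch using the release-1.12 udf syntax from earlier in the thread (the input argument is only there to give the function a column to be called on; the Scala eval() takes none):

from pyflink.common import Row
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# Python counterpart of the Scala dummyMap: returns a two-field ROW.
dummy_map = udf(
    lambda i: Row("foo", "bar"),  # input is ignored
    result_type=DataTypes.ROW([DataTypes.FIELD("s", DataTypes.STRING()),
                               DataTypes.FIELD("t", DataTypes.STRING())]))

# usage, e.g.: t.select(dummy_map(t.a)).execute_insert("mySink")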
On Nov 18, 2020, at 03:28, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

True, I'm using the method you mention, but I'm glad to change. I tried your suggestion instead, but got a similar error.

Thanks for your support. This is much more tedious than I thought.

Option 1 - SQL UDF

SQL UDF:

create_func_ddl = """
CREATE FUNCTION dummyMap
AS 'com.dummy.dummyMap' LANGUAGE SCALA
"""

t_env.execute_sql(create_func_ddl)

Error:

Py4JJavaError: An error occurred while calling o672.execute.
: org.apache.flink.table.api.TableException: Result field does not match requested type. Requested: Row(s: String, t: String); Actual: GenericType<org.apache.flink.types.Row>

Option 2 - Overriding getResultType

Back to the old registration method, but overriding getResultType:

t_env.register_java_function("dummyMap", "com.dummy.dummyMap")

Scala UDF:

class dummyMap() extends ScalarFunction {

  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }

  override def getResultType(signature: Array[Class[_]]): TypeInformation[_] =
    DataTypes.ROW(DataTypes.STRING, DataTypes.STRING)
}

Error (on compilation):

[error] dummyMap.scala:66:90: overloaded method value ROW with alternatives:
[error]   (x$1: org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType <and>
[error]   ()org.apache.flink.table.types.DataType <and>
[error]   (x$1: org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
[error] cannot be applied to (org.apache.flink.table.types.DataType, org.apache.flink.table.types.DataType)
[error]   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
[error]                                                                                             ^
[error] one error found
[error] (Compile / compileIncremental) Compilation failed
[error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01

On Tue, Nov 17, 2020 at 14:01, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

I guess your UDF was registered by the method 'register_java_function', which uses the old type system. In this situation you need to override the 'getResultType' method instead of adding the type hint.

You can also try to register your UDF via the "CREATE FUNCTION" SQL statement, which accepts the type hint.

Best,
Wei

On Nov 17, 2020, at 19:29, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Wei,

Thanks for your suggestion. Same error.

Scala UDF:

@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
class dummyMap() extends ScalarFunction {
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
}

Best regards,
On Tue, Nov 17, 2020 at 10:04, Wei Zhong <weizhong0...@gmail.com> wrote:

Hi Pierre,

You can try to replace the '@DataTypeHint("ROW<s STRING,t STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))'.

Best,
Wei

On Nov 17, 2020, at 15:45, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Hi Dian, Community,

(bringing the thread back to the wider audience)

As you suggested, I've tried to use DataTypeHint with Row instead of Map, but this simple case also leads to a type mismatch between the UDF and the Table API. I've also tried other Map objects from Flink (table.data.MapData, flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java (java.util.Map) in combination with DataTypeHint, without success. N.B. I'm using version 1.11.

Am I doing something wrong, or am I facing limitations of the toolkit?

Thanks in advance for your support!

Best regards,

Scala UDF:

class dummyMap() extends ScalarFunction {

  @DataTypeHint("ROW<s STRING,t STRING>")
  def eval(): Row = {
    Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
  }
}

Table DDL:

my_sink_ddl = f"""
create table mySink (
    output_of_my_scala_udf ROW<s STRING,t STRING>
) with (
    ...
)
"""

Error:

Py4JJavaError: An error occurred while calling o2.execute.
: org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink `default_catalog`.`default_database`.`mySink` do not match.
Query result schema: [output_of_my_scala_udf: GenericType<org.apache.flink.types.Row>]
TableSink schema: [output_of_my_scala_udf: Row(s: String, t: String)]

On Fri, Nov 13, 2020 at 11:59, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

Thanks Dian, but I get the same error when using an explicit return type:

class dummyMap() extends ScalarFunction {

  def eval(): util.Map[java.lang.String, java.lang.String] = {
    val states = Map("key1" -> "val1", "key2" -> "val2")
    states.asInstanceOf[util.Map[java.lang.String, java.lang.String]]
  }
}

On Fri, Nov 13, 2020 at 10:34, Dian Fu <dian0511...@gmail.com> wrote:

You need to explicitly define the result type of the UDF. You can refer to [1] for more details if you are using Flink 1.11. If you are using another version of Flink, please refer to the corresponding documentation.
[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide

On Nov 13, 2020, at 16:56, Pierre Oberholzer <pierre.oberhol...@gmail.com> wrote:

ScalarFunction

--
Pierre