Hi Wei,

Thanks for the hint. May I please follow up by adding more context and ask
for your guidance.

In case the bespoken Map[String,Any] object returned by Scala:

- Has a defined schema (incl. nested) with up to 100k (!) different
possible keys
- Has only some portion of the keys populated for each record
- Is convertible to JSON
- Has to undergo downstream processing in Flink and/or Python UDF with key
value access
- Has to be ultimately stored in a Kafka/AVRO sink

How would you declare the types explicitly in such a case ?

Thanks for your support !

Pierre

Le jeu. 19 nov. 2020 à 03:54, Wei Zhong <weizhong0...@gmail.com> a écrit :

> Hi Pierre,
>
> Currently there is no type hint like ‘Map[String, Any]’. The recommended
> way is declaring your type more explicitly.
>
> If you insist on doing this, you can try to declaring a RAW data type for
> java.util.HashMap [1], but you may encounter some troubles [2] related to
> the kryo serializers.
>
> Best,
> Wei
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/types.html#raw
> [2]
> https://stackoverflow.com/questions/28157236/kryo-serialization-with-nested-hashmap-with-custom-class
>
>
> 在 2020年11月19日,04:31,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>
> Hi Wei,
>
> It works ! Thanks a lot for your support.
> I hadn't tried this last combination for option 1, and I had wrong
> syntax for option 2.
>
> So to summarize..
>
> Methods working:
> - Current: DataTypeHint in UDF definition + SQL for UDF registering
> - Outdated: override getResultType in UDF definition
> + t_env.register_java_function for UDF registering
>
> Type conversions working:
> - scala.collection.immutable.Map[String,String] =>
> org.apache.flink.types.Row => ROW<STRING,STRING>
> - scala.collection.immutable.Map[String,String] =>
> java.util.Map[String,String] => MAP<STRING,STRING>
>
> Any hint for Map[String,Any] ?
>
> Best regards,
>
> Le mer. 18 nov. 2020 à 03:26, Wei Zhong <weizhong0...@gmail.com> a écrit :
>
>> Hi Pierre,
>>
>> Those 2 approaches all work in my local machine, this is my code:
>>
>> Scala UDF:
>>
>> package com.dummy
>>
>> import org.apache.flink.api.common.typeinfo.TypeInformation
>> import org.apache.flink.table.annotation.DataTypeHint
>> import org.apache.flink.table.api.Types
>> import org.apache.flink.table.functions.ScalarFunction
>> import org.apache.flink.types.Row
>>
>> /**
>>   * The scala UDF.
>>   */
>> class dummyMap extends ScalarFunction {
>>
>>   // If the udf would be registered by the SQL statement, you need add this 
>> typehint
>>   @DataTypeHint("ROW<s STRING,t STRING>")
>>   def eval(): Row = {
>>
>>     Row.of(java.lang.String.valueOf("foo"), java.lang.String.valueOf("bar"))
>>
>>   }
>>
>>   // If the udf would be registered by the method 'register_java_function', 
>> you need override this
>>   // method.
>>   override def getResultType(signature: Array[Class[_]]): TypeInformation[_] 
>> = {
>>     // The type of the return values should be TypeInformation
>>     Types.ROW(Array("s", "t"), Array[TypeInformation[_]](Types.STRING(), 
>> Types.STRING()))
>>   }
>> }
>>
>> Python code:
>>
>> from pyflink.datastream import StreamExecutionEnvironment
>> from pyflink.table import StreamTableEnvironment
>>
>> s_env = StreamExecutionEnvironment.get_execution_environment()
>> st_env = StreamTableEnvironment.create(s_env)
>>
>> # load the scala udf jar file, the path should be modified to yours
>> # or your can also load the jar file via other approaches
>> st_env.get_config().get_configuration().set_string("pipeline.jars", "
>> file:///Users/zhongwei/the-dummy-udf.jar")
>>
>> # register the udf via
>> st_env.execute_sql("CREATE FUNCTION dummyMap AS 'com.dummy.dummyMap'
>> LANGUAGE SCALA")
>> # or register via the method
>> # st_env.register_java_function("dummyMap", "com.dummy.dummyMap")
>>
>> # prepare source and sink
>> t = st_env.from_elements([(1, 'hi', 'hello'), (2, 'hi', 'hello')], ['a',
>> 'b', 'c'])
>> st_env.execute_sql("""create table mySink (
>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>     ) with (
>>         'connector' = 'print'
>>     )""")
>>
>> # execute query
>>
>> t.select("dummyMap()").execute_insert("mySink").get_job_client().get_job_execution_result().result()
>>
>> Best,
>> Wei
>>
>> 在 2020年11月18日,03:28,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>
>> Hi Wei,
>>
>> True, I'm using the method you mention, but glad to change.
>> I tried your suggestion instead, but got a similar error.
>>
>> Thanks for your support. That is much more tedious than I thought.
>>
>> *Option 1 - SQL UDF*
>>
>> *SQL UDF*
>> create_func_ddl = """
>> CREATE FUNCTION dummyMap
>>   AS 'com.dummy.dummyMap' LANGUAGE SCALA
>> """
>>
>> t_env.execute_sql(create_func_ddl)
>>
>> *Error*
>> Py4JJavaError: An error occurred while calling o672.execute.
>> : org.apache.flink.table.api.TableException: Result field does not match
>> requested type. Requested: Row(s: String, t: String); Actual:
>> GenericType<org.apache.flink.types.Row>
>>
>> *Option 2 *- *Overriding getResultType*
>>
>> Back to the old registering method, but overriding getResultType:
>>
>> t_env.register_java_function("dummyMap","com.dummy.dummyMap")
>>
>> *Scala UDF*
>> class dummyMap() extends ScalarFunction {
>>
>>   def eval(): Row = {
>>
>>       Row.of(java.lang.String.valueOf("foo"),
>> java.lang.String.valueOf("bar"))
>>
>>   }
>>
>>   override def getResultType(signature: Array[Class[_]]):
>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>> }
>>
>> *Error (on compilation)*
>>
>> [error] dummyMap.scala:66:90: overloaded method value ROW with
>> alternatives:
>> [error]   (x$1:
>> org.apache.flink.table.api.DataTypes.AbstractField*)org.apache.flink.table.types.UnresolvedDataType
>> <and>
>> [error]   ()org.apache.flink.table.types.DataType <and>
>> [error]   (x$1:
>> org.apache.flink.table.api.DataTypes.Field*)org.apache.flink.table.types.DataType
>> [error]  cannot be applied to (org.apache.flink.table.types.DataType,
>> org.apache.flink.table.types.DataType)
>> [error]   override def getResultType(signature: Array[Class[_]]):
>> TypeInformation[_] = DataTypes.ROW(DataTypes.STRING,DataTypes.STRING)
>> [error]
>>                         ^
>> [error] one error found
>> [error] (Compile / compileIncremental) Compilation failed
>> [error] Total time: 3 s, completed 17 nov. 2020 à 20:00:01
>>
>> Le mar. 17 nov. 2020 à 14:01, Wei Zhong <weizhong0...@gmail.com> a
>> écrit :
>>
>>> Hi Pierre,
>>>
>>> I guess your UDF is registered by the method 'register_java_function'
>>> which uses the old type system. In this situation you need to override the
>>> 'getResultType' method instead of adding type hint.
>>>
>>> You can also try to register your UDF via the "CREATE FUNCTION" sql
>>> statement, which accepts the type hint.
>>>
>>> Best,
>>> Wei
>>>
>>> 在 2020年11月17日,19:29,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>
>>> Hi Wei,
>>>
>>> Thanks for your suggestion. Same error.
>>>
>>> *Scala UDF*
>>>
>>> @FunctionHint(output = new DataTypeHint("ROW<s STRING,t STRING>"))
>>> class dummyMap() extends ScalarFunction {
>>>   def eval(): Row = {
>>>     Row.of(java.lang.String.valueOf("foo"),
>>> java.lang.String.valueOf("bar"))
>>>   }
>>> }
>>>
>>> Best regards,
>>>
>>> Le mar. 17 nov. 2020 à 10:04, Wei Zhong <weizhong0...@gmail.com> a
>>> écrit :
>>>
>>>> Hi Pierre,
>>>>
>>>> You can try to replace the '@DataTypeHint("ROW<s STRING,t
>>>> STRING>")' with '@FunctionHint(output = new DataTypeHint("ROW<s STRING,t
>>>> STRING>”))'
>>>>
>>>> Best,
>>>> Wei
>>>>
>>>> 在 2020年11月17日,15:45,Pierre Oberholzer <pierre.oberhol...@gmail.com> 写道:
>>>>
>>>> Hi Dian, Community,
>>>>
>>>> (bringing the thread back to wider audience)
>>>>
>>>> As you suggested, I've tried to use DataTypeHint with Row instead of
>>>> Map but also this simple case leads to a type mismatch between UDF and
>>>> Table API.
>>>> I've also tried other Map objects from Flink (table.data.MapData,
>>>> flink.types.MapValue, flink.table.api.DataTypes.MAP) in addition to Java
>>>> (java.util.Map) in combination with DataTypeHint, without success.
>>>> N.B. I'm using version 1.11.
>>>>
>>>> Am I doing something wrong or am I facing limitations in the toolkit ?
>>>>
>>>> Thanks in advance for your support !
>>>>
>>>> Best regards,
>>>>
>>>> *Scala UDF*
>>>>
>>>> class dummyMap() extends ScalarFunction {
>>>>
>>>>  @DataTypeHint("ROW<s STRING,t STRING>")
>>>>  def eval(): Row = {
>>>>
>>>>     Row.of(java.lang.String.valueOf("foo"),
>>>> java.lang.String.valueOf("bar"))
>>>>
>>>>   }
>>>> }
>>>>
>>>> *Table DDL*
>>>>
>>>> my_sink_ddl = f"""
>>>>     create table mySink (
>>>>         output_of_my_scala_udf ROW<s STRING,t STRING>
>>>>     ) with (
>>>>         ...
>>>>     )
>>>> """
>>>>
>>>> *Error*
>>>>
>>>> Py4JJavaError: An error occurred while calling o2.execute.
>>>> : org.apache.flink.table.api.ValidationException: Field types of query
>>>> result and registered TableSink
>>>> `default_catalog`.`default_database`.`mySink` do not match.
>>>> Query result schema: [output_of_my_scala_udf:
>>>> GenericType<org.apache.flink.types.Row>]
>>>> TableSink schema:    [output_of_my_scala_udf: Row(s: String, t: String)]
>>>>
>>>>
>>>>
>>>> Le ven. 13 nov. 2020 à 11:59, Pierre Oberholzer <
>>>> pierre.oberhol...@gmail.com> a écrit :
>>>>
>>>>> Thanks Dian, but same error when using explicit returned type:
>>>>>
>>>>> class dummyMap() extends ScalarFunction {
>>>>>
>>>>>   def eval() : util.Map[java.lang.String,java.lang.String] = {
>>>>>
>>>>>     val states = Map("key1" -> "val1", "key2" -> "val2")
>>>>>     states.asInstanceOf[util.Map[java.lang.String,java.lang.String]]
>>>>>
>>>>>   }
>>>>> }
>>>>>
>>>>> Le ven. 13 nov. 2020 à 10:34, Dian Fu <dian0511...@gmail.com> a
>>>>> écrit :
>>>>>
>>>>>> You need to explicitly defined the result type the UDF. You could
>>>>>> refer to [1] for more details if you are using Flink 1.11. If you are 
>>>>>> using
>>>>>> other versions of Flink, you need to refer to the corresponding
>>>>>> documentation.
>>>>>>
>>>>>> [1]
>>>>>> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/functions/udfs.html#implementation-guide
>>>>>>
>>>>>> 在 2020年11月13日,下午4:56,Pierre Oberholzer <pierre.oberhol...@gmail.com>
>>>>>> 写道:
>>>>>>
>>>>>> ScalarFunction
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Pierre
>>>>>
>>>>
>>>> --
>>>> Pierre
>>>>
>>>>
>>>>
>>>
>>> --
>>> Pierre
>>>
>>>
>>>
>>
>> --
>> Pierre
>>
>>
>>
>
> --
> Pierre
>
>
>

-- 
Pierre

Reply via email to