Hi vtygoss,
I'm a little confused.
The UDF could already work well without defining `DataTypeHint `annotation.
Why do you define `DataTypeHint `annotation before input parameter of
`eval `method?

Best,
JING ZHANG

vtygoss <vtyg...@126.com> 于2021年11月9日周二 下午8:17写道:

> Hi, JING ZHANG!
>
> Thanks for your many times of help.
>
>
> I already try to use COLLECT(ROW(id, name)) and store the result with type
> String(for POC test).  So I try to define an UDF, and the annotation of
> function eval must be defined as "MULTISET<ROW<...>>" as below, otherwise
> exception "..RAW/MAP expected. but MULTISET<ROW<`EXPR$0` STRING NOT NULL,
> `EXPR$1` STRING, `EXPR$2` STRING, `EXPR$3` STRING, `EXPR$4` STRING> NOT
> NULL> NOT NULL passed" thrown.
>
>
> And i think this way of UDF's annotation maybe not a convenient way for
> general scenario. I can't define many UDFs for all RowData structures, such
> as Row<id>, Row<id, name>, Row<.....>
>
>
> Is there any way to define the annotation for dynamic RowData structure?
>
> Thanks for your suggestions again.
>
>
> Best Regards!
>
>
> ```
>
> def eval(@DataTypeHint("MULTISET<ROW<" +
>   "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
>   "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
>   ">") data: JMAP[Row, Integer]): String = {
>   if (data == null || data.size() == 0) {
>     return ""
>   }
>   data.keySet().toArray().mkString(",")
> }
>
> ```
>
>
>
>
>
> 在 2021年11月8日 21:26,JING ZHANG<beyond1...@gmail.com> 写道:
>
> Hi Vtygoss,
> You could try the following SQL:
> ```
>
> select COLLECT(ROW(id, name)) as info
>
> from table
>
> group by ...;
>
> ```
>
> In the above sql, the result type of `COLLECT(ROW(id, name))` is
> MULTISET<ROW>.
>
> `CollectAggFunction` would store the data in a MapState. key is element
> type, represent the row value. value is Integer type, represents the count
> of row.
>
>
> If you need to define a UDF which handles the result from `COLLECT(ROW(id,
> name))`, you could use Map<Row, Integer> as input parameter type.
>
> The following code is a demo. Hope it helps.
>
> tEnv.registerFunction("TestFunc", TestFunc)
> tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table 
> group by ...")
>
> ....
>
> @SerialVersionUID(1L)
> object TestFunc extends ScalarFunction {
>   def eval(s: java.util.Map[Row, Integer]): String = s.keySet().mkString("\n")
> }
>
> Best regards,
>
> JING ZHANG
>
> vtygoss <vtyg...@126.com> 于2021年11月8日周一 下午7:00写道:
>
>> Hi, flink community!
>>
>>
>> I am working on migrating data production pipeline from SparkSQL to
>> FlinkSQL(1.12.0). And i meet a problem about MULTISET<STRUCT<....>>.
>>
>>
>> ```
>>
>> Spark SQL
>>
>>
>> select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
>>
>> from table
>>
>> group by ...;
>>
>>
>> ```
>>
>>
>> - 1. how to express and store this data structure in flink?
>>
>> I tried to express by MULTISET<ROW<id long, name string>> in FlinkSQL.
>> But it seems that ORC / JSON / AVRO format cann't store this type.
>>
>> - 2.  How to read MULTISET<Row<id long, name string>> in FlinkSQL?
>>
>> If i need to define a function, which type should be briged to for
>> MultiSet<Row<id long, name string>>?
>>
>>
>> Is there any other way more convenient to solute this problem?
>>
>> Thanks very much for your any suggestions or replies.
>>
>>
>> Best Regards!
>>
>

Reply via email to