Hi, JING ZHANG! Thanks again for all your help.
I have already tried using COLLECT(ROW(id, name)) and storing the result as a String (for a POC test). To do that I defined a UDF, and the annotation on its eval function has to be declared as "MULTISET<ROW<...>>", as shown below; otherwise this exception is thrown: "..RAW/MAP expected. but MULTISET<ROW<`EXPR$0` STRING NOT NULL, `EXPR$1` STRING, `EXPR$2` STRING, `EXPR$3` STRING, `EXPR$4` STRING> NOT NULL> NOT NULL passed".

I don't think this style of UDF annotation is convenient for the general case. I can't define a separate UDF for every RowData structure, such as Row<id>, Row<id, name>, Row<.....>. Is there any way to define the annotation for a dynamic RowData structure?

Thanks again for your suggestions.

Best Regards!

```
import java.util.{Map => JMAP}
import org.apache.flink.table.annotation.DataTypeHint
import org.apache.flink.types.Row

// eval method of a ScalarFunction subclass
def eval(@DataTypeHint("MULTISET<ROW<" +
    "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
    "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
    ">") data: JMAP[Row, Integer]): String = {
  if (data == null || data.size() == 0) {
    return ""
  }
  // joins the distinct rows of the multiset with commas
  data.keySet().toArray().mkString(",")
}
```

On November 8, 2021 at 21:26, JING ZHANG <beyond1...@gmail.com> wrote:

Hi Vtygoss,

You could try the following SQL:

```
select COLLECT(ROW(id, name)) as info from table group by ...;
```

In the above SQL, the result type of `COLLECT(ROW(id, name))` is MULTISET<ROW>. `CollectAggFunction` stores the data in a MapState: the key has the element type and represents the row value; the value is an Integer and represents the count of that row.

If you need to define a UDF that handles the result of `COLLECT(ROW(id, name))`, you could use Map<Row, Integer> as the input parameter type. The following code is a demo. Hope it helps.

```
import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import scala.collection.JavaConverters._

tEnv.registerFunction("TestFunc", TestFunc)
tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")
....

@SerialVersionUID(1L)
object TestFunc extends ScalarFunction {
  // keySet() is a java.util.Set, so convert it before calling mkString
  def eval(s: java.util.Map[Row, Integer]): String = s.keySet().asScala.mkString("\n")
}
```

Best regards,
JING ZHANG

vtygoss <vtyg...@126.com> wrote on Monday, November 8, 2021 at 7:00 PM:

Hi, flink community!
I am working on migrating a data production pipeline from Spark SQL to Flink SQL (1.12.0), and I have run into a problem with MULTISET<STRUCT<....>>.

```
-- Spark SQL
select COLLECT_LIST(named_struct('id', id, 'name', name)) as info from table group by ...;
```

1. How can I express and store this data structure in Flink? I tried expressing it as MULTISET<ROW<id long, name string>> in Flink SQL, but it seems that the ORC / JSON / AVRO formats can't store this type.
2. How can I read MULTISET<Row<id long, name string>> in Flink SQL? If I need to define a function, which type should MultiSet<Row<id long, name string>> be bridged to?

Is there any other, more convenient way to solve this problem? Thanks very much for any suggestions or replies.

Best Regards!
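A minimal sketch (not from the thread) of the MULTISET representation JING ZHANG describes, where each distinct element maps to its count. The names `MultisetSketch`, `expand` and `render` are hypothetical, and a Scala tuple stands in for `org.apache.flink.types.Row` so the snippet runs without Flink on the classpath. It also illustrates one caveat of calling `keySet()` on the map, as both UDFs in this thread do: a row collected twice shows up only once, whereas Spark's COLLECT_LIST keeps duplicates. Expanding each key by its count restores them. Because `render` only relies on each element's `toString`, it does not depend on how many fields the row has.

```scala
import java.util.{HashMap => JHashMap, Map => JMap}
import scala.collection.JavaConverters._

object MultisetSketch {
  // Expands a multiset stored as element -> count into a flat sequence.
  // Unlike keySet(), duplicates are kept (closer to Spark's COLLECT_LIST).
  // Map iteration order is not guaranteed, so element order is not either.
  def expand[K](multiset: JMap[K, Integer]): Seq[K] =
    if (multiset == null) Seq.empty
    else multiset.asScala.toSeq.flatMap { case (elem, count) =>
      Seq.fill(count.intValue)(elem)
    }

  // Joins the expanded elements using their toString representation.
  def render[K](multiset: JMap[K, Integer], sep: String = ","): String =
    expand(multiset).map(_.toString).mkString(sep)

  def main(args: Array[String]): Unit = {
    // Hypothetical stand-in for ROW(id, name): a (String, String) tuple.
    val m = new JHashMap[(String, String), Integer]()
    m.put(("1", "a"), 2) // this row was collected twice
    m.put(("2", "b"), 1)
    println(render(m, "; "))
  }
}
```

In an actual Flink UDF the argument would presumably be `java.util.Map[Row, Integer]`, as JING ZHANG suggests. Whether the input annotation itself can be made generic is not verified here; Flink's `@DataTypeHint(inputGroup = InputGroup.ANY)` argument hint is one option worth checking against the 1.12 documentation.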