Hi Vtygoss,

You could try the following SQL:

```
select COLLECT(ROW(id, name)) as info
from table
group by ...;
```

In the SQL above, the result type of `COLLECT(ROW(id, name))` is MULTISET<ROW>. `CollectAggFunction` stores the data in a MapState: the key has the element type and holds the row value, and the value is an Integer that holds the number of occurrences of that row.

If you need to define a UDF that handles the result of `COLLECT(ROW(id, name))`, you could use Map<Row, Integer> as the input parameter type. The following code is a demo. Hope it helps.

    import org.apache.flink.table.functions.ScalarFunction
    import org.apache.flink.types.Row
    import scala.collection.JavaConverters._

    tEnv.registerFunction("TestFunc", TestFunc)
    tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")
    ....

    @SerialVersionUID(1L)
    object TestFunc extends ScalarFunction {
      // Each key is one distinct row; each value is how many times that row occurs.
      def eval(s: java.util.Map[Row, Integer]): String =
        s.keySet().asScala.mkString("\n") // asScala is needed: java.util.Set has no mkString
    }

Best regards,
JING ZHANG

vtygoss <vtyg...@126.com> wrote on Mon, Nov 8, 2021 at 7:00 PM:

> Hi, Flink community!
>
> I am working on migrating a data production pipeline from SparkSQL to
> FlinkSQL (1.12.0), and I ran into a problem with MULTISET<STRUCT<....>>.
>
> ```
> Spark SQL
>
> select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
> from table
> group by ...;
> ```
>
> 1. How to express and store this data structure in Flink?
>
> I tried to express it as MULTISET<ROW<id long, name string>> in FlinkSQL, but
> it seems that the ORC / JSON / AVRO formats cannot store this type.
>
> 2. How to read MULTISET<Row<id long, name string>> in FlinkSQL?
>
> If I need to define a function, which type should
> MultiSet<Row<id long, name string>> be bridged to?
>
> Is there any other, more convenient way to solve this problem?
>
> Thanks very much for any suggestions or replies.
>
> Best Regards!
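
A note on the multiset representation described in the reply: because each distinct row is a map key and its occurrence count is the value, duplicates are not stored as separate entries. If the goal is something closer to Spark's `COLLECT_LIST`, the map can be expanded back into a flat list. The following is only a minimal sketch in plain Scala with no Flink dependency. `MultisetSketch`, `Info`, and `expand` are hypothetical names used for illustration, and `Info` is an assumed stand-in for a `Row` with the fields `(id, name)`.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.collection.JavaConverters._

object MultisetSketch {
  // Hypothetical stand-in for a Flink Row with fields (id, name).
  final case class Info(id: Long, name: String)

  // Expand an element -> count map into a flat list in which
  // every element is repeated `count` times.
  def expand[T](multiset: JMap[T, Integer]): List[T] =
    multiset.asScala.toList.flatMap { case (elem, count) =>
      List.fill(count.intValue())(elem)
    }

  def main(args: Array[String]): Unit = {
    // LinkedHashMap keeps insertion order, so the printed order is stable here.
    val ms = new JLinkedHashMap[Info, Integer]()
    ms.put(Info(1L, "a"), Integer.valueOf(2)) // this row was collected twice
    ms.put(Info(2L, "b"), Integer.valueOf(1)) // this row was collected once
    println(expand(ms).mkString("\n"))
  }
}
```

The same expansion could be placed inside the `eval` method of a UDF that takes `java.util.Map[Row, Integer]`. Note that a multiset carries no ordering guarantee, so the order of the expanded list should not be relied on.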