Hi Vtygoss,
You could try the following SQL:
```
select COLLECT(ROW(id, name)) as info
from table
group by ...;
```

In the SQL above, the result type of `COLLECT(ROW(id, name))` is
MULTISET<ROW>.

`CollectAggFunction` stores the data in a MapState. The key has the element
type and represents the row value. The value is an Integer and represents
the number of times that row occurs.
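
To make that key/count layout concrete, here is a rough sketch in plain Scala
(no Flink dependency). It is not Flink's actual implementation: `MultisetSketch`
and `expand` are hypothetical names I made up for illustration. It only shows
how a map from element to count can be turned back into a list with
duplicates, which is close to what COLLECT_LIST gives you in Spark, except
that a multiset does not keep the original order.

```scala
import java.util.{Map => JMap}
import scala.collection.JavaConverters._

// Hypothetical helper, for illustration only.
object MultisetSketch {
  // Expand an element -> count map into a list that repeats each
  // element `count` times. The order of the result is unspecified.
  def expand[T](multiset: JMap[T, Integer]): List[T] =
    multiset.asScala.toList.flatMap { case (elem, count) =>
      List.fill(count.intValue())(elem)
    }
}
```

For example, a map holding "a" with count 2 and "b" with count 1 expands to
a list containing "a" twice and "b" once.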


If you need to define a UDF that handles the result of
`COLLECT(ROW(id, name))`, you could use Map<Row, Integer> as the input
parameter type.

The following code is a demo. Hope it helps.

tEnv.registerFunction("TestFunc", TestFunc)
tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")

....

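import org.apache.flink.table.functions.ScalarFunction
import org.apache.flink.types.Row
import scala.collection.JavaConverters._
@SerialVersionUID(1L)
object TestFunc extends ScalarFunction {
  // keySet() returns a java.util.Set, so convert it to a Scala collection before mkString
  def eval(s: java.util.Map[Row, Integer]): String = s.keySet().asScala.mkString("\n")
}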

Best regards,

JING ZHANG

vtygoss <vtyg...@126.com> wrote on Mon, Nov 8, 2021 at 7:00 PM:

> Hi, flink community!
>
>
> I am working on migrating a data production pipeline from SparkSQL to
> FlinkSQL (1.12.0), and I have run into a problem with MULTISET<STRUCT<....>>.
>
>
> ```
> -- Spark SQL
> select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
> from table
> group by ...;
> ```
>
>
> - 1. How to express and store this data structure in Flink?
>
> I tried to express it as MULTISET<ROW<id long, name string>> in FlinkSQL, but
> it seems that the ORC / JSON / AVRO formats can't store this type.
>
> - 2. How to read MULTISET<ROW<id long, name string>> in FlinkSQL?
>
> If I need to define a function, which type should
> MULTISET<ROW<id long, name string>> be bridged to?
>
>
> Is there any more convenient way to solve this problem?
>
> Thanks very much for any suggestions or replies.
>
>
> Best Regards!
>
