Hi vtygoss,

I'm a little confused. The UDF should already work without a `DataTypeHint` annotation. Why did you add a `DataTypeHint` annotation to the input parameter of the `eval` method?

Best,
JING ZHANG

vtygoss <vtyg...@126.com> wrote on Tue, Nov 9, 2021 at 8:17 PM:

> Hi, JING ZHANG!
>
> Thanks for helping me so many times.
>
> I have already tried COLLECT(ROW(id, name)), storing the result as a
> String (for a POC test). So I tried to define a UDF, and the annotation on
> the eval function must be defined as "MULTISET<ROW<...>>" as below;
> otherwise the exception "..RAW/MAP expected. but MULTISET<ROW<`EXPR$0`
> STRING NOT NULL, `EXPR$1` STRING, `EXPR$2` STRING, `EXPR$3` STRING,
> `EXPR$4` STRING> NOT NULL> NOT NULL passed" is thrown.
>
> I don't think annotating the UDF this way is convenient in the general
> case. I can't define a separate UDF for every RowData structure, such as
> Row<id>, Row<id, name>, Row<.....>
>
> Is there any way to define the annotation for a dynamic RowData structure?
>
> Thanks again for your suggestions.
>
> Best Regards!
>
> ```
> def eval(@DataTypeHint("MULTISET<ROW<" +
>   "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
>   "unit_name STRING, parameter_value STRING, measure_datetime STRING>" +
>   ">") data: JMAP[Row, Integer]): String = {
>   if (data == null || data.size() == 0) {
>     return ""
>   }
>   data.keySet().toArray().mkString(",")
> }
> ```
>
> On Nov 8, 2021 at 21:26, JING ZHANG <beyond1...@gmail.com> wrote:
>
> Hi Vtygoss,
> You could try the following SQL:
> ```
> select COLLECT(ROW(id, name)) as info
> from table
> group by ...;
> ```
>
> In the above SQL, the result type of `COLLECT(ROW(id, name))` is
> MULTISET<ROW>.
>
> `CollectAggFunction` stores the data in a MapState. The key has the
> element type and represents the row value; the value is an Integer and
> represents the count of that row.
>
> If you need to define a UDF that handles the result of
> `COLLECT(ROW(id, name))`, you can use Map<Row, Integer> as the input
> parameter type.
>
> The following code is a demo. Hope it helps.
>
> tEnv.registerFunction("TestFunc", TestFunc)
> tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")
>
> ....
>
> import scala.collection.JavaConverters._
>
> @SerialVersionUID(1L)
> object TestFunc extends ScalarFunction {
>   // keySet() returns a java.util.Set, which needs asScala before mkString
>   def eval(s: java.util.Map[Row, Integer]): String =
>     s.keySet().asScala.mkString("\n")
> }
>
> Best regards,
>
> JING ZHANG
>
> vtygoss <vtyg...@126.com> wrote on Mon, Nov 8, 2021 at 7:00 PM:
>
>> Hi, flink community!
>>
>> I am working on migrating a data production pipeline from SparkSQL to
>> FlinkSQL (1.12.0), and I have run into a problem with
>> MULTISET<STRUCT<....>>.
>>
>> ```
>> -- Spark SQL
>>
>> select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
>> from table
>> group by ...;
>> ```
>>
>> - 1. How to express and store this data structure in Flink?
>>
>> I tried to express it as MULTISET<ROW<id long, name string>> in FlinkSQL,
>> but it seems that the ORC / JSON / AVRO formats can't store this type.
>>
>> - 2. How to read MULTISET<Row<id long, name string>> in FlinkSQL?
>>
>> If I need to define a function, which type should
>> MultiSet<Row<id long, name string>> be bridged to?
>>
>> Is there a more convenient way to solve this problem?
>>
>> Thanks very much for any suggestions or replies.
>>
>> Best Regards!
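
For readers following the thread, the Map-based view of a MULTISET described above (row value as key, occurrence count as value) can be sketched in plain Scala with no Flink dependency. This is only an illustration under stated assumptions: `MultisetSketch`, `FakeRow`, and `collect` are hypothetical names that stand in for Flink's `Row` and for what `COLLECT` produces, and the real `CollectAggFunction` may behave differently in details such as ordering.

```scala
import java.util.{LinkedHashMap => JLinkedHashMap, Map => JMap}
import scala.collection.JavaConverters._

object MultisetSketch {
  // Hypothetical stand-in for a Flink Row: an ordered sequence of field values.
  type FakeRow = Seq[Any]

  // Mimics the shape the thread describes for COLLECT: each distinct row
  // becomes a key, and the value counts how often that row occurred.
  def collect(rows: Seq[FakeRow]): JMap[FakeRow, Integer] = {
    val counts = new JLinkedHashMap[FakeRow, Integer]()
    rows.foreach { r =>
      val prev = counts.get(r)
      val next = if (prev == null) 1 else prev.intValue + 1
      counts.put(r, Integer.valueOf(next))
    }
    counts
  }

  // Same shape as the eval methods in the thread: takes the Map view of the
  // multiset and joins the distinct rows into one String, ignoring the counts.
  def eval(data: JMap[FakeRow, Integer]): String =
    if (data == null || data.isEmpty) ""
    else data.keySet().iterator().asScala
      .map(_.mkString("(", ",", ")"))
      .mkString(";")
}
```

With this representation, a row that was collected twice appears once as a key with a count of 2, so a UDF that only reads `keySet()` sees distinct rows rather than the repeated entries that Spark's `COLLECT_LIST` would return.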