There are multiple ways of making a UDF more generic. I will use pseudocode here:
```
// supports any input
def eval(@DataTypeHint(inputGroup = InputGroup.ANY) o: Object): String = {
  ...
}

// or use no annotations at all and simply define a strategy;
// the default input strategy is a wildcard
def eval(data: Map[Row, Integer]): String = {
  ...
}

override def getTypeInference(typeFactory: DataTypeFactory) = {
  TypeInference.newBuilder()
    .outputTypeStrategy(callContext => Optional.of(DataTypes.STRING()))
    .build()
}
```
You can also check the logical types of the arguments via the callContext inside outputTypeStrategy().
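For completeness, the second option above can be spelled out against Flink's Java type-inference API. This is an untested sketch (it needs a flink-table dependency to compile), and the class name GenericToString is hypothetical:
```java
import java.util.Optional;

import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.types.inference.TypeInference;

// Hypothetical generic UDF: accepts whatever the planner passes in
// (the default input strategy is a wildcard) and always returns STRING.
public class GenericToString extends ScalarFunction {

    public String eval(Object o) {
        return o == null ? "" : o.toString();
    }

    @Override
    public TypeInference getTypeInference(DataTypeFactory typeFactory) {
        return TypeInference.newBuilder()
            // callContext.getArgumentDataTypes() exposes the logical input
            // types if you want to validate or branch on them here
            .outputTypeStrategy(callContext -> Optional.of(DataTypes.STRING()))
            .build();
    }
}
```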
Regards,
Timo
On 10.11.21 07:25, JING ZHANG wrote:
Hi, vtygoss
I added the UDF from my latest email to `AggregateITCase` on the Flink master
branch, and it works well.
I need some time to find out why it doesn't work on the 1.12.0 branch. I
will get back to you as soon as possible.
Best regards,
JING ZHANG
vtygoss <vtyg...@126.com <mailto:vtyg...@126.com>> wrote on Wed, Nov 10, 2021
at 11:53 AM:
Hi, JING ZHANG!
I tried to avoid defining the annotation on the `eval` method, but it
didn't work and threw the exception below. Then I found a way
to make it work by defining the annotation.
- 1. Is there a way to avoid defining the annotation? It's very
important for the generality of the UDF.
- 2. I am using Flink 1.12.0. Which version do you use to make it
work without the annotation?
Best Regards!
```
def eval(data: java.util.Map[Row, Integer]): String = {
  if (data == null || data.size() == 0) {
    return ""
  }
  data.keySet().toArray().mkString(",")
}
```
On Nov 10, 2021 at 11:33, JING ZHANG <beyond1...@gmail.com
<mailto:beyond1...@gmail.com>> wrote:
Hi vtygoss,
I'm a little confused.
The UDF should already work without the `DataTypeHint` annotation.
Why do you define the `DataTypeHint` annotation before the input parameter
of the `eval` method?
Best,
JING ZHANG
vtygoss <vtyg...@126.com <mailto:vtyg...@126.com>> wrote on Tue, Nov 9, 2021
at 8:17 PM:
Hi, JING ZHANG!
Thanks for your repeated help.
I have already tried COLLECT(ROW(id, name)) and stored the result
as type String (for a POC test). To do that I tried to define a UDF, and
the annotation on the `eval` function must be defined as
"MULTISET<ROW<...>>" as below; otherwise the exception "..RAW/MAP
expected. but MULTISET<ROW<`EXPR$0` STRING NOT NULL, `EXPR$1`
STRING, `EXPR$2` STRING, `EXPR$3` STRING, `EXPR$4` STRING> NOT
NULL> NOT NULL passed" is thrown.
I think this way of annotating a UDF may not be convenient
for the general case. I can't define separate UDFs for all
row structures, such as Row<id>, Row<id, name>, Row<.....>
Is there any way to define the annotation for a dynamic row
structure?
Thanks again for your suggestions.
Best Regards!
```
def eval(@DataTypeHint("MULTISET<ROW<" +
    "vital_sign_id STRING, cdt_vital_sign_index STRING, " +
    "unit_name STRING, parameter_value STRING, " +
    "measure_datetime STRING>>") data: JMAP[Row, Integer]): String = {
  if (data == null || data.size() == 0) {
    return ""
  }
  data.keySet().toArray().mkString(",")
}
```
On Nov 8, 2021 at 21:26, JING ZHANG <beyond1...@gmail.com
<mailto:beyond1...@gmail.com>> wrote:
Hi Vtygoss,
You could try the following SQL:
```
select COLLECT(ROW(id, name)) as info
from table
group by ...;
```
In the above SQL, the result type of `COLLECT(ROW(id, name))` is
MULTISET<ROW>.
`CollectAggFunction` stores the data in a MapState: the key has the
element type and represents the row value; the value is of Integer type
and represents the count of that row.
If you need to define a UDF that handles the result of
`COLLECT(ROW(id, name))`, you could use Map<Row, Integer> as the
input parameter type.
The following code is a demo. Hope it helps.
```
tEnv.registerFunction("TestFunc", TestFunc)
tEnv.sqlQuery("select TestFunc(COLLECT(ROW(id, name))) as info from table group by ...")
....

@SerialVersionUID(1L)
object TestFunc extends ScalarFunction {
  def eval(s: java.util.Map[Row, Integer]): String =
    s.keySet().mkString("\n")
}
```
Best regards,
JING ZHANG
vtygoss <vtyg...@126.com <mailto:vtyg...@126.com>> wrote on Mon, Nov 8, 2021
at 7:00 PM:
Hi, Flink community!
I am working on migrating a data production pipeline from
Spark SQL to Flink SQL (1.12.0), and I have met a problem with
MULTISET<STRUCT<....>>.
```
-- Spark SQL
select COLLECT_LIST(named_struct('id', id, 'name', name)) as info
from table
group by ...;
```
- 1. How to express and store this data structure in Flink?
I tried to express it as MULTISET<ROW<id long, name string>> in
Flink SQL, but it seems that the ORC / JSON / AVRO formats can't
store this type.
- 2. How to read MULTISET<ROW<id long, name string>> in
Flink SQL?
If I need to define a function, which type should
MULTISET<ROW<id long, name string>> be bridged to?
Is there any more convenient way to solve this problem?
Thanks very much for any suggestions or replies.
Best Regards!