Hi Dmytro,
I would not recommend using internal functions from
`org.apache.flink.table.planner.functions.aggfunctions`. They are called
through a slightly different stack, which might cause this exception.
Instead, you can use the testing functions in
`org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions`.
Your examples work in my 1.11 branch:
@Test
public void testWithCreateFunction() {
    initInput();
    String functionClass =
        "org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$WeightedAvg";
    String createFunQuery =
        String.format("CREATE TEMPORARY FUNCTION a AS '%s'", functionClass);
    tableEnv.executeSql(createFunQuery);
    tableEnv.createTemporaryView("B", tableEnv.from("A")
        .groupBy($("symbol"))
        .select($("symbol"), call("a", $("price").cast(DataTypes.INT()), 12))
    );
    Table res = tableEnv.from("B");
    res.execute().print();
}

@Test
public void testWithRegisterFunction() {
    initInput();
    String functionClass =
        "org.apache.flink.table.planner.runtime.utils.JavaUserDefinedAggFunctions$WeightedAvg";
    String createFunQuery =
        String.format("CREATE TEMPORARY FUNCTION a AS '%s'", functionClass);
    tableEnv.executeSql(createFunQuery);
    Table res = tableEnv.sqlQuery(
        "select a(CAST(price AS INT), 12) as max_price from A group by symbol");
    res.execute().print();
}
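
For readers without the Flink test sources at hand, here is a minimal, dependency-free sketch of what a weighted-average aggregate computes. It is an illustration only, not the actual `WeightedAvg` test class: it does not extend Flink's `AggregateFunction`, it only borrows the method names of that contract (`createAccumulator`, `accumulate`, `getValue`), and the class name `WeightedAvgSketch` and the `aggregate` helper are hypothetical.

```java
// Dependency-free sketch: this class does NOT extend Flink's AggregateFunction.
// Only the method names mirror that contract; everything else is an assumption
// made for illustration.
final class WeightedAvgSketch {

    /** Mutable accumulator holding the running weighted sum and the total weight. */
    static final class Accumulator {
        long sum = 0;
        int count = 0;
    }

    Accumulator createAccumulator() {
        return new Accumulator();
    }

    /** Adds one (value, weight) pair to the accumulator. */
    void accumulate(Accumulator acc, long value, int weight) {
        acc.sum += value * weight;
        acc.count += weight;
    }

    /** Returns the weighted average, or null if nothing was accumulated. */
    Long getValue(Accumulator acc) {
        if (acc.count == 0) {
            return null;
        }
        return acc.sum / acc.count; // integer division, so the result is truncated
    }

    /** Hypothetical helper: runs the three steps over an array with a constant weight. */
    static Long aggregate(long[] values, int weight) {
        WeightedAvgSketch fn = new WeightedAvgSketch();
        Accumulator acc = fn.createAccumulator();
        for (long v : values) {
            fn.accumulate(acc, v, weight);
        }
        return fn.getValue(acc);
    }

    public static void main(String[] args) {
        // Two rows with prices 1 and 2 and a constant weight of 12.
        System.out.println(aggregate(new long[] {1, 2}, 12)); // prints 1
    }
}
```

With two rows holding the values 1 and 2 and a constant weight of 12, the sketch accumulates a sum of 36 and a total weight of 24, so the truncated result is 1.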
Regards,
Timo
On 28.07.20 17:20, Dmytro Dragan wrote:
Hi Timo,
I have switched to 1.11.1.
Creating the function with "create function ..." fails with a strange error:
Caused by: java.lang.IndexOutOfBoundsException: Index: 110, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:657)
    at java.util.ArrayList.get(ArrayList.java:433)
    at com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
    at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
    at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
    at org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
    at org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:96)
    at org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:46)
    at org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
    at org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:63)
    at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:709)
    at org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:714)
    at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:401)
    at org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1392)
    at org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1356)
    at org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:383)
    at SortAggregateWithKeys$79.processElement(Unknown Source)
The "No match found for function signature fun(<NUMERIC>)" error still occurs.
The mentioned bug was about TableFunction, so maybe this is something different
but related.
I have created a small github project with both cases:
https://github.com/dmytroDragan/flink-1.11-sql-agg-issue/blob/master/src/test/java/lets/test/flink/AggFunTest.java
I would appreciate it if you could take a look.
On 27/07/2020, 16:49, "Timo Walther" <twal...@apache.org> wrote:
Hi Dmytro,
Aggregate functions will support the new type system in Flink 1.12.
Until then, they cannot be used with the new `call()` syntax as
anonymous functions. In order to use the old type system, you need to
register the function explicitly using SQL `CREATE FUNCTION a AS
'myFunc'` and then refer to it by its registered name in `call("a", ...)`.
The mentioned "No match found for function signature fun(<NUMERIC>)" was
a bug that got fixed in 1.11.1:
https://issues.apache.org/jira/browse/FLINK-18520
This bug only exists for catalog functions, not temporary system functions.
Regards,
Timo
On 27.07.20 16:35, Dmytro Dragan wrote:
> Hi All,
>
> I see strange behavior of UDAF functions:
>
> Let's say we have a simple table:
>
> EnvironmentSettings settings =
>     EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
> TableEnvironment t = TableEnvironment.create(settings);
>
> Table table = t.fromValues(DataTypes.ROW(
>         DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
>         DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
>     ),
>     row(1.0, "S"), row(2.0, "S"));
> t.createTemporaryView("A", table);
>
> As an example, we will use a built-in function under a new name:
>
> t.createTemporaryFunction("max_value",
>     new MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());
>
> Using Table API we can write:
>
> t.createTemporaryView("B", table
>     .groupBy($("symbol"))
>     .select($("symbol"), call("max_value", $("price")))
> );
>
> and get:
>
> org.apache.flink.table.api.TableException: Aggregate functions are not
> updated to the new type system yet.
>
> Using SQL API we can write:
>
> t.createTemporaryView("B",
>     t.sqlQuery("select max_value(price) from A group by symbol"));
>
> and get:
>
> org.apache.flink.table.api.ValidationException: SQL validation failed.
> From line 1, column 8 to line 1, column 23: No match found for function
> signature max_value(<NUMERIC>)
>
> Calling the built-in max function instead of the provided alias produces
> correct results.
>
> In addition, the non-retract implementation of the max function
> (MaxAggFunction.DoubleMaxAggFunction) produces:
>
> org.apache.flink.table.api.ValidationException: Could not register
> temporary catalog function 'default_catalog.default_database.max_value'
> due to implementation errors.
>
> The cause: DoubleMaxAggFunction is not serializable.
>
> Am I missing something?
>
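
The last error quoted above reports that the function instance is not serializable. As a rough way to reproduce that kind of check outside Flink, the sketch below tries to write an object with plain Java serialization. It is an assumption that this approximates what happens at registration time, and the names `SerializabilityCheck`, `isJavaSerializable`, `PlainState` and `SerializableState` are hypothetical and not part of any Flink API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical helper, unrelated to Flink's own validation code.
final class SerializabilityCheck {

    /** Returns true if the object can be written with standard Java serialization. */
    static boolean isJavaSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (IOException e) {
            // NotSerializableException is an IOException, so it ends up here.
            return false;
        }
    }

    /** Stand-in for a function class that does not implement Serializable. */
    static final class PlainState {
        int max;
    }

    /** Stand-in for a function class that implements Serializable. */
    static final class SerializableState implements Serializable {
        private static final long serialVersionUID = 1L;
        int max;
    }

    public static void main(String[] args) {
        System.out.println(isJavaSerializable(new PlainState()));        // prints false
        System.out.println(isJavaSerializable(new SerializableState())); // prints true
    }
}
```

Running such a check against a function instance before passing it to `createTemporaryFunction` can show early whether the instance itself, or one of its fields, is the part that cannot be serialized.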