Hi All,

I am seeing strange behavior with user-defined aggregate functions (UDAFs):

Let's say we have a simple table:
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inBatchMode()
        .build();
TableEnvironment t = TableEnvironment.create(settings);


Table table = t.fromValues(DataTypes.ROW(
        DataTypes.FIELD("price", DataTypes.DOUBLE().notNull()),
        DataTypes.FIELD("symbol", DataTypes.STRING().notNull())
        ),
        row(1.0, "S"), row(2.0, "S"));
t.createTemporaryView("A", table);

As an example, we will register a built-in function under a new name:

t.createTemporaryFunction("max_value",
        new MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());

Using the Table API, we can write:

t.createTemporaryView("B", table
        .groupBy($("symbol"))
        .select($("symbol"), call("max_value", $("price")))
);
and get:
org.apache.flink.table.api.TableException: Aggregate functions are not updated 
to the new type system yet.

Using the SQL API, we can write:

t.createTemporaryView("B",
        t.sqlQuery("select max_value(price) from A group by symbol"));
and get:
org.apache.flink.table.api.ValidationException: SQL validation failed. From 
line 1, column 8 to line 1, column 23: No match found for function signature 
max_value(<NUMERIC>)

Calling the built-in max function directly, instead of the registered alias,
produces correct results.

In addition, registering the non-retract implementation of the max function
(MaxAggFunction.DoubleMaxAggFunction) produces:
org.apache.flink.table.api.ValidationException: Could not register temporary 
catalog function 'default_catalog.default_database.max_value' due to 
implementation errors.
Cause: DoubleMaxAggFunction is not serializable.

Am I missing something?



