Hi Timo, 

I have switched to 1.11.1.

Create function using "create function ..." fails with magic:
Caused by: java.lang.IndexOutOfBoundsException: Index: 110, Size: 0
        at java.util.ArrayList.rangeCheck(ArrayList.java:657)
        at java.util.ArrayList.get(ArrayList.java:433)
        at 
com.esotericsoftware.kryo.util.MapReferenceResolver.getReadObject(MapReferenceResolver.java:42)
        at com.esotericsoftware.kryo.Kryo.readReferenceOrNull(Kryo.java:805)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:759)
        at 
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:346)
        at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:148)
        at 
org.apache.flink.api.common.typeutils.base.MapSerializer.deserialize(MapSerializer.java:43)
        at 
org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:96)
        at 
org.apache.flink.table.dataview.MapViewSerializer.deserialize(MapViewSerializer.java:46)
        at 
org.apache.flink.util.InstantiationUtil.deserializeFromByteArray(InstantiationUtil.java:536)
        at 
org.apache.flink.table.data.binary.BinaryRawValueData.toObject(BinaryRawValueData.java:63)
        at 
org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:709)
        at 
org.apache.flink.table.data.util.DataFormatConverters$GenericConverter.toExternalImpl(DataFormatConverters.java:714)
        at 
org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:401)
        at 
org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1392)
        at 
org.apache.flink.table.data.util.DataFormatConverters$PojoConverter.toExternalImpl(DataFormatConverters.java:1356)
        at 
org.apache.flink.table.data.util.DataFormatConverters$DataFormatConverter.toExternal(DataFormatConverters.java:383)
        at SortAggregateWithKeys$79.processElement(Unknown Source)

"No match found for function signature fun(<NUMERIC>)" still exist. 
Mentioned bug was about TableFunction, so maybe it is something different, but 
related.

I have created a small github project with both cases:
https://github.com/dmytroDragan/flink-1.11-sql-agg-issue/blob/master/src/test/java/lets/test/flink/AggFunTest.java
I would appreciate if you could take a look.


On 27/07/2020, 16:49, "Timo Walther" <twal...@apache.org> wrote:

    Hi Dmytro,
    
    aggregate functions will support the new type system in Flink 1.12. 
    Until then, they cannot be used with the new `call()` syntax as 
    anonymous functions. In order to use the old type system, you need to 
    register the function explicilty using SQL `CREATE FUNCTION a AS 
    'myFunc'` and then use them in `call("myFunc", ...)`.
    
    The mentioned "No match found for function signature fun(<NUMERIC>)" was 
    a bug that got fixed in 1.11.1:
    
    https://issues.apache.org/jira/browse/FLINK-18520
    
    This bug only exists for catalog functions, not temporary system functions.
    
    Regards,
    Timo
    
    
    On 27.07.20 16:35, Dmytro Dragan wrote:
    > Hi All,
    > 
    > I see strange behavior of UDAF functions:
    > 
    > Let`s say we have a simple table:
    > 
    > EnvironmentSettings settings = 
    > 
EnvironmentSettings./newInstance/().useBlinkPlanner().inBatchMode().build();
    > TableEnvironment t = TableEnvironment./create/(settings);
    > 
    > Table table = t.fromValues(DataTypes./ROW/(
    >          DataTypes./FIELD/("price", DataTypes./DOUBLE/().notNull()),
    > DataTypes./FIELD/("symbol", DataTypes./STRING/().notNull())
    >          ),
    > /row/(1.0, "S"), /row/(2.0, "S"));
    > t.createTemporaryView("A", table);
    > 
    > As example we will use build-in function with a new name:
    > 
    > t.createTemporaryFunction("max_value", new 
    > MaxWithRetractAggFunction.DoubleMaxWithRetractAggFunction());
    > 
    > Using Table API we can write:
    > 
    > t.createTemporaryView("B", table
    >          .groupBy(/$/("symbol"))
    >          .select(/$/("symbol"),/call/("max_value", /$/("price")))
    > );
    > 
    > and get:
    > 
    > org.apache.flink.table.api.TableException: Aggregate functions are not 
    > updated to the new type system yet.
    > 
    > Using SQL API we can write:
    > 
    > t.createTemporaryView("B", t.sqlQuery("select max_value(price) from A 
    > group by symbol"));
    > 
    > and get:
    > 
    > org.apache.flink.table.api.ValidationException: SQL validation failed. 
    >  From line 1, column 8 to line 1, column 23: No match found for function 
    > signature max_value(<NUMERIC>)
    > 
    > Calling build-in max function instead of provided alias will produce 
    > correct results.
    > 
    > In addition,  non-retract implementation of max function 
    > (MaxAggFunction.DoubleMaxAggFunction) would produce:
    > 
    > org.apache.flink.table.api.ValidationException: Could not register 
    > temporary catalog function 'default_catalog.default_database.max_value' 
    > due to implementation errors.
    > 
    > Cause DoubleMaxAggFunction is not serializable.
    > 
    > Am I missing something?
    > 
    
    
    

Reply via email to