I'm trying to execute the example to AggregationTable ( https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/udfs/#table-aggregate-functions) from FlinkSQL and I don't get it.
I have copied the example, compiled and created the function. When I try to execute it: Flink SQL> CREATE FUNCTION Top2 AS 'org.example.Top2'; Flink SQL> CREATE TABLE example_top ( eventTimestamp BIGINT , id INT, data INT, ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3), WATERMARK FOR ts AS ts ) WITH ( 'connector' = 'kafka', .. Flink SQL> SELECT e.id, Top2(e.data1) FROM example_top AS e GROUP BY id; [ERROR] Could not execute SQL statement. Reason: java.lang.IllegalArgumentException: Type mismatch: rel rowtype: RecordType(INTEGER msisdn, INTEGER EXPR$1) NOT NULL equiv rowtype: RecordType(INTEGER msisdn, *org.apache.flink.api.java.tuple.Tuple2(INTEGER, INTEGER)* EXPR$1) NOT NULL Difference: EXPR$1: INTEGER -> *org.apache.flink.api.java.tuple.Tuple2(INTEGER, INTEGER)* How should I use the function? I tried many ways to call it.