I think Flink SQL doesn't support table aggregate functions; you should use the Table API to call a table aggregate function instead.
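To illustrate why the SQL planner complains, here is a minimal plain-Java sketch (no Flink dependency) of what a Top2-style table aggregate does: for each group it emits up to two (value, rank) rows rather than a single scalar, which is consistent with the INTEGER vs. Tuple2(INTEGER, INTEGER) mismatch in the error. The class and method names (Top2Sketch, top2) are hypothetical and only mirror the shape of the documented example. The Table API call shown in the comment follows the linked documentation; the table and column names are taken from your DDL and are an assumption about your setup.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

public class Top2Sketch {

    // In the Table API (per the linked docs), the call would look roughly like:
    //   tEnv.from("example_top")
    //       .groupBy($("id"))
    //       .flatAggregate(call(Top2.class, $("data")))
    //       .select($("id"), $("f0"), $("f1"));
    // The code below only imitates the accumulate/emit logic in plain Java.

    // Accumulator holding the two largest values seen so far for one group.
    static final class Top2Accumulator {
        Integer first;
        Integer second;
    }

    // Update the accumulator with one input value; nulls are ignored.
    static void accumulate(Top2Accumulator acc, Integer value) {
        if (value == null) {
            return;
        }
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    // Emit zero, one, or two (value, rank) rows: this is the "table" part.
    static void emitValue(Top2Accumulator acc, BiConsumer<Integer, Integer> out) {
        if (acc.first != null) {
            out.accept(acc.first, 1);
        }
        if (acc.second != null) {
            out.accept(acc.second, 2);
        }
    }

    // Run the aggregate over one group's values and collect the emitted rows.
    static List<int[]> top2(List<Integer> values) {
        Top2Accumulator acc = new Top2Accumulator();
        for (Integer v : values) {
            accumulate(acc, v);
        }
        List<int[]> rows = new ArrayList<>();
        emitValue(acc, (value, rank) -> rows.add(new int[] {value, rank}));
        return rows;
    }

    public static void main(String[] args) {
        for (int[] row : top2(List.of(3, 9, 5))) {
            System.out.println("value=" + row[0] + ", rank=" + row[1]);
        }
    }
}
```

Because one group can produce several rows, the result cannot be placed in a single SELECT column the way a scalar aggregate can, so flatAggregate followed by select is the usual route.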
Best,
Shengkai

Guillermo <konstt2...@gmail.com> wrote on Fri, Mar 14, 2025 at 19:21:

> I'm trying to execute the example to AggregationTable (
> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/udfs/#table-aggregate-functions)
> from FlinkSQL and I don't get it.
>
> I have copied the example, compiled and created the function. When I try
> to execute it:
>
> Flink SQL> CREATE FUNCTION Top2 AS 'org.example.Top2';
>
> Flink SQL> CREATE TABLE example_top (
>     eventTimestamp BIGINT ,
>     id INT,
>     data INT,
>     ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
>     WATERMARK FOR ts AS ts
> ) WITH (
>     'connector' = 'kafka',
>     ..
>
> Flink SQL> SELECT e.id, Top2(e.data1)
> FROM example_top AS e
> GROUP BY id;
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(INTEGER msisdn, INTEGER EXPR$1) NOT NULL
> equiv rowtype: RecordType(INTEGER msisdn,
> *org.apache.flink.api.java.tuple.Tuple2(INTEGER, INTEGER)* EXPR$1) NOT NULL
> Difference:
> EXPR$1: INTEGER -> *org.apache.flink.api.java.tuple.Tuple2(INTEGER,
> INTEGER)*
>
> How should I use the function? I tried many ways to call it.
>