I think Flink SQL doesn't support table aggregate functions, so you
should use the Table API to call a table aggregate function.
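
For illustration, here is a minimal plain-Java sketch (no Flink dependency)
of what the Top2 example from the linked docs does for one group: it keeps
the two largest values and emits up to two (value, rank) rows, i.e. more
than the single value per group that your SELECT expects. The class name
Top2Sketch is hypothetical. The Table API call in the comment follows the
pattern in the linked docs and assumes a TableEnvironment named tEnv with
Top2 already registered; the aliases "value" and "rank" are just example
names.

```java
import java.util.ArrayList;
import java.util.List;

/*
 * Hypothetical plain-Java model of the docs' Top2 table aggregate.
 * In a Flink job, the Table API call would look roughly like this
 * (assumes tEnv exists and Top2 is registered; aliases are examples):
 *
 *   tEnv.from("example_top")
 *       .groupBy($("id"))
 *       .flatAggregate(call("Top2", $("data")).as("value", "rank"))
 *       .select($("id"), $("value"), $("rank"));
 */
class Top2Sketch {
    // Accumulator state: the two largest values seen so far in one group.
    private int first = Integer.MIN_VALUE;
    private int second = Integer.MIN_VALUE;

    // Called once per input row of the group.
    void accumulate(int value) {
        if (value > first) {
            second = first;
            first = value;
        } else if (value > second) {
            second = value;
        }
    }

    // Emits zero, one, or two rows, each as {value, rank}.
    List<int[]> emitValue() {
        List<int[]> rows = new ArrayList<>();
        if (first != Integer.MIN_VALUE) {
            rows.add(new int[] {first, 1});
        }
        if (second != Integer.MIN_VALUE) {
            rows.add(new int[] {second, 2});
        }
        return rows;
    }

    public static void main(String[] args) {
        Top2Sketch acc = new Top2Sketch();
        for (int v : new int[] {3, 9, 5}) {
            acc.accumulate(v);
        }
        for (int[] row : acc.emitValue()) {
            System.out.println("value=" + row[0] + ", rank=" + row[1]);
        }
    }
}
```

With the inputs 3, 9, 5 this emits two rows for the group, (9, 1) and
(5, 2). Producing several rows per group is what flatAggregate in the
Table API is meant for.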

Best,
Shengkai

On Fri, Mar 14, 2025 at 19:21, Guillermo <konstt2...@gmail.com> wrote:

> I'm trying to run the table aggregate function example (
> https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/udfs/#table-aggregate-functions)
> from Flink SQL, but I can't get it to work.
>
> I have copied the example, compiled and created the function. When I try
> to execute it:
>
> Flink SQL> CREATE FUNCTION Top2 AS 'org.example.Top2';
>
> Flink SQL> CREATE TABLE example_top (
> eventTimestamp BIGINT ,
> id INT,
> data INT,
> ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
> WATERMARK FOR ts AS ts
> ) WITH (
> 'connector' = 'kafka',
> ..
>
> Flink SQL> SELECT e.id, Top2(e.data1)
> FROM example_top AS e
> GROUP BY id;
>
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalArgumentException: Type mismatch:
> rel rowtype: RecordType(INTEGER msisdn, INTEGER EXPR$1) NOT NULL
> equiv rowtype: RecordType(INTEGER msisdn,
> *org.apache.flink.api.java.tuple.Tuple2(INTEGER, INTEGER)* EXPR$1) NOT NULL
> Difference:
> EXPR$1: INTEGER -> *org.apache.flink.api.java.tuple.Tuple2(INTEGER,
> INTEGER)*
>
> How should I use the function? I tried many ways to call it.
>
