I'm trying to run the table aggregate function example (
https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/table/functions/udfs/#table-aggregate-functions)
from Flink SQL, and I can't get it to work.

I copied the example, compiled it, and created the function. When I try to
execute it, the query fails:

Flink SQL> CREATE FUNCTION Top2 AS 'org.example.Top2';

Flink SQL> CREATE TABLE example_top (
eventTimestamp BIGINT ,
id INT,
data INT,
ts AS TO_TIMESTAMP_LTZ(eventTimestamp, 3),
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'kafka',
..

Flink SQL> SELECT e.id, Top2(e.data)
FROM example_top AS e
GROUP BY id;

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: Type mismatch:
rel rowtype: RecordType(INTEGER msisdn, INTEGER EXPR$1) NOT NULL
equiv rowtype: RecordType(INTEGER msisdn, *org.apache.flink.api.java.tuple.Tuple2(INTEGER, INTEGER)* EXPR$1) NOT NULL
Difference:
EXPR$1: INTEGER -> *org.apache.flink.api.java.tuple.Tuple2(INTEGER, INTEGER)*

How should I call the function from SQL? I have tried many ways of calling it.
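
For context, here is my understanding of what Top2 computes, written as a dependency-free Java sketch. It uses no Flink classes, and `Top2Sketch`, `Acc`, `accumulate`, and `emit` are illustrative names I made up, not the Flink API. The point is that the function emits up to two (value, rank) rows per group rather than one scalar value, which may be related to the Tuple2 in the error above:

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative stand-in for the documentation's Top2 idea; no Flink dependency.
final class Top2Sketch {

    // Accumulator holding the two largest values seen so far for one group.
    static final class Acc {
        Integer first;   // largest value, null until a value arrives
        Integer second;  // second-largest value, null until two values arrive
    }

    // Fold one input value into the accumulator.
    static void accumulate(Acc acc, int value) {
        if (acc.first == null || value > acc.first) {
            acc.second = acc.first;
            acc.first = value;
        } else if (acc.second == null || value > acc.second) {
            acc.second = value;
        }
    }

    // Emit up to two rows of {value, rank} for the group.
    static List<int[]> emit(Acc acc) {
        List<int[]> rows = new ArrayList<>();
        if (acc.first != null) rows.add(new int[] {acc.first, 1});
        if (acc.second != null) rows.add(new int[] {acc.second, 2});
        return rows;
    }

    public static void main(String[] args) {
        Acc acc = new Acc();
        for (int v : new int[] {3, 9, 5}) accumulate(acc, v);
        for (int[] row : emit(acc)) {
            System.out.println("value=" + row[0] + ", rank=" + row[1]);
        }
    }
}
```

The linked documentation calls the function from the Table API through `flatAggregate`; I have not found an equivalent form for the SQL client.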
