Hi, Jad.
IIUC, TableAggregateFunfunction has not been supported in SQL. The original 
Flip[1] only implements it in Table API. You can send an email to dev maillist 
for more detail and create an improvement jira[2] for it.


[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=97552739
[2] https://issues.apache.org/jira/projects/FLINK/issues



--

    Best!
    Xuyang




在 2024-03-08 03:12:19,"Jad Naous" <j...@grepr.ai> 写道:

Hi Junrui,
Thank you for the pointer. I had read that page, and I can use the function 
with the Java Table API ok, but I'm trying to use the Top2 accumulator with a 
SQL function. I can't use a left lateral join on it since the planner fails 
with "not a table function". I don't think a join is the right thing anyway, 
since it's an aggregation table function.


    tEnv.createTemporaryFunction("TOP2", Top2.class);

    var calculated2 = tEnv.sqlQuery(
        "SELECT " +
            "  TUMBLE_START(ts, INTERVAL '1' SECOND) as w_start, " +
            "  TUMBLE_END(ts, INTERVAL '1' SECOND) as w_end, " +
            "  TUMBLE_ROWTIME(ts, INTERVAL '1' SECOND) as w_rowtime, " +
            "  id, " +
            "  top1, " +
            "  top2 " +
            "FROM " +
            "  source " +
            "  LEFT JOIN LATERAL TABLE(TOP2(val)) ON TRUE " +
            "GROUP BY " +
            "  TUMBLE(ts, INTERVAL '1' SECOND), " +
            "  id"
    ).printExplain();



Gives the following:


   org.apache.flink.table.api.ValidationException: SQL validation failed. 
Function 'default_catalog.default_database.TOP2' cannot be used as a table 
function.
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:200)
        at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:117)
        at 
org.apache.flink.table.planner.operations.SqlNodeToOperationConversion.convert(SqlNodeToOperationConversion.java:261)
        at 
org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106)
        at 
app//org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:708)
        at 
app//io.grepr.query.MetricsTableApiTest.test(MetricsTableApiTest.java:129)
        Caused by:
        org.apache.flink.table.api.ValidationException: Function 
'default_catalog.default_database.TOP2' cannot be used as a table function.
            at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.verifyFunctionKind(FunctionCatalogOperatorTable.java:200)
            at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:133)
            at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:126)
            at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
            at java.base@11.0.22/java.util.Optional.flatMap(Optional.java:294)
            at 
org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:100)
            at 
org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:69)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1310)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1296)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:993)
            at 
org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:749)
            at 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:196)
 


Jad Naous

Grepr, CEO/Founder




ᐧ


On Thu, Mar 7, 2024 at 9:43 AM Junrui Lee <jrlee....@gmail.com> wrote:

Hi Jad,

You can refer to the CREATE FUNCTION section 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/create/#create-function)
 and the Table Aggregate Functions section 
(https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/udfs/#table-aggregate-functions)
 for details on creating and using these functions.

Best regards,

Junrui


Jad Naous <j...@grepr.ai> 于2024年3月7日周四 22:19写道:

Hi,
The docs don't mention the correct syntax for using UDTAGGs in SQL. Is it 
possible to use them with SQL?

Thanks,
Jad Naous

Grepr, CEO/Founder


ᐧ

Reply via email to