[ https://issues.apache.org/jira/browse/FLINK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jianhui Dong closed FLINK-30849.
--------------------------------
    Resolution: Invalid

> UDAF validation fails with TableEnvironment#executeSql but works correctly
> with StreamTableEnvironment#registerFunction
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30849
>                 URL: https://issues.apache.org/jira/browse/FLINK-30849
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Table SQL / API
>            Reporter: Jianhui Dong
>            Priority: Major
>         Attachments: MultiAggToJsonArrayV2.java
>
>
> We have a UDAF with several overloaded methods. It works in Flink 1.12.2
> with the deprecated API StreamTableEnvironment#registerFunction, but when it
> is registered through TableEnvironment#executeSql in Flink 1.12.2 or Flink
> 1.16, it throws the following exception:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Considering all hints, the method should comply with the signature:
> accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535)
>  {code}
> In Flink 1.16, the method
> <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
> no longer exists, and I want to know how to rewrite the UDAF to make it work.
> The test code is as follows:
> {code:java}
> TableEnvironment tableEnvironment =
>         StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
> // tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new MultiAggToJsonArrayV2());
> tableEnvironment.executeSql(
>         "CREATE FUNCTION `MultiAggToJsonArrayV2` " +
>         "AS 'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
> tableEnvironment.executeSql(
>         "CREATE TABLE `grocery_udf_test`(`a` VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " +
>         "('connector'='datagen')\n");
> tableEnvironment.executeSql(
>         "CREATE TABLE `grocery_udf_test_sink`(`a` VARCHAR,`res` VARCHAR) " +
>         "WITH ('connector'='blackhole')\n");
> tableEnvironment.executeSql(
>         "INSERT INTO `grocery_udf_test_sink` " +
>         "SELECT `a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` " +
>         "FROM `grocery_udf_test` GROUP BY `a`");
> {code}
> [^MultiAggToJsonArrayV2.java]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
