[ https://issues.apache.org/jira/browse/FLINK-30849?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jianhui Dong closed FLINK-30849.
--------------------------------
    Resolution: Invalid

> udaf validated failed with TableEnvironment#executeSql but work correctly with StreamTableEnrivorment#registerFunction
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-30849
>                 URL: https://issues.apache.org/jira/browse/FLINK-30849
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core, Table SQL / API
>            Reporter: Jianhui Dong
>            Priority: Major
>         Attachments: MultiAggToJsonArrayV2.java
>
>
> We have a UDAF with several overloaded methods. It works in Flink 1.12.2 with
> the deprecated API StreamTableEnvironment#registerFunction, but when it is
> registered through TableEnvironment#executeSql in Flink 1.12.2 or Flink 1.16,
> it throws the following exception:
> {code:java}
> Caused by: org.apache.flink.table.api.ValidationException: Considering all hints, the method should comply with the signature:
> accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
>     at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
>     at org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535)
> {code}
> In Flink 1.16, the method
> <T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> aggregateFunction);
> no longer exists, and I would like to know how to rewrite the UDAF to make it
> work.
> The test code is as follows:
> {code:java}
> TableEnvironment tableEnvironment = StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
> // tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new MultiAggToJsonArrayV2());
> tableEnvironment.executeSql("CREATE FUNCTION `MultiAggToJsonArrayV2` AS 'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
> tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test`(`a` VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " +
>         "('connector'='datagen')\n");
> tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test_sink`(`a` VARCHAR,`res` VARCHAR) WITH ('connector'='blackhole')\n");
> tableEnvironment.executeSql("INSERT INTO `grocery_udf_test_sink` SELECT `a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` FROM `grocery_udf_test` GROUP BY `a`");
> {code}
> [^MultiAggToJsonArrayV2.java]

--
This message was sent by Atlassian Jira
(v8.20.10#820010)