Jianhui Dong created FLINK-30849:
------------------------------------

             Summary: udaf validated failed with TableEnvironment#executeSql 
but work correctly with StreamTableEnrivorment#registerFunction
                 Key: FLINK-30849
                 URL: https://issues.apache.org/jira/browse/FLINK-30849
             Project: Flink
          Issue Type: Bug
          Components: API / Core, Table SQL / API
            Reporter: Jianhui Dong
         Attachments: MultiAggToJsonArrayV2.java

We have a udaf which has some overloaded methods and it can work in flink 
1.12.2 with deprecated api StreamTableEnrivorment#registerFunction, but when 
use TableEnvironment#executeSql in flink 1.12.2 or flink 1.16, it will throw an 
exception as follows:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Considering all 
hints, the method should comply with the signature:
accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
    at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
    at 
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
    at 
org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535)
 {code}
In flink 1.16, the method
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC> 
aggregateFunction); does noe exist anymore, and I want to how to rewrite the 
udaf to make it works.
The test code is as follows: 
{code:java}
TableEnvironment tableEnvironment = 
StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
//        tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new 
MultiAggToJsonArrayV2());
        tableEnvironment.executeSql("CREATE FUNCTION `MultiAggToJsonArrayV2` AS 
'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
        tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test`(`a` 
VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " +
                "('connector'='datagen')\n");
        tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test_sink`(`a` 
VARCHAR,`res` VARCHAR) WITH ('connector'='blackhole')\n" );
        tableEnvironment.executeSql("INSERT INTO `grocery_udf_test_sink` SELECT 
`a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` FROM `grocery_udf_test` 
GROUP BY `a`"); {code}
[^MultiAggToJsonArrayV2.java]



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to