Jianhui Dong created FLINK-30849:
------------------------------------
Summary: udaf validated failed with TableEnvironment#executeSql
but work correctly with StreamTableEnrivorment#registerFunction
Key: FLINK-30849
URL: https://issues.apache.org/jira/browse/FLINK-30849
Project: Flink
Issue Type: Bug
Components: API / Core, Table SQL / API
Reporter: Jianhui Dong
Attachments: MultiAggToJsonArrayV2.java
We have a udaf which has some overloaded methods and it can work in flink
1.12.2 with deprecated api StreamTableEnrivorment#registerFunction, but when
use TableEnvironment#executeSql in flink 1.12.2 or flink 1.16, it will throw an
exception as follows:
{code:java}
Caused by: org.apache.flink.table.api.ValidationException: Considering all
hints, the method should comply with the signature:
accumulate(_, java.lang.String, java.lang.Object, java.lang.Object)
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:333)
at
org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:328)
at
org.apache.flink.table.types.extraction.FunctionMappingExtractor.createMethodNotFoundError(FunctionMappingExtractor.java:535)
{code}
In flink 1.16, the method
<T, ACC> void registerFunction(String name, AggregateFunction<T, ACC>
aggregateFunction); does noe exist anymore, and I want to how to rewrite the
udaf to make it works.
The test code is as follows:
{code:java}
TableEnvironment tableEnvironment =
StreamTableEnvironment.create(StreamExecutionEnvironment.getExecutionEnvironment());
// tableEnvironment.registerFunction("MultiAggToJsonArrayV2", new
MultiAggToJsonArrayV2());
tableEnvironment.executeSql("CREATE FUNCTION `MultiAggToJsonArrayV2` AS
'com.sankuai.flink.streaming.udf.MultiAggToJsonArrayV2'");
tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test`(`a`
VARCHAR,`b` INTEGER,`c` VARCHAR,`d` VARCHAR) WITH " +
"('connector'='datagen')\n");
tableEnvironment.executeSql("CREATE TABLE `grocery_udf_test_sink`(`a`
VARCHAR,`res` VARCHAR) WITH ('connector'='blackhole')\n" );
tableEnvironment.executeSql("INSERT INTO `grocery_udf_test_sink` SELECT
`a`, `MultiAggToJsonArrayV2`('b', '', '', '') AS `res` FROM `grocery_udf_test`
GROUP BY `a`"); {code}
[^MultiAggToJsonArrayV2.java]
--
This message was sent by Atlassian Jira
(v8.20.10#820010)