I'm currently converting our old code (based on Flink 1.6) to Flink 1.13 and encountered a strange problem about the user-defined aggregate function which takes BigDecimal as the parameter and output:
Exception in thread "main" org.apache.flink.table.api.ValidationException: > SQL validation failed. An error occurred in the type inference logic of > function 'LAST_DECIMAL'. > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) > at > org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) > at > org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) > at > org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) > at > org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704) > at poc.flink.table.PocTimestampUdf.main(PocTimestampUdf.java:101) > Caused by: org.apache.flink.table.api.ValidationException: An error > occurred in the type inference logic of function 'LAST_DECIMAL'. > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) > at java.base/java.util.Optional.flatMap(Optional.java:294) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) > at > org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945) > at > org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) > at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org > $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) > ... 5 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a valid type inference for function class > 'poc.flink.table.LastDecimalAggFunction'. Please check for implementation > mistakes and/or provide a corresponding hint. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98) > at > org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212) > at > org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160) > ... 16 more > Caused by: org.apache.flink.table.api.ValidationException: Error in > extracting a signature to output mapping. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161) > at > org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148) > ... 19 more > Caused by: org.apache.flink.table.api.ValidationException: Unable to > extract a type inference from method: > public void > poc.flink.table.LastDecimalAggFunction.accumulate(poc.flink.table.LastDecimalAccumulator,java.math.BigDecimal) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114) > ... 21 more > Caused by: org.apache.flink.table.api.ValidationException: Could not > extract a data type from 'class java.math.BigDecimal' in generic class > 'org.apache.flink.table.functions.AggregateFunction' in class > poc.flink.table.LastDecimalAggFunction. Please pass the required data type > manually or allow RAW types. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269) > at > org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169) > ... 22 more > Caused by: org.apache.flink.table.api.ValidationException: Values of > 'java.math.BigDecimal' need fixed precision and scale. > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) > at > org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractPredefinedType(DataTypeExtractor.java:398) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:276) > at > org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233) > ... 29 more > The program source code: package poc.flink.table; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.sql.Timestamp; import static org.apache.flink.table.api.Expressions.$; public class PocLastDecimalJob { private static final Logger LOGGER = LoggerFactory.getLogger(PocLastDecimalJob.class); public static class LastDecimalAccumulator extends Tuple1<BigDecimal> { public LastDecimalAccumulator(BigDecimal f0) { super(f0); } } public static class LastDecimalAggFunction extends AggregateFunction<BigDecimal, LastDecimalAccumulator> { @Override public BigDecimal getValue(LastDecimalAccumulator accumulator) { return accumulator.f0; } @Override public LastDecimalAccumulator createAccumulator() { return new LastDecimalAccumulator(null); } public void accumulate(LastDecimalAccumulator accumulator, @DataTypeHint("DECIMAL(38, 18)") BigDecimal value) { if (value != null) { accumulator.f0 = value; } } public void merge(LastDecimalAccumulator accumulator, Iterable<BigDecimal> iterable) { if (iterable != null) { for (BigDecimal item : iterable) { accumulator.f0 = item; } } } } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); RowTypeInfo rowTypeInfo = new RowTypeInfo( new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC}, new String[] {"rowtime", "id", "val"}); DataStream<Row> dataStream = env.fromElements( Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")), Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")), Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")) ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new LastDecimalAggFunction()); tableEnv.createTemporaryView("InputTable", dataStream, $("rowtime.rowtime"), $("id"), $("val")); Table resultTable = tableEnv.sqlQuery("" + "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " + "FROM InputTable " + "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' SECOND), id"); DataStream<Row> resultStream = tableEnv .toRetractStream(resultTable, new RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC)) .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> value.f1); resultStream.addSink(new SinkFunction<Row>() { @Override public void invoke(Row value, Context context) { LOGGER.info("SinkFunction.invoke(): value={}", value); } }); env.execute(); } }