I actually have already specified the data type int, but that doesn't work:
public void accumulate(LastDecimalAccumulator accumulator, > @DataTypeHint("DECIMAL(38, 18)") BigDecimal value) > { > if (value != null) { > accumulator.f0 = value; > } > } > I did some experiments, and the result is: *1. The accumulator has also to be specified by a type hint (I think the documents didn't say that)* *2. FunctionHint did the work, DataTypeHint did not.* This is the code snippet: @FunctionHint( > input = {@DataTypeHint("DECIMAL(38,18)")}, > accumulator = @DataTypeHint(value = "RAW", bridgedTo = > LastDecimalAccumulator.class), > output = @DataTypeHint("DECIMAL(38,18)") > ) > public static class LastDecimalAggFunction extends > AggregateFunction<BigDecimal, LastDecimalAccumulator> { > > @Override > public > // @DataTypeHint("DECIMAL(38,18)") > BigDecimal getValue(LastDecimalAccumulator accumulator) { > return accumulator.f0; > } > > @Override > public > // @DataTypeHint(value = "RAW", bridgedTo = > LastDecimalAccumulator.class) > LastDecimalAccumulator createAccumulator() { > return new LastDecimalAccumulator(null); > } > > public void accumulate( > // @DataTypeHint(value = "RAW", bridgedTo = > LastDecimalAccumulator.class) > LastDecimalAccumulator accumulator, > // @DataTypeHint("DECIMAL(38,18)") > BigDecimal value) { > if (value != null) { > accumulator.f0 = value; > } > } > *The whole program:* package poc.flink.table; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple1; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.flink.table.annotation.DataTypeHint; import org.apache.flink.table.annotation.FunctionHint; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.types.Row; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.math.BigDecimal; import java.sql.Timestamp; import static org.apache.flink.table.api.Expressions.$; public class PocLastDecimalJob { private static final Logger LOGGER = LoggerFactory.getLogger(PocLastDecimalJob.class); public static class LastDecimalAccumulator extends Tuple1<BigDecimal> { public LastDecimalAccumulator() { f0 = null; } public LastDecimalAccumulator(BigDecimal f0) { super(f0); } } @FunctionHint( input = {@DataTypeHint("DECIMAL(38,18)")}, accumulator = @DataTypeHint(value = "RAW", bridgedTo = LastDecimalAccumulator.class), output = @DataTypeHint("DECIMAL(38,18)") ) public static class LastDecimalAggFunction extends AggregateFunction<BigDecimal, LastDecimalAccumulator> { @Override public // @DataTypeHint("DECIMAL(38,18)") BigDecimal getValue(LastDecimalAccumulator accumulator) { return accumulator.f0; } @Override public // @DataTypeHint(value = "RAW", bridgedTo = LastDecimalAccumulator.class) LastDecimalAccumulator createAccumulator() { return new LastDecimalAccumulator(null); } public void accumulate( // @DataTypeHint(value = "RAW", bridgedTo = LastDecimalAccumulator.class) LastDecimalAccumulator accumulator, // @DataTypeHint("DECIMAL(38,18)") BigDecimal value) { if (value != null) { accumulator.f0 = value; } } public void merge(LastDecimalAccumulator accumulator, Iterable<LastDecimalAccumulator> iterable) { if (iterable != null) { for (LastDecimalAccumulator item : iterable) { accumulator.f0 = item.f0; } } } } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); RowTypeInfo rowTypeInfo = new RowTypeInfo( new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC}, new String[] {"ts", "id", "val"}); DataStream<Row> dataStream = env.fromElements( Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")), Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")), Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")) ).assignTimestampsAndWatermarks(WatermarkStrategy .<Row>forMonotonousTimestamps() .withTimestampAssigner(((element, recordTimestamp) -> ((Timestamp) element.getField(0)).getTime())) ).returns(rowTypeInfo); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new LastDecimalAggFunction()); tableEnv.createTemporaryView("InputTable", dataStream, $("ts").rowtime(), $("id"), $("val")); Table resultTable = tableEnv.sqlQuery("" + "SELECT id, LAST_DECIMAL(val) " + "FROM InputTable " + "GROUP BY HOP(ts, INTERVAL '1' SECOND, INTERVAL '1' SECOND), id"); DataStream<Row> resultStream = tableEnv .toRetractStream(resultTable, new RowTypeInfo(Types.INT, Types.BIG_DEC)) .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> value.f1); resultStream.addSink(new SinkFunction<Row>() { @Override public void invoke(Row value, Context context) { LOGGER.info("SinkFunction.invoke(): value={}", value); } }); env.execute(); } } Ingo Bürk <i...@ververica.com> 于2021年7月30日周五 下午1:51写道: > Hi, > > for BigDecimal you need to specify a type hint to define the precision and > scale. For example, look at [1][2] and search for BigDecimal. Can you try > with that? > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/types/ > [2] > https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/table/functions/udfs/ > > > Best > Ingo > > On Fri, Jul 30, 2021 at 5:12 AM LIU Xiao <liuxiaogen...@gmail.com> wrote: > >> sorry for a little error, the program code should be: >> >> package poc.flink.table; >> >> import org.apache.flink.api.common.eventtime.WatermarkStrategy; >> import org.apache.flink.api.common.functions.MapFunction; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.api.common.typeinfo.Types; >> import org.apache.flink.api.java.tuple.Tuple1; >> import org.apache.flink.api.java.tuple.Tuple2; >> import org.apache.flink.api.java.typeutils.RowTypeInfo; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import org.apache.flink.streaming.api.functions.sink.SinkFunction; >> import org.apache.flink.table.annotation.DataTypeHint; >> import org.apache.flink.table.api.Table; >> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >> import org.apache.flink.table.functions.AggregateFunction; >> import org.apache.flink.types.Row; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> import java.math.BigDecimal; >> import java.sql.Timestamp; >> >> import static org.apache.flink.table.api.Expressions.$; >> >> public class PocLastDecimalJob { >> private static final Logger LOGGER = >> LoggerFactory.getLogger(PocLastDecimalJob.class); >> >> public static class LastDecimalAccumulator extends Tuple1<BigDecimal> { >> public LastDecimalAccumulator(BigDecimal f0) { >> super(f0); >> } >> } >> >> public static class LastDecimalAggFunction extends >> AggregateFunction<BigDecimal, LastDecimalAccumulator> { >> >> @Override >> public BigDecimal getValue(LastDecimalAccumulator accumulator) { >> return accumulator.f0; >> } >> >> @Override >> public LastDecimalAccumulator createAccumulator() { >> return new LastDecimalAccumulator(null); >> } >> >> public void accumulate(LastDecimalAccumulator accumulator, >> @DataTypeHint("DECIMAL(38, 18)") BigDecimal >> value) { >> if (value != null) { >> accumulator.f0 = value; >> } >> } >> >> public void merge(LastDecimalAccumulator accumulator, >> Iterable<BigDecimal> iterable) { >> if (iterable != null) { >> for (BigDecimal item : iterable) { >> accumulator.f0 = item; >> } >> } >> } >> } >> >> public static void main(String[] args) throws Exception { >> >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); >> >> RowTypeInfo rowTypeInfo = new RowTypeInfo( >> new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, >> Types.BIG_DEC}, >> new String[] {"rowtime", "id", "val"}); >> >> DataStream<Row> dataStream = env.fromElements( >> Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")), >> Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")), >> Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")) >> >> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo); >> >> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); >> tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new >> LastDecimalAggFunction()); >> >> tableEnv.createTemporaryView("InputTable", dataStream, >> $("rowtime").rowtime(), $("id"), $("val")); >> >> Table resultTable = tableEnv.sqlQuery("" + >> "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " + >> "FROM InputTable " + >> "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' >> SECOND), id"); >> >> DataStream<Row> resultStream = tableEnv >> .toRetractStream(resultTable, new >> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC)) >> .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> >> value.f1); >> >> resultStream.addSink(new SinkFunction<Row>() { >> @Override >> public void invoke(Row value, Context context) { >> LOGGER.info("SinkFunction.invoke(): value={}", value); >> } >> }); >> >> env.execute(); >> } >> } >> >> >> LIU Xiao <liuxiaogen...@gmail.com> 于2021年7月30日周五 上午11:04写道: >> >>> I'm currently converting our old code (based on Flink 1.6) to Flink 1.13 >>> and encountered a strange problem about the user-defined aggregate function >>> which takes BigDecimal as the parameter and output: >>> >>> Exception in thread "main" >>>> org.apache.flink.table.api.ValidationException: SQL validation failed. An >>>> error occurred in the type inference logic of function 'LAST_DECIMAL'. >>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) >>>> at >>>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) >>>> at >>>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) >>>> at >>>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) >>>> at >>>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704) >>>> at poc.flink.table.PocTimestampUdf.main(PocTimestampUdf.java:101) >>>> Caused by: org.apache.flink.table.api.ValidationException: An error >>>> occurred in the type inference logic of function 'LAST_DECIMAL'. >>>> at >>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) >>>> at >>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) >>>> at >>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) >>>> at java.base/java.util.Optional.flatMap(Optional.java:294) >>>> at >>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) >>>> at >>>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) >>>> at >>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183) >>>> at >>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200) >>>> at >>>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169) >>>> at >>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945) >>>> at >>>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) >>>> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org >>>> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) >>>> ... 5 more >>>> Caused by: org.apache.flink.table.api.ValidationException: Could not >>>> extract a valid type inference for function class >>>> 'poc.flink.table.LastDecimalAggFunction'. Please check for implementation >>>> mistakes and/or provide a corresponding hint. >>>> at >>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>>> at >>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150) >>>> at >>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98) >>>> at >>>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212) >>>> at >>>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160) >>>> ... 16 more >>>> Caused by: org.apache.flink.table.api.ValidationException: Error in >>>> extracting a signature to output mapping. >>>> at >>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117) >>>> at >>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161) >>>> at >>>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148) >>>> ... 19 more >>>> Caused by: org.apache.flink.table.api.ValidationException: Unable to >>>> extract a type inference from method: >>>> public void >>>> poc.flink.table.LastDecimalAggFunction.accumulate(poc.flink.table.LastDecimalAccumulator,java.math.BigDecimal) >>>> at >>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114) >>>> ... 21 more >>>> Caused by: org.apache.flink.table.api.ValidationException: Could not >>>> extract a data type from 'class java.math.BigDecimal' in generic class >>>> 'org.apache.flink.table.functions.AggregateFunction' in class >>>> poc.flink.table.LastDecimalAggFunction. Please pass the required data type >>>> manually or allow RAW types. >>>> at >>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269) >>>> at >>>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169) >>>> ... 22 more >>>> Caused by: org.apache.flink.table.api.ValidationException: Values of >>>> 'java.math.BigDecimal' need fixed precision and scale. >>>> at >>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>>> at >>>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractPredefinedType(DataTypeExtractor.java:398) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:276) >>>> at >>>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233) >>>> ... 29 more >>>> >>> >>> The program source code: >>> >>> package poc.flink.table; >>> >>> import org.apache.flink.api.common.eventtime.WatermarkStrategy; >>> import org.apache.flink.api.common.functions.MapFunction; >>> import org.apache.flink.api.common.typeinfo.TypeInformation; >>> import org.apache.flink.api.common.typeinfo.Types; >>> import org.apache.flink.api.java.tuple.Tuple1; >>> import org.apache.flink.api.java.tuple.Tuple2; >>> import org.apache.flink.api.java.typeutils.RowTypeInfo; >>> import org.apache.flink.streaming.api.datastream.DataStream; >>> import >>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >>> import org.apache.flink.streaming.api.functions.sink.SinkFunction; >>> import org.apache.flink.table.annotation.DataTypeHint; >>> import org.apache.flink.table.api.Table; >>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >>> import org.apache.flink.table.functions.AggregateFunction; >>> import org.apache.flink.types.Row; >>> import org.slf4j.Logger; >>> import org.slf4j.LoggerFactory; >>> >>> import java.math.BigDecimal; >>> import java.sql.Timestamp; >>> >>> import static org.apache.flink.table.api.Expressions.$; >>> >>> public class PocLastDecimalJob { >>> private static final Logger LOGGER = >>> LoggerFactory.getLogger(PocLastDecimalJob.class); >>> >>> public static class LastDecimalAccumulator extends Tuple1<BigDecimal> { >>> public LastDecimalAccumulator(BigDecimal f0) { >>> super(f0); >>> } >>> } >>> >>> public static class LastDecimalAggFunction extends >>> AggregateFunction<BigDecimal, LastDecimalAccumulator> { >>> >>> @Override >>> public BigDecimal getValue(LastDecimalAccumulator accumulator) { >>> return accumulator.f0; >>> } >>> >>> @Override >>> public LastDecimalAccumulator createAccumulator() { >>> return new LastDecimalAccumulator(null); >>> } >>> >>> public void accumulate(LastDecimalAccumulator accumulator, >>> @DataTypeHint("DECIMAL(38, 18)") BigDecimal >>> value) { >>> if (value != null) { >>> accumulator.f0 = value; >>> } >>> } >>> >>> public void merge(LastDecimalAccumulator accumulator, >>> Iterable<BigDecimal> iterable) { >>> if (iterable != null) { >>> for (BigDecimal item : iterable) { >>> accumulator.f0 = item; >>> } >>> } >>> } >>> } >>> >>> public static void main(String[] args) throws Exception { >>> >>> final StreamExecutionEnvironment env = >>> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); >>> >>> RowTypeInfo rowTypeInfo = new RowTypeInfo( >>> new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, >>> Types.BIG_DEC}, >>> new String[] {"rowtime", "id", "val"}); >>> >>> DataStream<Row> dataStream = env.fromElements( >>> Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")), >>> Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")), >>> Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")) >>> >>> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo); >>> >>> StreamTableEnvironment tableEnv = >>> StreamTableEnvironment.create(env); >>> tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new >>> LastDecimalAggFunction()); >>> >>> tableEnv.createTemporaryView("InputTable", dataStream, >>> $("rowtime.rowtime"), $("id"), $("val")); >>> >>> Table resultTable = tableEnv.sqlQuery("" + >>> "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " + >>> "FROM InputTable " + >>> "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' >>> SECOND), id"); >>> >>> DataStream<Row> resultStream = tableEnv >>> .toRetractStream(resultTable, new >>> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC)) >>> .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> >>> value.f1); >>> >>> resultStream.addSink(new SinkFunction<Row>() { >>> @Override >>> public void invoke(Row value, Context context) { >>> LOGGER.info("SinkFunction.invoke(): value={}", value); >>> } >>> }); >>> >>> env.execute(); >>> } >>> } >>> >>>