Hi, You can do sth like this: /** * UDF implementing Power function with Decimal */ public class PowerFunction extends ScalarFunction {
public static MathContext mc = new MathContext(18); public @DataTypeHint("DECIMAL(38,18)") BigDecimal eval(@DataTypeHint("DECIMAL(38,18)") BigDecimal x, @DataTypeHint("DECIMAL(38,18)") BigDecimal y) { return BigDecimalMath.pow(x, y, mc); } } pt., 30 lip 2021 o 05:12 LIU Xiao <liuxiaogen...@gmail.com> napisał(a): > > sorry for a little error, the program code should be: > > package poc.flink.table; > > import org.apache.flink.api.common.eventtime.WatermarkStrategy; > import org.apache.flink.api.common.functions.MapFunction; > import org.apache.flink.api.common.typeinfo.TypeInformation; > import org.apache.flink.api.common.typeinfo.Types; > import org.apache.flink.api.java.tuple.Tuple1; > import org.apache.flink.api.java.tuple.Tuple2; > import org.apache.flink.api.java.typeutils.RowTypeInfo; > import org.apache.flink.streaming.api.datastream.DataStream; > import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; > import org.apache.flink.streaming.api.functions.sink.SinkFunction; > import org.apache.flink.table.annotation.DataTypeHint; > import org.apache.flink.table.api.Table; > import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; > import org.apache.flink.table.functions.AggregateFunction; > import org.apache.flink.types.Row; > import org.slf4j.Logger; > import org.slf4j.LoggerFactory; > > import java.math.BigDecimal; > import java.sql.Timestamp; > > import static org.apache.flink.table.api.Expressions.$; > > public class PocLastDecimalJob { > private static final Logger LOGGER = > LoggerFactory.getLogger(PocLastDecimalJob.class); > > public static class LastDecimalAccumulator extends Tuple1<BigDecimal> { > public LastDecimalAccumulator(BigDecimal f0) { > super(f0); > } > } > > public static class LastDecimalAggFunction extends > AggregateFunction<BigDecimal, LastDecimalAccumulator> { > > @Override > public BigDecimal getValue(LastDecimalAccumulator accumulator) { > return accumulator.f0; > } > > @Override > public LastDecimalAccumulator createAccumulator() { > return new LastDecimalAccumulator(null); > } > > public void accumulate(LastDecimalAccumulator accumulator, > @DataTypeHint("DECIMAL(38, 18)") BigDecimal > value) { > if (value != null) { > accumulator.f0 = value; > } > } > > public void merge(LastDecimalAccumulator accumulator, > Iterable<BigDecimal> iterable) { > if (iterable != null) { > for (BigDecimal item : iterable) { > accumulator.f0 = item; > } > } > } > } > > public static void main(String[] args) throws Exception { > > final StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); > > RowTypeInfo rowTypeInfo = new RowTypeInfo( > new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, > Types.BIG_DEC}, > new String[] {"rowtime", "id", "val"}); > > DataStream<Row> dataStream = env.fromElements( > Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")), > Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")), > Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")) > > ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo); > > StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); > tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new > LastDecimalAggFunction()); > > tableEnv.createTemporaryView("InputTable", dataStream, > $("rowtime").rowtime(), $("id"), $("val")); > > Table resultTable = tableEnv.sqlQuery("" + > "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " + > "FROM InputTable " + > "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' > SECOND), id"); > > DataStream<Row> resultStream = tableEnv > .toRetractStream(resultTable, new > RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC)) > .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> > value.f1); > > resultStream.addSink(new SinkFunction<Row>() { > @Override > public void invoke(Row value, Context context) { > LOGGER.info("SinkFunction.invoke(): value={}", value); > } > }); > > env.execute(); > } > } > > > LIU Xiao <liuxiaogen...@gmail.com> 于2021年7月30日周五 上午11:04写道: >> >> I'm currently converting our old code (based on Flink 1.6) to Flink 1.13 and >> encountered a strange problem about the user-defined aggregate function >> which takes BigDecimal as the parameter and output: >> >>> Exception in thread "main" org.apache.flink.table.api.ValidationException: >>> SQL validation failed. An error occurred in the type inference logic of >>> function 'LAST_DECIMAL'. >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157) >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110) >>> at >>> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201) >>> at >>> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101) >>> at >>> org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704) >>> at poc.flink.table.PocTimestampUdf.main(PocTimestampUdf.java:101) >>> Caused by: org.apache.flink.table.api.ValidationException: An error >>> occurred in the type inference logic of function 'LAST_DECIMAL'. >>> at >>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163) >>> at >>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146) >>> at >>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100) >>> at java.base/java.util.Optional.flatMap(Optional.java:294) >>> at >>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98) >>> at >>> org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945) >>> at >>> org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) >>> at >>> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152) >>> ... 5 more >>> Caused by: org.apache.flink.table.api.ValidationException: Could not >>> extract a valid type inference for function class >>> 'poc.flink.table.LastDecimalAggFunction'. Please check for implementation >>> mistakes and/or provide a corresponding hint. >>> at >>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>> at >>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150) >>> at >>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98) >>> at >>> org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212) >>> at >>> org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160) >>> ... 16 more >>> Caused by: org.apache.flink.table.api.ValidationException: Error in >>> extracting a signature to output mapping. >>> at >>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117) >>> at >>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161) >>> at >>> org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148) >>> ... 19 more >>> Caused by: org.apache.flink.table.api.ValidationException: Unable to >>> extract a type inference from method: >>> public void >>> poc.flink.table.LastDecimalAggFunction.accumulate(poc.flink.table.LastDecimalAccumulator,java.math.BigDecimal) >>> at >>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114) >>> ... 21 more >>> Caused by: org.apache.flink.table.api.ValidationException: Could not >>> extract a data type from 'class java.math.BigDecimal' in generic class >>> 'org.apache.flink.table.functions.AggregateFunction' in class >>> poc.flink.table.LastDecimalAggFunction. Please pass the required data type >>> manually or allow RAW types. >>> at >>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269) >>> at >>> org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169) >>> ... 22 more >>> Caused by: org.apache.flink.table.api.ValidationException: Values of >>> 'java.math.BigDecimal' need fixed precision and scale. >>> at >>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361) >>> at >>> org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractPredefinedType(DataTypeExtractor.java:398) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:276) >>> at >>> org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233) >>> ... 29 more >> >> >> The program source code: >> >> package poc.flink.table; >> >> import org.apache.flink.api.common.eventtime.WatermarkStrategy; >> import org.apache.flink.api.common.functions.MapFunction; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.api.common.typeinfo.Types; >> import org.apache.flink.api.java.tuple.Tuple1; >> import org.apache.flink.api.java.tuple.Tuple2; >> import org.apache.flink.api.java.typeutils.RowTypeInfo; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import org.apache.flink.streaming.api.functions.sink.SinkFunction; >> import org.apache.flink.table.annotation.DataTypeHint; >> import org.apache.flink.table.api.Table; >> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; >> import org.apache.flink.table.functions.AggregateFunction; >> import org.apache.flink.types.Row; >> import org.slf4j.Logger; >> import org.slf4j.LoggerFactory; >> >> import java.math.BigDecimal; >> import java.sql.Timestamp; >> >> import static org.apache.flink.table.api.Expressions.$; >> >> public class PocLastDecimalJob { >> private static final Logger LOGGER = >> LoggerFactory.getLogger(PocLastDecimalJob.class); >> >> public static class LastDecimalAccumulator extends Tuple1<BigDecimal> { >> public LastDecimalAccumulator(BigDecimal f0) { >> super(f0); >> } >> } >> >> public static class LastDecimalAggFunction extends >> AggregateFunction<BigDecimal, LastDecimalAccumulator> { >> >> @Override >> public BigDecimal getValue(LastDecimalAccumulator accumulator) { >> return accumulator.f0; >> } >> >> @Override >> public LastDecimalAccumulator createAccumulator() { >> return new LastDecimalAccumulator(null); >> } >> >> public void accumulate(LastDecimalAccumulator accumulator, >> @DataTypeHint("DECIMAL(38, 18)") BigDecimal >> value) { >> if (value != null) { >> accumulator.f0 = value; >> } >> } >> >> public void merge(LastDecimalAccumulator accumulator, >> Iterable<BigDecimal> iterable) { >> if (iterable != null) { >> for (BigDecimal item : iterable) { >> accumulator.f0 = item; >> } >> } >> } >> } >> >> public static void main(String[] args) throws Exception { >> >> final StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1); >> >> RowTypeInfo rowTypeInfo = new RowTypeInfo( >> new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, >> Types.BIG_DEC}, >> new String[] {"rowtime", "id", "val"}); >> >> DataStream<Row> dataStream = env.fromElements( >> Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")), >> Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")), >> Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")) >> >> ).assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()).returns(rowTypeInfo); >> >> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); >> tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new >> LastDecimalAggFunction()); >> >> tableEnv.createTemporaryView("InputTable", dataStream, >> $("rowtime.rowtime"), $("id"), $("val")); >> >> Table resultTable = tableEnv.sqlQuery("" + >> "SELECT LAST_TIMESTAMP(rowtime), id, LAST_DECIMAL(bd) " + >> "FROM InputTable " + >> "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' >> SECOND), id"); >> >> DataStream<Row> resultStream = tableEnv >> .toRetractStream(resultTable, new >> RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC)) >> .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> >> value.f1); >> >> resultStream.addSink(new SinkFunction<Row>() { >> @Override >> public void invoke(Row value, Context context) { >> LOGGER.info("SinkFunction.invoke(): value={}", value); >> } >> }); >> >> env.execute(); >> } >> } -- Maciek Bryński