I'm currently converting our old code (based on Flink 1.6) to Flink 1.13
and ran into a strange problem with a user-defined aggregate function
that takes BigDecimal as both its parameter type and its output type:

Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. An error occurred in the type inference logic of function 'LAST_DECIMAL'.
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:157)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:110)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:201)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:101)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:704)
    at poc.flink.table.PocTimestampUdf.main(PocTimestampUdf.java:101)
Caused by: org.apache.flink.table.api.ValidationException: An error occurred in the type inference logic of function 'LAST_DECIMAL'.
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:163)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToSqlFunction(FunctionCatalogOperatorTable.java:146)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lambda$lookupOperatorOverloads$0(FunctionCatalogOperatorTable.java:100)
    at java.base/java.util.Optional.flatMap(Optional.java:294)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.lookupOperatorOverloads(FunctionCatalogOperatorTable.java:98)
    at org.apache.calcite.sql.util.ChainedSqlOperatorTable.lookupOperatorOverloads(ChainedSqlOperatorTable.java:67)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1183)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1200)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.performUnconditionalRewrites(SqlValidatorImpl.java:1169)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:945)
    at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:152)
    ... 5 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a valid type inference for function class 'poc.flink.table.LastDecimalAggFunction'. Please check for implementation mistakes and/or provide a corresponding hint.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:150)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.forAggregateFunction(TypeInferenceExtractor.java:98)
    at org.apache.flink.table.functions.AggregateFunction.getTypeInference(AggregateFunction.java:212)
    at org.apache.flink.table.planner.catalog.FunctionCatalogOperatorTable.convertToBridgingSqlFunction(FunctionCatalogOperatorTable.java:160)
    ... 16 more
Caused by: org.apache.flink.table.api.ValidationException: Error in extracting a signature to output mapping.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:117)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInferenceOrError(TypeInferenceExtractor.java:161)
    at org.apache.flink.table.types.extraction.TypeInferenceExtractor.extractTypeInference(TypeInferenceExtractor.java:148)
    ... 19 more
Caused by: org.apache.flink.table.api.ValidationException: Unable to extract a type inference from method:
public void poc.flink.table.LastDecimalAggFunction.accumulate(poc.flink.table.LastDecimalAccumulator,java.math.BigDecimal)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:183)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractOutputMapping(FunctionMappingExtractor.java:114)
    ... 21 more
Caused by: org.apache.flink.table.api.ValidationException: Could not extract a data type from 'class java.math.BigDecimal' in generic class 'org.apache.flink.table.functions.AggregateFunction' in class poc.flink.table.LastDecimalAggFunction. Please pass the required data type manually or allow RAW types.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:241)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRaw(DataTypeExtractor.java:219)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeWithClassContext(DataTypeExtractor.java:195)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractFromGeneric(DataTypeExtractor.java:125)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.lambda$createGenericResultExtraction$13(FunctionMappingExtractor.java:478)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.putExtractedResultMappings(FunctionMappingExtractor.java:319)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.collectMethodMappings(FunctionMappingExtractor.java:269)
    at org.apache.flink.table.types.extraction.FunctionMappingExtractor.extractResultMappings(FunctionMappingExtractor.java:169)
    ... 22 more
Caused by: org.apache.flink.table.api.ValidationException: Values of 'java.math.BigDecimal' need fixed precision and scale.
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:361)
    at org.apache.flink.table.types.extraction.ExtractionUtils.extractionError(ExtractionUtils.java:356)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractPredefinedType(DataTypeExtractor.java:398)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrError(DataTypeExtractor.java:276)
    at org.apache.flink.table.types.extraction.DataTypeExtractor.extractDataTypeOrRawWithTemplate(DataTypeExtractor.java:233)
    ... 29 more

The program source code:

package poc.flink.table;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.types.Row;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.math.BigDecimal;
import java.sql.Timestamp;

import static org.apache.flink.table.api.Expressions.$;

public class PocLastDecimalJob {
    private static final Logger LOGGER = LoggerFactory.getLogger(PocLastDecimalJob.class);

    public static class LastDecimalAccumulator extends Tuple1<BigDecimal> {
        // Flink's tuple serializer instantiates accumulators reflectively,
        // so a public no-argument constructor is required.
        public LastDecimalAccumulator() {
        }

        public LastDecimalAccumulator(BigDecimal f0) {
            super(f0);
        }
    }

    public static class LastDecimalAggFunction
            extends AggregateFunction<BigDecimal, LastDecimalAccumulator> {

        @Override
        public BigDecimal getValue(LastDecimalAccumulator accumulator) {
            return accumulator.f0;
        }

        @Override
        public LastDecimalAccumulator createAccumulator() {
            return new LastDecimalAccumulator(null);
        }

        public void accumulate(LastDecimalAccumulator accumulator,
                               @DataTypeHint("DECIMAL(38, 18)") BigDecimal value) {
            if (value != null) {
                accumulator.f0 = value;
            }
        }

        public void merge(LastDecimalAccumulator accumulator,
                          Iterable<LastDecimalAccumulator> iterable) {
            // merge() receives other accumulators of the same type, not raw values
            for (LastDecimalAccumulator other : iterable) {
                if (other.f0 != null) {
                    accumulator.f0 = other.f0;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);

        RowTypeInfo rowTypeInfo = new RowTypeInfo(
                new TypeInformation[] {Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC},
                new String[] {"rowtime", "id", "val"});

        DataStream<Row> dataStream = env.fromElements(
                        Row.of(new Timestamp(1_100L), 1, new BigDecimal("1")),
                        Row.of(new Timestamp(2_200L), 1, new BigDecimal("2")),
                        Row.of(new Timestamp(3_300L), 2, new BigDecimal("3")))
                .returns(rowTypeInfo)
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<Row>forMonotonousTimestamps()
                                // derive event time from the rowtime field
                                .withTimestampAssigner(
                                        (row, ts) -> ((Timestamp) row.getField(0)).getTime()));

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.createTemporarySystemFunction("LAST_DECIMAL", new LastDecimalAggFunction());

        tableEnv.createTemporaryView("InputTable", dataStream,
                $("rowtime").rowtime(), $("id"), $("val"));

        Table resultTable = tableEnv.sqlQuery(
                "SELECT HOP_START(rowtime, INTERVAL '1' SECOND, INTERVAL '1' SECOND), " +
                "id, LAST_DECIMAL(val) " +
                "FROM InputTable " +
                "GROUP BY HOP(rowtime, INTERVAL '1' SECOND, INTERVAL '1' SECOND), id");

        DataStream<Row> resultStream = tableEnv
                .toRetractStream(resultTable,
                        new RowTypeInfo(Types.SQL_TIMESTAMP, Types.INT, Types.BIG_DEC))
                .map((MapFunction<Tuple2<Boolean, Row>, Row>) value -> value.f1);

        resultStream.addSink(new SinkFunction<Row>() {
            @Override
            public void invoke(Row value, Context context) {
                LOGGER.info("SinkFunction.invoke(): value={}", value);
            }
        });

        env.execute();
    }
}
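
Reading the last "Caused by" ("Please pass the required data type manually or allow RAW types"), my understanding is that the new type system refuses to map a raw java.math.BigDecimal to DECIMAL because it cannot infer a fixed precision and scale for the output and accumulator generics. If that is right, I assume the function needs explicit hints there as well, not only on the accumulate() parameter. Below is a sketch of what I mean (it needs an additional import of org.apache.flink.table.annotation.FunctionHint, and DECIMAL(38, 18) simply mirrors the precision already used above):

    // Accumulator as a plain POJO whose BigDecimal field carries an
    // explicit DECIMAL hint instead of relying on reflective extraction.
    public static class HintedLastDecimalAccumulator {
        @DataTypeHint("DECIMAL(38, 18)")
        public BigDecimal last;
    }

    // The class-level hint declares the output type that the generic
    // parameter <BigDecimal> alone cannot express.
    @FunctionHint(output = @DataTypeHint("DECIMAL(38, 18)"))
    public static class HintedLastDecimalAggFunction
            extends AggregateFunction<BigDecimal, HintedLastDecimalAccumulator> {

        @Override
        public HintedLastDecimalAccumulator createAccumulator() {
            return new HintedLastDecimalAccumulator();
        }

        @Override
        public BigDecimal getValue(HintedLastDecimalAccumulator accumulator) {
            return accumulator.last;
        }

        public void accumulate(HintedLastDecimalAccumulator accumulator,
                               @DataTypeHint("DECIMAL(38, 18)") BigDecimal value) {
            if (value != null) {
                accumulator.last = value;
            }
        }
    }

Is this the intended way to declare decimal types for aggregate functions in 1.13, or is there a way to keep the Tuple1-based accumulator from the 1.6 code?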
