JingsongLi commented on a change in pull request #18135: URL: https://github.com/apache/flink/pull/18135#discussion_r773580701
########## File path: flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala ########## @@ -831,34 +921,53 @@ class AggregateITCase( @Test def testDifferentTypesSumWithRetract(): Unit = { - val data = List( - (1.toByte, 1.toShort, 1, 1L, 1.0F, 1.0, "a"), - (2.toByte, 2.toShort, 2, 2L, 2.0F, 2.0, "a"), - (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "a"), - (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "a"), - (1.toByte, 1.toShort, 1, 1L, 1.0F, 1.0, "b"), - (2.toByte, 2.toShort, 2, 2L, 2.0F, 2.0, "b"), - (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "c"), - (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "c") - ) - - val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g) - tEnv.registerTable("T", t) - - // We use sub-query + limit here to ensure retraction - val sql = - """ - |SELECT sum(a), sum(b), sum(c), sum(d), sum(e), sum(f), sum(h) FROM ( - | SELECT *, CAST(c AS DECIMAL(3, 2)) AS h FROM T LIMIT 8 - |) GROUP BY g - """.stripMargin + val upsertSourceCurrencyData = List( + changelogRow("+I", Byte.box(1), Short.box(1), Int.box(1), Long.box(1), + Float.box(1.0F), Double.box(1.0), "a"), + changelogRow("+I", Byte.box(2), Short.box(2), Int.box(2), Long.box(2), + Float.box(2.0F), Double.box(2.0), "a"), + changelogRow("-D", Byte.box(1), Short.box(1), Int.box(1), Long.box(1), + Float.box(1.0F), Double.box(1.0), "a"), + changelogRow("+I", Byte.box(3), Short.box(3), Int.box(3), Long.box(3), + Float.box(3.0F), Double.box(3.0), "a"), + changelogRow("-D", Byte.box(2), Short.box(2), Int.box(2), Long.box(2), + Float.box(2.0F), Double.box(2.0), "a"), + changelogRow("+I", Byte.box(1), Short.box(1), Int.box(1), Long.box(1), + Float.box(1.0F), Double.box(1.0), "a"), + changelogRow("-D", Byte.box(3), Short.box(3), Int.box(3), Long.box(3), + Float.box(3.0F), Double.box(3.0), "a"), + changelogRow("+I", Byte.box(2), Short.box(2), Int.box(2), Long.box(2), + Float.box(2.0F), Double.box(2.0), "a"), + changelogRow("+I", Byte.box(3), Short.box(3), Int.box(3), Long.box(3), + Float.box(3.0F), Double.box(3.0), "a")) + + val upsertSourceDataId = registerData(upsertSourceCurrencyData); Review comment: minor: remove ";" ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SumWithRetractAggFunction.java ########## @@ -194,21 +198,31 @@ protected Expression zeroLiteral() { /** Built-in Decimal Sum with retract aggregate function. */ public static class DecimalSumWithRetractAggFunction extends SumWithRetractAggFunction { - private DecimalType decimalType; + private final DataType resultType; public DecimalSumWithRetractAggFunction(DecimalType decimalType) { - this.decimalType = decimalType; + DecimalType sumType = (DecimalType) LogicalTypeMerging.findSumAggType(decimalType); + this.resultType = DataTypes.DECIMAL(sumType.getPrecision(), sumType.getScale()); } @Override public DataType getResultType() { - DecimalType sumType = (DecimalType) LogicalTypeMerging.findSumAggType(decimalType); - return DataTypes.DECIMAL(sumType.getPrecision(), sumType.getScale()); + return resultType; } @Override protected Expression zeroLiteral() { return literal(0); } + + protected UnresolvedCallExpression adjustedPlus( + UnresolvedReferenceExpression arg1, UnresolvedReferenceExpression arg2) { + return aggDecimalPlus(arg1, arg2); + } + + protected UnresolvedCallExpression adjustedMinus( Review comment: ditto ########## File path: flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SumWithRetractAggFunction.java ########## @@ -194,21 +198,31 @@ protected Expression zeroLiteral() { /** Built-in Decimal Sum with retract aggregate function. */ public static class DecimalSumWithRetractAggFunction extends SumWithRetractAggFunction { - private DecimalType decimalType; + private final DataType resultType; public DecimalSumWithRetractAggFunction(DecimalType decimalType) { - this.decimalType = decimalType; + DecimalType sumType = (DecimalType) LogicalTypeMerging.findSumAggType(decimalType); + this.resultType = DataTypes.DECIMAL(sumType.getPrecision(), sumType.getScale()); } @Override public DataType getResultType() { - DecimalType sumType = (DecimalType) LogicalTypeMerging.findSumAggType(decimalType); - return DataTypes.DECIMAL(sumType.getPrecision(), sumType.getScale()); + return resultType; } @Override protected Expression zeroLiteral() { return literal(0); } + + protected UnresolvedCallExpression adjustedPlus( Review comment: override? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org