JingsongLi commented on a change in pull request #18135:
URL: https://github.com/apache/flink/pull/18135#discussion_r773580701



##########
File path: 
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
##########
@@ -831,34 +921,53 @@ class AggregateITCase(
 
   @Test
   def testDifferentTypesSumWithRetract(): Unit = {
-    val data = List(
-      (1.toByte, 1.toShort, 1, 1L, 1.0F, 1.0, "a"),
-      (2.toByte, 2.toShort, 2, 2L, 2.0F, 2.0, "a"),
-      (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "a"),
-      (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "a"),
-      (1.toByte, 1.toShort, 1, 1L, 1.0F, 1.0, "b"),
-      (2.toByte, 2.toShort, 2, 2L, 2.0F, 2.0, "b"),
-      (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "c"),
-      (3.toByte, 3.toShort, 3, 3L, 3.0F, 3.0, "c")
-    )
-
-    val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'f, 'g)
-    tEnv.registerTable("T", t)
-
-    // We use sub-query + limit here to ensure retraction
-    val sql =
-      """
-        |SELECT sum(a), sum(b), sum(c), sum(d), sum(e), sum(f), sum(h) FROM (
-        |  SELECT *, CAST(c AS DECIMAL(3, 2)) AS h FROM T LIMIT 8
-        |) GROUP BY g
-      """.stripMargin
+    val upsertSourceCurrencyData = List(
+      changelogRow("+I", Byte.box(1), Short.box(1), Int.box(1), Long.box(1),
+        Float.box(1.0F), Double.box(1.0), "a"),
+      changelogRow("+I", Byte.box(2), Short.box(2), Int.box(2), Long.box(2),
+        Float.box(2.0F), Double.box(2.0), "a"),
+      changelogRow("-D", Byte.box(1), Short.box(1), Int.box(1), Long.box(1),
+        Float.box(1.0F), Double.box(1.0), "a"),
+      changelogRow("+I", Byte.box(3), Short.box(3), Int.box(3), Long.box(3),
+        Float.box(3.0F), Double.box(3.0), "a"),
+      changelogRow("-D", Byte.box(2), Short.box(2), Int.box(2), Long.box(2),
+        Float.box(2.0F), Double.box(2.0), "a"),
+      changelogRow("+I", Byte.box(1), Short.box(1), Int.box(1), Long.box(1),
+        Float.box(1.0F), Double.box(1.0), "a"),
+      changelogRow("-D", Byte.box(3), Short.box(3), Int.box(3), Long.box(3),
+        Float.box(3.0F), Double.box(3.0), "a"),
+      changelogRow("+I", Byte.box(2), Short.box(2), Int.box(2), Long.box(2),
+        Float.box(2.0F), Double.box(2.0), "a"),
+      changelogRow("+I", Byte.box(3), Short.box(3), Int.box(3), Long.box(3),
+        Float.box(3.0F), Double.box(3.0), "a"))
+
+    val upsertSourceDataId = registerData(upsertSourceCurrencyData);

Review comment:
       minor: remove the trailing ";" at the end of this line — Scala does not need a semicolon to terminate a statement.

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SumWithRetractAggFunction.java
##########
@@ -194,21 +198,31 @@ protected Expression zeroLiteral() {
 
     /** Built-in Decimal Sum with retract aggregate function. */
     public static class DecimalSumWithRetractAggFunction extends 
SumWithRetractAggFunction {
-        private DecimalType decimalType;
+        private final DataType resultType;
 
         public DecimalSumWithRetractAggFunction(DecimalType decimalType) {
-            this.decimalType = decimalType;
+            DecimalType sumType = (DecimalType) 
LogicalTypeMerging.findSumAggType(decimalType);
+            this.resultType = DataTypes.DECIMAL(sumType.getPrecision(), 
sumType.getScale());
         }
 
         @Override
         public DataType getResultType() {
-            DecimalType sumType = (DecimalType) 
LogicalTypeMerging.findSumAggType(decimalType);
-            return DataTypes.DECIMAL(sumType.getPrecision(), 
sumType.getScale());
+            return resultType;
         }
 
         @Override
         protected Expression zeroLiteral() {
             return literal(0);
         }
+
+        protected UnresolvedCallExpression adjustedPlus(
+                UnresolvedReferenceExpression arg1, 
UnresolvedReferenceExpression arg2) {
+            return aggDecimalPlus(arg1, arg2);
+        }
+
+        protected UnresolvedCallExpression adjustedMinus(

Review comment:
       ditto: same question as for `adjustedPlus` — should `adjustedMinus` be annotated with `@Override`?

##########
File path: 
flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/aggfunctions/SumWithRetractAggFunction.java
##########
@@ -194,21 +198,31 @@ protected Expression zeroLiteral() {
 
     /** Built-in Decimal Sum with retract aggregate function. */
     public static class DecimalSumWithRetractAggFunction extends 
SumWithRetractAggFunction {
-        private DecimalType decimalType;
+        private final DataType resultType;
 
         public DecimalSumWithRetractAggFunction(DecimalType decimalType) {
-            this.decimalType = decimalType;
+            DecimalType sumType = (DecimalType) 
LogicalTypeMerging.findSumAggType(decimalType);
+            this.resultType = DataTypes.DECIMAL(sumType.getPrecision(), 
sumType.getScale());
         }
 
         @Override
         public DataType getResultType() {
-            DecimalType sumType = (DecimalType) 
LogicalTypeMerging.findSumAggType(decimalType);
-            return DataTypes.DECIMAL(sumType.getPrecision(), 
sumType.getScale());
+            return resultType;
         }
 
         @Override
         protected Expression zeroLiteral() {
             return literal(0);
         }
+
+        protected UnresolvedCallExpression adjustedPlus(

Review comment:
       Should this method be annotated with `@Override`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to