LadyForest commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1446810962


##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala:
##########
@@ -279,19 +279,19 @@ object AggregateUtil extends Enumeration {
       typeFactory: FlinkTypeFactory,
       inputRowType: RowType,
       aggCalls: Seq[AggregateCall],
+      needRetraction: Boolean,
       windowSpec: WindowSpec,
       isStateBackendDataViews: Boolean): AggregateInfoList = {
-    // Hopping window requires additional COUNT(*) to determine  whether to 
register next timer
-    // through whether the current fired window is empty, see 
SliceSharedWindowAggProcessor.
-    val needInputCount = windowSpec.isInstanceOf[HoppingWindowSpec]
+    // Hopping window always requires additional COUNT(*) to determine  
whether to register next

Review Comment:
   Nit: remove extra whitespace between "determine" and "whether"



##########
flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala:
##########
@@ -219,8 +219,13 @@ class FlinkChangelogModeInferenceProgram extends 
FlinkOptimizeProgram[StreamOpti
         val providedTrait = new ModifyKindSetTrait(builder.build())
         createNewNode(window, children, providedTrait, requiredTrait, 
requester)
 
-      case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank |
-          _: StreamPhysicalWindowDeduplicate =>
+      case window: StreamPhysicalWindowAggregate =>
+        // WindowAggregate and WindowTableAggregate support all changes in 
input
+        val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES)
+        val providedTrait = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY)

Review Comment:
   Nit: Add a TODO noting that the provided trait set can be extended once we 
support the emit strategy using the TVF syntax



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala:
##########
@@ -127,6 +162,26 @@ class WindowAggregateTest(aggPhaseEnforcer: 
AggregatePhaseStrategy) extends Tabl
     util.verifyRelPlan(sql)
   }
 
+  @TestTemplate
+  def testTumble_OnProctimeWithCDCSource(): Unit = {
+    assumeThat(isTwoPhase).isTrue

Review Comment:
   Is there any particular reason to enable this test only in two-phase mode?



##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java:
##########
@@ -231,4 +239,33 @@ protected void collect(RowData aggResult) {
         reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(), 
aggResult);
         ctx.output(reuseOutput);
     }
+
+    /** A supplier that returns whether the window is empty. */
+    protected final class WindowIsEmptySupplier implements Supplier<Boolean>, 
Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final int indexOfCountStar;
+
+        private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner 
assigner) {
+            if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
+                checkArgument(
+                        indexOfCountStar >= 0,
+                        "Hopping window requires a COUNT(*) in the aggregate 
functions.");
+            }
+            this.indexOfCountStar = indexOfCountStar;
+        }
+
+        @Override
+        public Boolean get() {
+            if (indexOfCountStar < 0) {

Review Comment:
   I saw there is a precondition check on `indexOfCountStar`. Under which 
conditions might it be negative?



##########
flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala:
##########
@@ -756,6 +756,147 @@ object TestData {
     row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), 
"Comment#3", "b")
   )
 
+  val windowChangelogDataWithTimestamp: Seq[Row] = List(

Review Comment:
   +----+---------------------+---+-----+-----+-------+------------+---+
   | Op |      Timestamp      | 1 |  2  |  3  |   4   |     5      | 6 |
   +----+---------------------+---+-----+-----+-------+------------+---+
   | +I | 2020-10-10 00:00:01 | 1 | 1.0 | 1.0 |  1.11 |     Hi     | a |
   | +I | 2020-10-10 00:00:02 | 2 | 2.0 | 2.0 |  2.22 | Comment#1  | a |
   | -D | 2020-10-10 00:00:03 | 1 | 1.0 | 1.0 |  1.11 |     Hi     | a |
   | +I | 2020-10-10 00:00:03 | 2 | 2.0 | 2.0 |  2.22 | Comment#1  | a |
   | +I | 2020-10-10 00:00:04 | 5 | 5.0 | 5.0 |  5.55 |    null    | a |
   | -U | 2020-10-10 00:00:04 | 2 | 2.0 | 2.0 |  2.22 | Comment#1  | a |
   | +U | 2020-10-10 00:00:04 | 22|22.0 |22.2 | 22.22 | Comment#22 | a |
   | +I | 2020-10-10 00:00:07 | 3 | 3.0 | 3.0 |  null |   Hello    | b |
   | +I | 2020-10-10 00:00:06 | 6 | 6.0 | 6.0 |  6.66 |     Hi     | b |
   | +I | 2020-10-10 00:00:08 | 3 |null | 3.0 |  3.33 | Comment#2  | a |
   | +I | 2020-10-10 00:00:04 | 5 | 5.0 |null |  5.55 |     Hi     | a |
   | +I | 2020-10-10 00:00:16 | 4 | 4.0 | 4.0 |  4.44 |     Hi     | b |
   | -D | 2020-10-10 00:00:04 | 5 | 5.0 | 5.0 |  5.55 |    null    | a |
   | +I | 2020-10-10 00:00:38 | 8 | 8.0 | 8.0 |  8.88 | Comment#4  | b |
   | -D | 2020-10-10 00:00:39 | 8 | 8.0 | 8.0 |  8.88 | Comment#4  | b |
   +----+---------------------+---+-----+-----+-------+------------+---+
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to