LadyForest commented on code in PR #24030: URL: https://github.com/apache/flink/pull/24030#discussion_r1446810962
########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggregateUtil.scala: ########## @@ -279,19 +279,19 @@ object AggregateUtil extends Enumeration { typeFactory: FlinkTypeFactory, inputRowType: RowType, aggCalls: Seq[AggregateCall], + needRetraction: Boolean, windowSpec: WindowSpec, isStateBackendDataViews: Boolean): AggregateInfoList = { - // Hopping window requires additional COUNT(*) to determine whether to register next timer - // through whether the current fired window is empty, see SliceSharedWindowAggProcessor. - val needInputCount = windowSpec.isInstanceOf[HoppingWindowSpec] + // Hopping window always requires additional COUNT(*) to determine whether to register next Review Comment: Nit: remove extra whitespace between "determine" and "whether" ########## flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala: ########## @@ -219,8 +219,13 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti val providedTrait = new ModifyKindSetTrait(builder.build()) createNewNode(window, children, providedTrait, requiredTrait, requester) - case _: StreamPhysicalWindowAggregate | _: StreamPhysicalWindowRank | - _: StreamPhysicalWindowDeduplicate => + case window: StreamPhysicalWindowAggregate => + // WindowAggregate and WindowTableAggregate support all changes in input + val children = visitChildren(window, ModifyKindSetTrait.ALL_CHANGES) + val providedTrait = new ModifyKindSetTrait(ModifyKindSet.INSERT_ONLY) Review Comment: Nit: Add a TODO noting that the provided trait set can be extended once we support emit strategies using the TVF syntax. ########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala: ########## @@ -127,6 +162,26 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl util.verifyRelPlan(sql) } +
@TestTemplate + def testTumble_OnProctimeWithCDCSource(): Unit = { + assumeThat(isTwoPhase).isTrue Review Comment: Is there any particular reason to enable only the two-phase test? ########## flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java: ########## @@ -231,4 +239,33 @@ protected void collect(RowData aggResult) { reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(), aggResult); ctx.output(reuseOutput); } + + /** A supplier that returns whether the window is empty. */ + protected final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable { + private static final long serialVersionUID = 1L; + + private final int indexOfCountStar; + + private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) { + if (assigner instanceof SliceAssigners.HoppingSliceAssigner) { + checkArgument( + indexOfCountStar >= 0, + "Hopping window requires a COUNT(*) in the aggregate functions."); + } + this.indexOfCountStar = indexOfCountStar; + } + + @Override + public Boolean get() { + if (indexOfCountStar < 0) { Review Comment: I see there is a precondition check on `indexOfCountStar`. Under what condition might it be negative?
########## flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/utils/TestData.scala: ########## @@ -756,6 +756,147 @@ object TestData { row("2020-10-10 00:00:34", 1, 3d, 3f, new JBigDecimal("3.33"), "Comment#3", "b") ) + val windowChangelogDataWithTimestamp: Seq[Row] = List( Review Comment:
+----+---------------------+---+-----+-----+-------+------------+---+
| Op | Timestamp           | 1 | 2   | 3   | 4     | 5          | 6 |
+----+---------------------+---+-----+-----+-------+------------+---+
| +I | 2020-10-10 00:00:01 | 1 | 1.0 | 1.0 | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:02 | 2 | 2.0 | 2.0 | 2.22  | Comment#1  | a |
| -D | 2020-10-10 00:00:03 | 1 | 1.0 | 1.0 | 1.11  | Hi         | a |
| +I | 2020-10-10 00:00:03 | 2 | 2.0 | 2.0 | 2.22  | Comment#1  | a |
| +I | 2020-10-10 00:00:04 | 5 | 5.0 | 5.0 | 5.55  | null       | a |
| -U | 2020-10-10 00:00:04 | 2 | 2.0 | 2.0 | 2.22  | Comment#1  | a |
| +U | 2020-10-10 00:00:04 | 22|22.0 |22.2 | 22.22 | Comment#22 | a |
| +I | 2020-10-10 00:00:07 | 3 | 3.0 | 3.0 | null  | Hello      | b |
| +I | 2020-10-10 00:00:06 | 6 | 6.0 | 6.0 | 6.66  | Hi         | b |
| +I | 2020-10-10 00:00:08 | 3 |null | 3.0 | 3.33  | Comment#2  | a |
| +I | 2020-10-10 00:00:04 | 5 | 5.0 |null | 5.55  | Hi         | a |
| +I | 2020-10-10 00:00:16 | 4 | 4.0 | 4.0 | 4.44  | Hi         | b |
| -D | 2020-10-10 00:00:04 | 5 | 5.0 | 5.0 | 5.55  | null       | a |
| +I | 2020-10-10 00:00:38 | 8 | 8.0 | 8.0 | 8.88  | Comment#4  | b |
| -D | 2020-10-10 00:00:39 | 8 | 8.0 | 8.0 | 8.88  | Comment#4  | b |
+----+---------------------+---+-----+-----+-------+------------+---+
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org