xuyangzhong commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1448510717


##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java:
##########
@@ -231,4 +239,33 @@ protected void collect(RowData aggResult) {
         reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(), 
aggResult);
         ctx.output(reuseOutput);
     }
+
+    /** A supplier that returns whether the window is empty. */
+    protected final class WindowIsEmptySupplier implements Supplier<Boolean>, 
Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final int indexOfCountStar;
+
+        private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner 
assigner) {
+            if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
+                checkArgument(
+                        indexOfCountStar >= 0,
+                        "Hopping window requires a COUNT(*) in the aggregate 
functions.");
+            }
+            this.indexOfCountStar = indexOfCountStar;
+        }
+
+        @Override
+        public Boolean get() {
+            if (indexOfCountStar < 0) {

Review Comment:
   When this operator consumes append-only stream and using no-HOP window, 
`indexOfCountStar ` will be `-1`. That means we don't need an extra `count(*)` 
column to figure out how many datas are in window. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to