xuyangzhong commented on code in PR #24030:
URL: https://github.com/apache/flink/pull/24030#discussion_r1448510717

##########
flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/aggregate/window/processors/AbstractWindowAggProcessor.java:
##########
@@ -231,4 +239,33 @@ protected void collect(RowData aggResult) {
         reuseOutput.replace(ctx.getKeyedStateBackend().getCurrentKey(), aggResult);
         ctx.output(reuseOutput);
     }
+
+    /** A supplier that returns whether the window is empty. */
+    protected final class WindowIsEmptySupplier implements Supplier<Boolean>, Serializable {
+        private static final long serialVersionUID = 1L;
+
+        private final int indexOfCountStar;
+
+        private WindowIsEmptySupplier(int indexOfCountStar, SliceAssigner assigner) {
+            if (assigner instanceof SliceAssigners.HoppingSliceAssigner) {
+                checkArgument(
+                        indexOfCountStar >= 0,
+                        "Hopping window requires a COUNT(*) in the aggregate functions.");
+            }
+            this.indexOfCountStar = indexOfCountStar;
+        }
+
+        @Override
+        public Boolean get() {
+            if (indexOfCountStar < 0) {

Review Comment:
   When this operator consumes an append-only stream and uses a non-HOP window, `indexOfCountStar` will be `-1`. That means we don't need an extra `count(*)` column to determine how many records are in the window.
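
   To make the point above concrete, here is a minimal, self-contained sketch of the check. It is not the code in this PR: the diff is cut off at the `if`, so the `return false` branch is my assumption about the intended behavior, and `WindowIsEmptySketch`, the `isHoppingWindow` flag, and the `long[]` accumulator model are hypothetical stand-ins for the real Flink types.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified stand-in for the supplier under review; not Flink code. */
final class WindowIsEmptySketch implements Supplier<Boolean> {

    private final int indexOfCountStar;

    // Assumption: accumulators are modeled as a long[]; null means "no state for this window".
    private final Supplier<long[]> accumulators;

    WindowIsEmptySketch(
            int indexOfCountStar, boolean isHoppingWindow, Supplier<long[]> accumulators) {
        // Mirrors the checkArgument in the diff: a HOP window must carry a COUNT(*) column.
        if (isHoppingWindow && indexOfCountStar < 0) {
            throw new IllegalArgumentException(
                    "Hopping window requires a COUNT(*) in the aggregate functions.");
        }
        this.indexOfCountStar = indexOfCountStar;
        this.accumulators = accumulators;
    }

    @Override
    public Boolean get() {
        if (indexOfCountStar < 0) {
            // Assumed behavior: append-only input and a non-HOP window, so no COUNT(*)
            // column exists and the window is never reported as empty.
            return false;
        }
        long[] acc = accumulators.get();
        // With a COUNT(*) column, the window is empty when there is no state or the count is 0.
        return acc == null || acc[indexOfCountStar] == 0L;
    }

    public static void main(String[] args) {
        // Non-HOP window on an append-only stream: index is -1, no count column is consulted.
        System.out.println(new WindowIsEmptySketch(-1, false, () -> null).get());
        // HOP window whose COUNT(*) column (index 1) has dropped to zero.
        System.out.println(new WindowIsEmptySketch(1, true, () -> new long[] {7L, 0L}).get());
    }
}
```

   In this sketch a negative index short-circuits before any accumulator is read, which is why the append-only, non-HOP case does not need the extra `count(*)` column.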