ableegoldman commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r465379071
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ########## @@ -132,16 +135,19 @@ final boolean stateCreated, final StoreBuilder<?> storeBuilder, final Windows<W> windows, + final SlidingWindows slidingWindows, final SessionWindows sessionWindows, final Merger<? super K, VOut> sessionMerger) { final ProcessorSupplier<K, ?> kStreamAggregate; - if (windows == null && sessionWindows == null) { + if (windows == null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); - } else if (windows != null && sessionWindows == null) { + } else if (windows != null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); - } else if (windows == null && sessionMerger != null) { + } else if (windows == null && slidingWindows != null && sessionWindows == null) { + kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator); + } else if (windows == null && slidingWindows == null && sessionMerger != null) { Review comment: Not saying all this needs to be cleaned up in this PR. If we check one thing (e.g. `sessionMerger`), then we should check everything (e.g. `sessionMerger != null && sessionWindows != null`). We can decide whether we really need to check anything as a follow-up. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org