lct45 commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r463026843
##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                final boolean stateCreated,
                                final StoreBuilder<?> storeBuilder,
                                final Windows<W> windows,
+                               final SlidingWindows slidingWindows,
                                final SessionWindows sessionWindows,
                                final Merger<? super K, VOut> sessionMerger) {
         final ProcessorSupplier<K, ?> kStreamAggregate;
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger != null) {

Review comment:
The original code checked only `sessionMerger != null` in this branch. Are there scenarios where `sessionMerger` would be null but `sessionWindows` would not? I did think it was inconsistent to check `sessionMerger` in this one branch and `sessionWindows` in all the others, so the original check may have been a mistake.
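
To make the question concrete, here is a minimal, self-contained sketch of the branch conditions from the new side of the diff. The class `AggregateBranchSketch`, the method `selectBranch`, and the string labels are hypothetical stand-ins and not part of Kafka. The arguments are plain `Object`s, so only the null checks are modelled. The diff does not show what follows the last branch, so the sketch returns the placeholder `"unmatched"` there.

```java
public class AggregateBranchSketch {

    // Hypothetical helper: mirrors only the null checks from the diff above.
    // The returned labels are illustrative names, not Kafka classes.
    static String selectBranch(final Object windows,
                               final Object slidingWindows,
                               final Object sessionWindows,
                               final Object sessionMerger) {
        if (windows == null && slidingWindows == null && sessionWindows == null) {
            return "plain";
        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
            return "windowed";
        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
            return "sliding";
        } else if (windows == null && slidingWindows == null && sessionMerger != null) {
            return "session";
        }
        // Assumption: the real code's handling of this case is not shown in the diff.
        return "unmatched";
    }

    public static void main(final String[] args) {
        final Object set = new Object();
        // Session windows and merger both supplied.
        System.out.println(selectBranch(null, null, set, set));
        // Session windows supplied, merger missing: no branch matches.
        System.out.println(selectBranch(null, null, set, null));
        // Merger supplied without session windows: the first branch wins.
        System.out.println(selectBranch(null, null, null, set));
    }
}
```

Under these assumptions, the session branch is reached only when `sessionWindows` is also non-null, because any call with all three window arguments null is caught by the first branch. The `sessionMerger` check therefore acts as an extra guard for the case where session windows are supplied without a merger.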