mjsax commented on a change in pull request #9039: URL: https://github.com/apache/kafka/pull/9039#discussion_r466032331
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java ########## @@ -132,16 +135,19 @@ final boolean stateCreated, final StoreBuilder<?> storeBuilder, final Windows<W> windows, + final SlidingWindows slidingWindows, final SessionWindows sessionWindows, final Merger<? super K, VOut> sessionMerger) { final ProcessorSupplier<K, ?> kStreamAggregate; - if (windows == null && sessionWindows == null) { + if (windows == null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator); - } else if (windows != null && sessionWindows == null) { + } else if (windows != null && slidingWindows == null && sessionWindows == null) { kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator); - } else if (windows == null && sessionMerger != null) { + } else if (windows == null && slidingWindows != null && sessionWindows == null) { + kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator); + } else if (windows == null && slidingWindows == null && sessionMerger != null) { Review comment: We pass in all parameter to sharing to code that creates the `StatefulProcessorNode` -- not sure if it's the best way to structure the code and I am happy to split it up into multiple methods call (as long as we avoid code duplication). And yes, you are right, it's internal and the checks are just for us to avoid programming errors. Users should never be exposed to it. I personally tend to make a lot of mistakes and the more checks we have in place the better IMHO :) If @lct45 want's she can just do a side cleanup PR to fix it, and rebase this PR after the cleanup PR was merged? Or we do it as follow up. Whatever works best for you. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org