mjsax commented on a change in pull request #9039:
URL: https://github.com/apache/kafka/pull/9039#discussion_r466032331



##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/CogroupedStreamAggregateBuilder.java
##########
@@ -132,16 +135,19 @@
                                                                                     final boolean stateCreated,
                                                                                     final StoreBuilder<?> storeBuilder,
                                                                                     final Windows<W> windows,
+                                                                                    final SlidingWindows slidingWindows,
                                                                                     final SessionWindows sessionWindows,
                                                                                     final Merger<? super K, VOut> sessionMerger) {
 
         final ProcessorSupplier<K, ?> kStreamAggregate;
 
-        if (windows == null && sessionWindows == null) {
+        if (windows == null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamAggregate<>(storeBuilder.name(), initializer, aggregator);
-        } else if (windows != null && sessionWindows == null) {
+        } else if (windows != null && slidingWindows == null && sessionWindows == null) {
             kStreamAggregate = new KStreamWindowAggregate<>(windows, storeBuilder.name(), initializer, aggregator);
-        } else if (windows == null && sessionMerger != null) {
+        } else if (windows == null && slidingWindows != null && sessionWindows == null) {
+            kStreamAggregate = new KStreamSlidingWindowAggregate<>(slidingWindows, storeBuilder.name(), initializer, aggregator);
+        } else if (windows == null && slidingWindows == null && sessionMerger != null) {

Review comment:
       We pass in all parameters so that we can share the code that creates
the `StatefulProcessorNode` -- not sure if it's the best way to structure the
code, and I am happy to split it up into multiple method calls (as long as we
avoid code duplication). And yes, you are right, it's internal and the checks
are just for us to avoid programming errors. Users should never be exposed to
it. I personally tend to make a lot of mistakes, and the more checks we have in
place the better IMHO :)
   
   If @lct45 wants, she can just do a side cleanup PR to fix it and rebase
this PR after the cleanup PR is merged. Or we do it as a follow-up. Whatever
works best for you.
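
   For illustration only, the kind of internal "at most one window type" check
discussed here could be sketched roughly as below. `WindowSpecCheck`, `Kind`,
and `resolve` are hypothetical names, not part of Kafka Streams, and the
parameters are typed as plain `Object` so the snippet compiles on its own. It
also assumes that session windows always come with a merger, which the code in
this PR does not check in exactly this form.

```java
// Hypothetical helper, NOT part of Kafka Streams: a fail-fast check that at
// most one of the three window specifications was passed in.
final class WindowSpecCheck {

    enum Kind { NONE, TIME, SLIDING, SESSION }

    static Kind resolve(final Object windows,
                        final Object slidingWindows,
                        final Object sessionWindows,
                        final Object sessionMerger) {
        // Count how many window specifications are set.
        int set = 0;
        if (windows != null) {
            set++;
        }
        if (slidingWindows != null) {
            set++;
        }
        if (sessionWindows != null) {
            set++;
        }
        if (set > 1) {
            throw new IllegalArgumentException(
                "at most one of windows, slidingWindows, sessionWindows may be non-null");
        }
        // Assumption for this sketch: session windows require a merger.
        if (sessionWindows != null && sessionMerger == null) {
            throw new IllegalArgumentException("sessionWindows requires a sessionMerger");
        }

        if (windows != null) {
            return Kind.TIME;
        }
        if (slidingWindows != null) {
            return Kind.SLIDING;
        }
        if (sessionWindows != null) {
            return Kind.SESSION;
        }
        return Kind.NONE;
    }
}
```

   With something like this, the builder could switch on the returned `Kind`
instead of repeating the null checks in every branch, and an invalid
combination would fail immediately rather than fall through to a later branch.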



