appchemist commented on code in PR #18800:
URL: https://github.com/apache/kafka/pull/18800#discussion_r2328738608


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -20,19 +20,25 @@
 
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.function.Predicate;
 
 public abstract class GraphNode {
-
     private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
     private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
 
     private final Collection<Label> labels = new LinkedList<>();
     private final String nodeName;
+    private boolean repartitionForbidden = false;
+    private boolean repartitionRequired = false;

Review Comment:
   @mjsax Since `repartitionRequired` seemed likely to be removed later,
   I replaced the enum with separate boolean fields.
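The following is a minimal sketch of the boolean-flag model. It uses a simplified stand-in for `GraphNode`. The class `RepartitionFlagsSketch`, its nested `Node`, the setter names `keyChangingOperation(boolean)` and `forbidRepartition()`, and the assumed body of `mustRepartition()` are all hypothetical. Because the flags are independent, dropping `repartitionRequired` later would only mean deleting one field and one term of the `||` expression.

```java
// Hypothetical stand-in for GraphNode's repartition flags (not the PR's code).
public class RepartitionFlagsSketch {

    static class Node {
        private boolean keyChangingOperation = false;
        private boolean repartitionForbidden = false;
        private boolean repartitionRequired = false;

        // Hypothetical setter names, for illustration only.
        void keyChangingOperation(final boolean value) {
            keyChangingOperation = value;
        }

        void forbidRepartition() {
            repartitionForbidden = true;
        }

        // Assumed behaviour: marks this node as forcing a repartition.
        void mustRepartition() {
            repartitionRequired = true;
        }

        // Same expression as in the diff: true if any flag gives this node
        // an opinion on whether downstream needs a repartition.
        boolean canDetermineRepartition() {
            return keyChangingOperation || repartitionForbidden || repartitionRequired;
        }
    }

    public static void main(final String[] args) {
        final Node plain = new Node();
        final Node forced = new Node();
        forced.mustRepartition();
        System.out.println(plain.canDetermineRepartition());  // false
        System.out.println(forced.canDetermineRepartition()); // true
    }
}
```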



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -1313,6 +1301,7 @@ public <KOut, VOut> KStream<KOut, VOut> process(
             processNode.setValueChangingOperation(true);
         }
 
+        processNode.mustRepartition();

Review Comment:
   @mjsax Relying solely on `KeyChangingOperation` caused a test to fail,
   so I now force a repartition by calling `mustRepartition`.
   
   The failing test was `StreamsGraphTest.shouldNotThrowNPEWithMergeNodes`.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java:
##########
@@ -93,6 +99,23 @@ public String nodeName() {
         return nodeName;
     }
 
+    public boolean canDetermineRepartition() {
+        return keyChangingOperation || repartitionForbidden || repartitionRequired;
+    }
+
+    public boolean isRepartitionRequired() {
+        final GraphNode find = findNearestParentNodeMatching(GraphNode::canDetermineRepartition);

Review Comment:
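   @mjsax To decide whether to repartition, I now search the parent nodes
   breadth-first (BFS) instead of using the existing depth-first (DFS) method.
   BFS visits parents in order of distance, so it finds the nearest parent
   that can determine repartitioning, which seemed more suitable here.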
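The following is a sketch of the BFS lookup. It assumes that `findNearestParentNodeMatching` starts from the node's direct parents and returns `null` when no ancestor matches. The class `NearestParentBfsSketch` and its `Node` class are hypothetical. The decision rule in `isRepartitionRequired()` is also hypothetical, because the diff above is cut off right after the lookup. BFS dequeues ancestors in order of hop distance, so the first match it returns is a nearest one. A DFS that follows the first parent first can instead report a more distant ancestor on that branch.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical simplified graph node; not the PR's actual GraphNode.
public class NearestParentBfsSketch {

    static class Node {
        final String name;
        final Set<Node> parents = new LinkedHashSet<>(); // insertion-ordered, like GraphNode
        boolean keyChangingOperation;
        boolean repartitionForbidden;
        boolean repartitionRequired;

        Node(final String name) { this.name = name; }

        void addParent(final Node parent) { parents.add(parent); }

        void mustRepartition() { repartitionRequired = true; }

        boolean canDetermineRepartition() {
            return keyChangingOperation || repartitionForbidden || repartitionRequired;
        }

        // BFS over ancestors: nodes are dequeued in order of hop distance,
        // so the first node matching the predicate is a nearest match.
        Node findNearestParentNodeMatching(final Predicate<Node> predicate) {
            final Queue<Node> queue = new ArrayDeque<>(parents);
            final Set<Node> visited = new HashSet<>(parents);
            while (!queue.isEmpty()) {
                final Node current = queue.poll();
                if (predicate.test(current)) {
                    return current;
                }
                for (final Node parent : current.parents) {
                    if (visited.add(parent)) { // skip ancestors reachable via several paths
                        queue.add(parent);
                    }
                }
            }
            return null; // no ancestor can determine repartitioning
        }

        // Assumed decision rule (the diff is truncated here).
        boolean isRepartitionRequired() {
            final Node found = findNearestParentNodeMatching(Node::canDetermineRepartition);
            return found != null && !found.repartitionForbidden
                && (found.keyChangingOperation || found.repartitionRequired);
        }
    }

    public static void main(final String[] args) {
        final Node src = new Node("src");
        final Node keyChange = new Node("keyChange");
        keyChange.keyChangingOperation = true;
        keyChange.addParent(src);
        final Node mapValues = new Node("mapValues");
        mapValues.addParent(keyChange);
        final Node process = new Node("process");
        process.mustRepartition();
        process.addParent(src);
        final Node merge = new Node("merge");
        merge.addParent(mapValues); // first parent: two hops from keyChange
        merge.addParent(process);   // second parent: matches at one hop
        System.out.println(merge.findNearestParentNodeMatching(Node::canDetermineRepartition).name); // process
        System.out.println(merge.isRepartitionRequired()); // true
        System.out.println(src.isRepartitionRequired());   // false
    }
}
```

In this example graph, BFS from `merge` returns `process`, which is one hop away. A DFS that follows the first parent first would reach `keyChange` through `mapValues` instead.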



-- 
This is an automated message from the Apache Git Service.