appchemist commented on code in PR #18800:
URL: https://github.com/apache/kafka/pull/18800#discussion_r2328738608
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java: ##########
@@ -20,19 +20,25 @@
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.LinkedHashSet;
 import java.util.LinkedList;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.function.Predicate;
 public abstract class GraphNode {
-    private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
     private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
     private final Collection<Label> labels = new LinkedList<>();
     private final String nodeName;
+    private boolean repartitionForbidden = false;
+    private boolean repartitionRequired = false;

Review Comment:
@mjsax Since `repartitionRequired` seemed likely to be removed, I changed the implementation from a single enum to two separate boolean fields.

########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java: ##########
@@ -1313,6 +1301,7 @@ public <KOut, VOut> KStream<KOut, VOut> process(
             processNode.setValueChangingOperation(true);
         }
+        processNode.mustRepartition();

Review Comment:
@mjsax Relying solely on `KeyChangingOperation` caused a test to fail, so I forced a repartition by calling `mustRepartition()`. The failing test was `StreamsGraphTest.shouldNotThrowNPEWithMergeNodes`.
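To make the flag-based design concrete, here is a minimal Java sketch under stated assumptions. `FlagNode`, `FlagNodeDemo`, and `forbidRepartition()` are hypothetical names; `mustRepartition()` is assumed to do nothing more than set `repartitionRequired`, because its body is not part of this diff; `canDetermineRepartition()` copies the expression from the later `GraphNode` hunk. Since the two flags are independent fields, removing `repartitionRequired` later would only drop one field and one term of that expression.

```java
// Hypothetical stand-in for GraphNode's repartition flags; not the actual Kafka class.
class FlagNode {
    private boolean keyChangingOperation = false;
    private boolean repartitionForbidden = false;
    private boolean repartitionRequired = false;

    void keyChangingOperation(final boolean keyChanging) {
        this.keyChangingOperation = keyChanging;
    }

    // Assumption: mustRepartition() only raises the flag; its body is not shown in the PR diff.
    void mustRepartition() {
        repartitionRequired = true;
    }

    // Hypothetical setter for operations that must never trigger a repartition.
    void forbidRepartition() {
        repartitionForbidden = true;
    }

    // Same expression as canDetermineRepartition() in the PR: true when any one
    // of the three flags is set on this node.
    boolean canDetermineRepartition() {
        return keyChangingOperation || repartitionForbidden || repartitionRequired;
    }
}

class FlagNodeDemo {
    public static void main(final String[] args) {
        final FlagNode plain = new FlagNode();
        final FlagNode processNode = new FlagNode();
        processNode.mustRepartition(); // mirrors processNode.mustRepartition() in KStreamImpl#process
        System.out.println(plain.canDetermineRepartition());       // prints false
        System.out.println(processNode.canDetermineRepartition()); // prints true
    }
}
```

A node on which none of the flags is set cannot decide by itself, which is what pushes the decision up to its ancestors.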
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/GraphNode.java: ##########
@@ -93,6 +99,23 @@ public String nodeName() {
         return nodeName;
     }
+    public boolean canDetermineRepartition() {
+        return keyChangingOperation || repartitionForbidden || repartitionRequired;
+    }
+    public boolean isRepartitionRequired() {
+        final GraphNode find = findNearestParentNodeMatching(GraphNode::canDetermineRepartition);

Review Comment:
@mjsax For deciding whether to repartition, I changed the existing DFS method to a BFS approach, which seemed more suitable here (BFS checks ancestors in order of distance, so the first match is the nearest parent).

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
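For reference, the breadth-first ancestor search described in the last review comment can be sketched as follows. This is a minimal sketch, not the PR's code: it assumes `findNearestParentNodeMatching` walks parent links only (the node itself is not tested) and returns `null` when nothing matches. `BfsNode`, `BfsDemo`, and the `decidesRepartition` flag are hypothetical stand-ins for `GraphNode` and `canDetermineRepartition()`. It relies on the same `Queue`, `ArrayDeque`, `Set`, `HashSet`, and `Predicate` types that the first `GraphNode` hunk imports.

```java
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical minimal node: only parent links and one decision flag are modeled.
class BfsNode {
    final String name;
    final boolean decidesRepartition; // stands in for canDetermineRepartition()
    final Collection<BfsNode> parentNodes = new LinkedHashSet<>();

    BfsNode(final String name, final boolean decidesRepartition) {
        this.name = name;
        this.decidesRepartition = decidesRepartition;
    }

    BfsNode addParent(final BfsNode parent) {
        parentNodes.add(parent);
        return this;
    }

    // Breadth-first walk over ancestors: every parent is tested before any
    // grandparent, so the first match has the fewest hops from this node.
    // Assumption: only ancestors are tested; returns null if none matches.
    BfsNode findNearestParentNodeMatching(final Predicate<BfsNode> predicate) {
        final Queue<BfsNode> queue = new ArrayDeque<>(parentNodes);
        final Set<BfsNode> visited = new HashSet<>(parentNodes);
        while (!queue.isEmpty()) {
            final BfsNode current = queue.poll();
            if (predicate.test(current)) {
                return current;
            }
            for (final BfsNode parent : current.parentNodes) {
                if (visited.add(parent)) { // enqueue shared ancestors only once
                    queue.add(parent);
                }
            }
        }
        return null;
    }
}

class BfsDemo {
    public static void main(final String[] args) {
        final BfsNode leftGrand = new BfsNode("leftGrand", true);
        final BfsNode left = new BfsNode("left", false).addParent(leftGrand);
        final BfsNode right = new BfsNode("right", true);
        final BfsNode merge = new BfsNode("merge", false).addParent(left).addParent(right);

        final BfsNode found = merge.findNearestParentNodeMatching(n -> n.decidesRepartition);
        System.out.println(found == null ? "none" : found.name); // prints "right"
    }
}
```

In the demo, the merge node has two parents: `left`, whose own parent `leftGrand` can decide, and `right`, which can decide directly. BFS returns `right` (one hop away), whereas a depth-first walk that exhausts `left`'s ancestry first would reach `leftGrand`. The `visited` set keeps an ancestor shared by two branches from being enqueued twice.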