vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r960708055
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,6 +381,95 @@ private void mergeDuplicateSourceNodes() {
}
}
+ /**
+ * If the join is a self-join, remove the node KStreamJoinWindow
corresponding to the
+ */
+ @SuppressWarnings("unchecked")
+ private void rewriteSelfJoin(final GraphNode currentNode, final
Set<GraphNode> visited) {
+ visited.add(currentNode);
+ if (currentNode instanceof StreamStreamJoinNode &&
isSelfJoin(currentNode)) {
+ ((StreamStreamJoinNode) currentNode).setSelfJoin();
+ // Remove JoinOtherWindowed node
+ final GraphNode parent =
currentNode.parentNodes().stream().findFirst().get();
+ GraphNode left = null, right = null;
+ for (final GraphNode child: parent.children()) {
+ if (child instanceof ProcessorGraphNode
+ && isStreamJoinWindowNode((ProcessorGraphNode) child)) {
+ if (left == null) {
+ left = child;
+ } else {
+ right = child;
+ }
+ }
+ }
+ // Sanity check
+ if (left != null && right != null && left.buildPriority() <
right.buildPriority()) {
+ parent.removeChild(right);
+ }
+ }
+ for (final GraphNode child: currentNode.children()) {
+ if (!visited.contains(child)) {
+ rewriteSelfJoin(child, visited);
+ }
+ }
+ }
+
+ /**
+ * The self-join rewriting can be applied if:
+ * 1. The path from the StreamStreamJoinNode to the root contains a single
source node.
+ * 2. The StreamStreamJoinNode has a single parent.
+ * 3. There are no other nodes besides the KStreamJoinWindow that are
siblings of the
+ * StreamStreamJoinNode and have smaller build priority.
Review Comment:
The build priority is the order in which the nodes in the graph are visited and
added to the topology. It matters here because the join node can have siblings
with a smaller build priority, which therefore come before it in the topological
order (i.e., they are applied before the join). The only acceptable such siblings
are the JoinWindow nodes. If there are others, then it is not a self-join.
This check covers cases like:
```
stream1 = builder.stream("topic1");
stream1.mapValues(v -> v);
stream2 = builder.stream("topic1"); // same topic
stream1.join(stream2)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]