vpapavas commented on code in PR #12644:
URL: https://github.com/apache/kafka/pull/12644#discussion_r977470895
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -356,12 +383,55 @@ private void mergeDuplicateSourceNodes() {
}
}
- private void optimizeKTableSourceTopics() {
+
+ /**
+ * The self-join rewriting can be applied if the StreamStreamJoinNode has a single parent.
+ * If the join is a self-join, remove the node KStreamJoinWindow corresponding to the
+ * right argument of the join (the "other"). The join node may have multiple siblings but for
+ * this rewriting we only care about the ThisKStreamJoinWindow and the OtherKStreamJoinWindow.
+ * We iterate over all the siblings to identify these two nodes so that we can remove the
+ * latter.
+ */
+ @SuppressWarnings("unchecked")
+ private void rewriteSingleStoreSelfJoin(
+ final GraphNode currentNode, final Map<GraphNode, Boolean> visited) {
+ visited.put(currentNode, true);
+ if (currentNode instanceof StreamStreamJoinNode && currentNode.parentNodes().size() == 1) {
+ final StreamStreamJoinNode joinNode = (StreamStreamJoinNode) currentNode;
+ // Remove JoinOtherWindowed node
+ final GraphNode parent = joinNode.parentNodes().stream().findFirst().get();
+ GraphNode left = null, right = null;
+ for (final GraphNode child: parent.children()) {
+ if (child instanceof WindowedStreamProcessorNode
Review Comment:
The current JoinNode might have other JoinNodes as siblings. We need to
differentiate between the `WindowedStreamProcessorNode` nodes that belong to the
current node and those that belong to other join nodes.
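
One possible way to make that distinction, as a minimal sketch only: match each windowed sibling against names recorded on the join node itself, rather than taking the first two windowed children of the parent. Everything below (`SketchNode`, `SketchWindowNode`, `SketchJoinNode`, `findOwnWindowNodes`, and the name fields) is a hypothetical stand-in written for illustration, not the actual `GraphNode` or `StreamStreamJoinNode` API, and it assumes the join node can tell us the names of its own two window processors.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a topology graph node; not Kafka's GraphNode.
class SketchNode {
    final String name;
    final List<SketchNode> children = new ArrayList<>();

    SketchNode(final String name) {
        this.name = name;
    }

    void addChild(final SketchNode child) {
        children.add(child);
    }
}

// Hypothetical stand-in for a windowed join-side processor node.
class SketchWindowNode extends SketchNode {
    SketchWindowNode(final String name) {
        super(name);
    }
}

// Hypothetical join node that records the names of its own two window processors (an assumption).
class SketchJoinNode extends SketchNode {
    final String thisWindowName;
    final String otherWindowName;

    SketchJoinNode(final String name, final String thisWindowName, final String otherWindowName) {
        super(name);
        this.thisWindowName = thisWindowName;
        this.otherWindowName = otherWindowName;
    }
}

final class SelfJoinSiblingSketch {
    private SelfJoinSiblingSketch() { }

    // Returns {left, right}: the window siblings owned by joinNode.
    // An entry is null if no sibling with the expected name exists under parent.
    static SketchNode[] findOwnWindowNodes(final SketchNode parent, final SketchJoinNode joinNode) {
        SketchNode left = null;
        SketchNode right = null;
        for (final SketchNode child : parent.children) {
            if (!(child instanceof SketchWindowNode)) {
                continue; // not a window node at all
            }
            if (child.name.equals(joinNode.thisWindowName)) {
                left = child;
            } else if (child.name.equals(joinNode.otherWindowName)) {
                right = child;
            }
            // window nodes with any other name belong to a different join and are skipped
        }
        return new SketchNode[] {left, right};
    }
}
```

With two self-joins hanging off the same parent, this lookup would return only the pair whose names match the join being rewritten, so removing `right` would not touch the other join's window node.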