vpapavas commented on code in PR #12555:
URL: https://github.com/apache/kafka/pull/12555#discussion_r959328394
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java:
##########
@@ -270,16 +272,20 @@ private void maybeAddNodeForOptimizationMetadata(final GraphNode node) {
// use this method for testing only
public void buildAndOptimizeTopology() {
- buildAndOptimizeTopology(false);
+ buildAndOptimizeTopology(false, false);
}
- public void buildAndOptimizeTopology(final boolean optimizeTopology) {
+ public void buildAndOptimizeTopology(
+ final boolean optimizeTopology, final boolean optimizeSelfJoin) {
mergeDuplicateSourceNodes();
if (optimizeTopology) {
LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
optimizeKTableSourceTopics();
maybeOptimizeRepartitionOperations();
+ if (optimizeSelfJoin) {
Review Comment:
The main goal of the conditions is to ensure that both join arguments are the
same stream and that neither is altered on only one side anywhere along the
path from the root to the join.
The first condition is needed to make sure that the join has a single source
ancestor. We don't want to apply the optimization if there is another source
somewhere along the path from the root to the join node. This path can be
long and can pass through multiple joins, so checking for a single parent is
not enough.
The second condition is needed to ensure that no transformation is applied
to one side of the join and not the other.
The third condition is needed for the same reason: in my tests, I have seen
topologies where the `ProcessorNodes` are added as siblings of the join rather
than as parents of it. For instance, in this example:
```
final KStream<String, String> stream1 =
builder.stream(Collections.singleton("t1"), consumed);
stream1.mapValues(v -> v);
final KStream<String, String> stream2 =
builder.stream(Collections.singleton("t1"), consumed);
stream1.join(stream2, MockValueJoiner.TOSTRING_JOINER,
JoinWindows.ofTimeDifferenceWithNoGrace(ofMillis(100)));
```
the node for `mapValues` is a sibling of the join node and not a parent.
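To make the three conditions concrete, here is a minimal sketch on a toy graph. It is not the code in this PR: `SelfJoinSketch`, `Node`, `Kind`, `sourceAncestors` and `isSelfJoinCandidate` are hypothetical names, and the graph shapes are my assumption of what the topology looks like after `mergeDuplicateSourceNodes()` has collapsed the two sources on `t1` into one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class SelfJoinSketch {

    // Hypothetical, simplified stand-ins for topology graph nodes (not Kafka's GraphNode).
    enum Kind { ROOT, SOURCE, PROCESSOR, JOIN }

    static final class Node {
        final String name;
        final Kind kind;
        final List<Node> parents = new ArrayList<>();
        final List<Node> children = new ArrayList<>();

        Node(final String name, final Kind kind) {
            this.name = name;
            this.kind = kind;
        }

        void addChild(final Node child) {
            children.add(child);
            child.parents.add(this);
        }
    }

    // Walks every path from the node up to the root and collects the source nodes seen.
    static Set<Node> sourceAncestors(final Node node) {
        final Set<Node> sources = new HashSet<>();
        final Set<Node> visited = new HashSet<>();
        final Deque<Node> pending = new ArrayDeque<>(node.parents);
        while (!pending.isEmpty()) {
            final Node current = pending.pop();
            if (!visited.add(current)) {
                continue;
            }
            if (current.kind == Kind.SOURCE) {
                sources.add(current);
            }
            pending.addAll(current.parents);
        }
        return sources;
    }

    static boolean isSelfJoinCandidate(final Node join) {
        // Condition 1: exactly one source anywhere above the join.
        final Set<Node> sources = sourceAncestors(join);
        if (sources.size() != 1) {
            return false;
        }
        final Node source = sources.iterator().next();
        // Condition 2: both sides of the join read the source directly (no one-sided transformation).
        final boolean sidesUntouched = join.parents.stream().allMatch(p -> p == source);
        // Condition 3: the source has no other children, i.e. the join has no sibling nodes.
        final boolean noSiblings = source.children.stream().allMatch(c -> c == join);
        return sidesUntouched && noSiblings;
    }

    // Builds root -> source(t1) -> join, with optional extra nodes, and returns the join.
    static Node selfJoin(final boolean withSibling, final boolean oneSidedTransform) {
        final Node root = new Node("root", Kind.ROOT);
        final Node source = new Node("t1", Kind.SOURCE);
        final Node join = new Node("join", Kind.JOIN);
        root.addChild(source);
        if (withSibling) {
            source.addChild(new Node("mapValues", Kind.PROCESSOR)); // sibling of the join
        }
        if (oneSidedTransform) {
            final Node mapped = new Node("mapValues", Kind.PROCESSOR);
            source.addChild(mapped);
            mapped.addChild(join); // left side goes through mapValues
        } else {
            source.addChild(join); // left side
        }
        source.addChild(join); // right side
        return join;
    }

    // Builds a join of two different topics: two source ancestors.
    static Node joinOfTwoTopics() {
        final Node root = new Node("root", Kind.ROOT);
        final Node t1 = new Node("t1", Kind.SOURCE);
        final Node t2 = new Node("t2", Kind.SOURCE);
        final Node join = new Node("join", Kind.JOIN);
        root.addChild(t1);
        root.addChild(t2);
        t1.addChild(join);
        t2.addChild(join);
        return join;
    }

    public static void main(final String[] args) {
        System.out.println(isSelfJoinCandidate(selfJoin(false, false))); // true
        System.out.println(isSelfJoinCandidate(joinOfTwoTopics()));      // false (condition 1)
        System.out.println(isSelfJoinCandidate(selfJoin(false, true)));  // false (condition 2)
        System.out.println(isSelfJoinCandidate(selfJoin(true, false)));  // false (condition 3)
    }
}
```

In this toy model the `selfJoin(true, false)` case corresponds to the example above: the `mapValues` node hangs off the shared source next to the join, so only the sibling check rejects it.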