mjsax commented on code in PR #12608:
URL: https://github.com/apache/kafka/pull/12608#discussion_r1058650295

##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java:
##########
@@ -532,29 +532,36 @@ private KStream<K, V> merge(final InternalStreamsBuilder builder,
                                 final KStream<K, V> stream,
                                 final NamedInternal named) {
         final KStreamImpl<K, V> streamImpl = (KStreamImpl<K, V>) stream;
-        final boolean requireRepartitioning = streamImpl.repartitionRequired || repartitionRequired;
-        final String name = named.orElseGenerateWithPrefix(builder, MERGE_NAME);
-        final Set<String> allSubTopologySourceNodes = new HashSet<>();
-        allSubTopologySourceNodes.addAll(subTopologySourceNodes);
-        allSubTopologySourceNodes.addAll(streamImpl.subTopologySourceNodes);
-
-        final ProcessorParameters<? super K, ? super V, ?, ?> processorParameters =
-            new ProcessorParameters<>(new PassThrough<>(), name);
-        final ProcessorGraphNode<? super K, ? super V> mergeNode =
-            new ProcessorGraphNode<>(name, processorParameters);
-        mergeNode.setMergeNode(true);
-
-        builder.addGraphNode(Arrays.asList(graphNode, streamImpl.graphNode), mergeNode);
-
-        // drop the serde as we cannot safely use either one to represent both streams
-        return new KStreamImpl<>(
-            name,
-            null,
-            null,
-            allSubTopologySourceNodes,
-            requireRepartitioning,
-            mergeNode,
-            builder);
+        if (graphNode.equals(streamImpl.graphNode)) {
+            // We hide the current node through this processor node.
+            // Parent nodes are collected in a Set.
+            // The merge node would otherwise only have a single parent in this case.
+            return process(new KStreamPassThrough<>()).merge(stream, named);

Review Comment:
I was just thinking about this fix, and it feels a little bit like a "hack" to be honest.
Digging around in the code a little bit, it seems the issue is this:
```
public abstract class GraphNode {

    private final Collection<GraphNode> childNodes = new LinkedHashSet<>();
    private final Collection<GraphNode> parentNodes = new LinkedHashSet<>();
```
Because we use a set for the parent nodes, adding the same node twice results in a single parent instead of two.

Just replacing the set with a list breaks two tests, though: `RepartitionTopicNamingTest` and `InternalStreamsBuilderTest`. Not sure if we could fix those tests.

Overall, I would prefer a "clean" solution over the proposed fix, but I am not 100% sure if it's doable.

\cc @bbejeck WDYT?
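
For reference, the set behavior described above can be reproduced in isolation. This is only a minimal standalone sketch, not Kafka Streams code: `Node`, `countParents`, and `ParentCollectionSketch` are hypothetical names standing in for `GraphNode` and its parent bookkeeping. It registers the same node as a parent twice, once into a `LinkedHashSet` and once into an `ArrayList`, and prints how many parents each collection ends up holding.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;

public class ParentCollectionSketch {

    // Hypothetical stand-in for GraphNode; uses identity-based equals/hashCode.
    static final class Node {
        final String name;

        Node(final String name) {
            this.name = name;
        }
    }

    // Adds every given node to the parent collection and returns the resulting size.
    static int countParents(final Collection<Node> parents, final Node... toAdd) {
        for (final Node node : toAdd) {
            parents.add(node);
        }
        return parents.size();
    }

    public static void main(final String[] args) {
        final Node source = new Node("source");

        // Self-merge: the same node is registered as a parent twice.
        // A set silently drops the second registration.
        System.out.println(countParents(new LinkedHashSet<>(), source, source)); // prints 1

        // A list keeps both registrations.
        System.out.println(countParents(new ArrayList<>(), source, source)); // prints 2
    }
}
```

With a set, a stream merged with itself looks like a node with a single parent, which is the situation the pass-through workaround in the diff avoids. Whether a list can be used in `GraphNode` without breaking the two tests mentioned above is a separate question that this sketch does not address.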