ableegoldman commented on code in PR #18048: URL: https://github.com/apache/kafka/pull/18048#discussion_r1879290561
##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##########
@@ -120,30 +115,13 @@ private void enableVersionedSemantics(final ProcessorParameters<K, ?, ?, ?> proc
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final String thisProcessorName = thisProcessorParameters().processorName();
         final String otherProcessorName = otherProcessorParameters().processorName();
-        final String mergeProcessorName = mergeProcessorParameters().processorName();
-        topologyBuilder.addProcessor(
-            thisProcessorName,
-            thisProcessorParameters().processorSupplier(),
-            thisJoinSideNodeName());
-
-        topologyBuilder.addProcessor(
-            otherProcessorName,
-            otherProcessorParameters().processorSupplier(),
-            otherJoinSideNodeName());
-
-        topologyBuilder.addProcessor(
-            mergeProcessorName,
-            mergeProcessorParameters().processorSupplier(),
-            thisProcessorName,
-            otherProcessorName);
+        thisProcessorParameters().addProcessorTo(topologyBuilder, thisJoinSideNodeName());
+        otherProcessorParameters().addProcessorTo(topologyBuilder, otherJoinSideNodeName());
+        mergeProcessorParameters().addProcessorTo(topologyBuilder, thisProcessorName, otherProcessorName);
         topologyBuilder.connectProcessorAndStateStores(thisProcessorName, joinOtherStoreNames);
         topologyBuilder.connectProcessorAndStateStores(otherProcessorName, joinThisStoreNames);

Review Comment:
Following up on this -- make sure to read my comment [here](https://github.com/apache/kafka/pull/18048/files#r1879285311) first for full context -- the reason we have to explicitly connect these stores with the opposite side's processor and can't do this via the `#stores` implementation is because the stores are actually associated with the upstream KTable, and not with any of the join nodes.
So each KTable of course only returns its own state store from its own `#stores` method (what you implemented in your last KIP-1112 PR), since that processor only needs access to its own store. The downstream joins then just connect to the upstream KTable's store. That explains why removing these `#connectProcessorAndStateStores` calls makes things fail.

One question I still have is whether we should try to get a handle on the upstream KTable's store itself and return that from the `#stores` method of the downstream join that uses it, rather than connecting the two manually via `#connectProcessorAndStateStores`. At the moment I'm thinking (and hoping) that it's ok to leave things as-is, since the join nodes don't actually access the stores directly but instead go through something called a `ValueGetter`. But I'm planning to tackle table-table joins next in my end-to-end integration tests of processor/store wrapping, so we'll see if this becomes an issue.
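
For illustration only, here is a toy model of the store wiring discussed in this comment. It is a sketch under stated assumptions, not Kafka code: `ToyTopology`, `StoreWiringSketch`, and all processor and store names are hypothetical stand-ins. Only the method name `connectProcessorAndStateStores` is borrowed from the diff, and the real `InternalTopologyBuilder` has different signatures and behavior. The point is just that each upstream table declares its own store, while each join side is granted access to the opposite table's store by hand.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a topology builder; NOT Kafka's InternalTopologyBuilder.
final class ToyTopology {
    // Processor name -> names of the stores that processor may access.
    private final Map<String, Set<String>> storesByProcessor = new LinkedHashMap<>();

    // Register a processor together with the stores it declares itself
    // (a rough analogue of what a supplier returns from #stores).
    void addProcessor(final String name, final Set<String> ownStores) {
        storesByProcessor.put(name, new LinkedHashSet<>(ownStores));
    }

    // Manually grant an already-registered processor access to further stores
    // (a rough analogue of #connectProcessorAndStateStores).
    void connectProcessorAndStateStores(final String name, final Set<String> storeNames) {
        final Set<String> stores = storesByProcessor.get(name);
        if (stores == null) {
            throw new IllegalArgumentException("Unknown processor: " + name);
        }
        stores.addAll(storeNames);
    }

    // Stores visible to the given processor; empty if the processor is unknown.
    Set<String> storesOf(final String name) {
        return storesByProcessor.getOrDefault(name, Set.of());
    }
}

public class StoreWiringSketch {
    static ToyTopology build() {
        final ToyTopology topology = new ToyTopology();
        // Each upstream table processor declares only its own store.
        topology.addProcessor("this-table", Set.of("this-store"));
        topology.addProcessor("other-table", Set.of("other-store"));
        // The join processors declare no stores of their own.
        topology.addProcessor("this-join", Set.of());
        topology.addProcessor("other-join", Set.of());
        // Each join side is connected by hand to the opposite table's store.
        topology.connectProcessorAndStateStores("this-join", Set.of("other-store"));
        topology.connectProcessorAndStateStores("other-join", Set.of("this-store"));
        return topology;
    }

    public static void main(final String[] args) {
        final ToyTopology topology = build();
        System.out.println("this-join  -> " + topology.storesOf("this-join"));
        System.out.println("other-join -> " + topology.storesOf("other-join"));
    }
}
```

In this toy model, dropping the two `connectProcessorAndStateStores` calls leaves both join processors with no stores at all, which loosely mirrors the failure mode described above. The alternative raised in the open question would correspond to passing the upstream store names directly as the join processors' `ownStores`.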