ableegoldman commented on code in PR #18048:
URL: https://github.com/apache/kafka/pull/18048#discussion_r1879290561


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/KTableKTableJoinNode.java:
##########
@@ -120,30 +115,13 @@ private void enableVersionedSemantics(final ProcessorParameters<K, ?, ?, ?> proc
     public void writeToTopology(final InternalTopologyBuilder topologyBuilder) {
         final String thisProcessorName = thisProcessorParameters().processorName();
         final String otherProcessorName = otherProcessorParameters().processorName();
-        final String mergeProcessorName = mergeProcessorParameters().processorName();
 
-        topologyBuilder.addProcessor(
-            thisProcessorName,
-            thisProcessorParameters().processorSupplier(),
-            thisJoinSideNodeName());
-
-        topologyBuilder.addProcessor(
-            otherProcessorName,
-            otherProcessorParameters().processorSupplier(),
-            otherJoinSideNodeName());
-
-        topologyBuilder.addProcessor(
-            mergeProcessorName,
-            mergeProcessorParameters().processorSupplier(),
-            thisProcessorName,
-            otherProcessorName);
+        thisProcessorParameters().addProcessorTo(topologyBuilder, thisJoinSideNodeName());
+        otherProcessorParameters().addProcessorTo(topologyBuilder, otherJoinSideNodeName());
+        mergeProcessorParameters().addProcessorTo(topologyBuilder, thisProcessorName, otherProcessorName);
 
         topologyBuilder.connectProcessorAndStateStores(thisProcessorName, joinOtherStoreNames);
         topologyBuilder.connectProcessorAndStateStores(otherProcessorName, joinThisStoreNames);

Review Comment:
   Following up on this (please read my comment 
[here](https://github.com/apache/kafka/pull/18048/files#r1879285311) first for 
full context): the reason we have to explicitly connect these stores with the 
opposite side's processor, and can't do this via the `#stores` implementation, 
is that the stores are actually associated with the upstream KTable and not 
with any of the join nodes. Each KTable only returns its own state store from 
its own `#stores` method (what you implemented in your last KIP-1112 PR), since 
that processor only needs access to its own store. The downstream joins then 
just connect to the upstream KTable's store.
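
   To make the ownership point concrete, here is a minimal toy model in plain Java. It is not Kafka Streams code: `ToyTopologyBuilder`, the processor names, and the store names are all hypothetical, and only the method name `connectProcessorAndStateStores` is borrowed from the diff above. The sketch assumes each processor declares only the stores it owns, so a join processor starts with none and has to be connected to the opposite table's store explicitly.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class StoreWiringSketch {

    // Hypothetical, heavily simplified stand-in for a topology builder.
    static class ToyTopologyBuilder {
        private final Map<String, Set<String>> storesByProcessor = new LinkedHashMap<>();

        // Register a processor together with the stores it owns (its "#stores").
        void addProcessor(String name, Set<String> ownStores) {
            storesByProcessor.put(name, new TreeSet<>(ownStores));
        }

        // Manually give a processor access to stores owned by another processor.
        void connectProcessorAndStateStores(String processor, String... stores) {
            Set<String> connected = storesByProcessor.get(processor);
            if (connected == null) {
                throw new IllegalArgumentException("Unknown processor: " + processor);
            }
            connected.addAll(Arrays.asList(stores));
        }

        Set<String> storesOf(String processor) {
            return storesByProcessor.get(processor);
        }
    }

    static ToyTopologyBuilder build() {
        ToyTopologyBuilder builder = new ToyTopologyBuilder();
        // The upstream tables own their stores.
        builder.addProcessor("left-table", Set.of("left-store"));
        builder.addProcessor("right-table", Set.of("right-store"));
        // The join processors own no stores of their own.
        builder.addProcessor("join-this", Set.of());
        builder.addProcessor("join-other", Set.of());
        // Each join side is connected to the opposite table's store.
        builder.connectProcessorAndStateStores("join-this", "right-store");
        builder.connectProcessorAndStateStores("join-other", "left-store");
        return builder;
    }

    public static void main(String[] args) {
        ToyTopologyBuilder builder = build();
        System.out.println("join-this  -> " + builder.storesOf("join-this"));
        System.out.println("join-other -> " + builder.storesOf("join-other"));
    }
}
```

   In this toy model, dropping the two `connectProcessorAndStateStores` calls leaves both join processors with an empty store set, which mirrors the failure described here.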
   
   So this explains why removing these `#connectProcessorAndStateStores` calls 
makes things fail. One question I still have is whether we should try to get a 
handle on the upstream KTable's store itself and return it from the `#stores` 
method of the downstream join that uses that store, rather than connecting them 
manually via `#connectProcessorAndStateStores`. At the moment I'm thinking (and 
hoping) that it's ok to leave things as-is, since the join nodes don't actually 
access the stores directly but instead go through a `ValueGetter`. But I'm 
planning to tackle table-table joins next in my end-to-end integration tests of 
processor/store wrapping, so we'll see if this becomes an issue.
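
   For anyone unfamiliar with the indirection mentioned above, here is a rough sketch of the idea in plain Java. Everything in it is hypothetical (`ToyValueGetter`, `ToyTable`, `ToyJoinSide`), and it makes no attempt to match the real `ValueGetter` signatures in Kafka Streams. It only illustrates the assumption that a join side holds a getter handed out by the other table, rather than a reference to that table's store.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class ValueGetterSketch {

    // Hypothetical read-only view over a table's current value for a key.
    interface ToyValueGetter<K, V> {
        V get(K key);
    }

    // Hypothetical upstream table: it owns the store and hands out getters.
    static class ToyTable<K, V> {
        private final Map<K, V> store = new HashMap<>();

        void put(K key, V value) {
            store.put(key, value);
        }

        ToyValueGetter<K, V> valueGetter() {
            return store::get; // callers never see the underlying map
        }
    }

    // Hypothetical join side: looks up the other table only via its getter.
    static class ToyJoinSide<K, V1, V2, R> {
        private final ToyValueGetter<K, V2> otherSide;
        private final BiFunction<V1, V2, R> joiner;

        ToyJoinSide(ToyValueGetter<K, V2> otherSide, BiFunction<V1, V2, R> joiner) {
            this.otherSide = otherSide;
            this.joiner = joiner;
        }

        // Inner-join semantics: no result when the other side has no value.
        R process(K key, V1 value) {
            V2 other = otherSide.get(key);
            return other == null ? null : joiner.apply(value, other);
        }
    }

    static String demo(String key) {
        ToyTable<String, Integer> right = new ToyTable<>();
        right.put("a", 2);
        ToyJoinSide<String, Integer, Integer, String> join =
            new ToyJoinSide<>(right.valueGetter(), (l, r) -> l + "+" + r);
        return join.process(key, 1);
    }

    public static void main(String[] args) {
        System.out.println(demo("a")); // key present on the other side
        System.out.println(demo("b")); // key absent on the other side
    }
}
```

   Whether the real join nodes can stay decoupled from the stores in this way once processors and stores are wrapped is exactly the open question above.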



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
