ableegoldman commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1853099036


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> 
fixedKeyProcessorSupplier() {
 
     public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, 
final String[] parentNodeNames) {
         if (processorSupplier != null) {
-            topologyBuilder.addProcessor(processorName, processorSupplier, 
parentNodeNames);
-            if (processorSupplier.stores() != null) {
-                for (final StoreBuilder<?> storeBuilder : 
processorSupplier.stores()) {
+            ApiUtils.checkSupplier(processorSupplier);
+
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =

Review Comment:
   ah, good question! that would definitely be simpler. however, to maximize 
the utility of the new wrapper, we want to allow users to wrap not only the 
Processor returned by the ProcessorSupplier#get method, but also the 
StoreBuilders returned from ProcessorSupplier#stores. Unfortunately, the stores 
are currently extracted and connected outside of the #addProcessor method, and 
since they aren't attached (via #addStateStore) until after #addProcessor is 
called, we can't even look up and wrap the stores for a processor from within 
#addProcessor



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to