ableegoldman commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1853099939


##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() {
 
     public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) {
         if (processorSupplier != null) {
-            topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames);
-            if (processorSupplier.stores() != null) {
-                for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) {
+            ApiUtils.checkSupplier(processorSupplier);
+
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =

Review Comment:
   Ultimately it might make sense to move the extraction of the stores from a 
ProcessorSupplier into a single InternalTopologyBuilder method (e.g. 
#addStatefulProcessor) that calls both #addProcessor and #addStateStore, and to 
do the wrapping there. That would also future-proof new DSL operators against 
being implemented incorrectly and skipping the wrapper step.
   
   But I think it makes sense to wait until we've completed all the follow-up 
PRs for the remaining DSL operators, in case there are any weird edge cases we 
haven't thought of or seen yet. Then we can do one final PR to clean up and 
future-proof the wrapping process.
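
To make the suggestion concrete, here is a rough, self-contained sketch of the shape such a method could take. It is not the Kafka code: `SketchSupplier` and `SketchTopologyBuilder` are hypothetical stand-ins for ProcessorSupplier and InternalTopologyBuilder, stores are reduced to plain names, and the wrapper is modelled as a simple function. Only the method names #addProcessor, #addStateStore and #addStatefulProcessor come from the comment above, and their signatures here are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

final class StatefulProcessorSketch {

    // Hypothetical stand-in for ProcessorSupplier: it only exposes the stores it needs.
    interface SketchSupplier {
        // May return null, mirroring the null check on stores() in the diff above.
        List<String> stores();
    }

    // Hypothetical stand-in for InternalTopologyBuilder; it just records what was added.
    static final class SketchTopologyBuilder {
        final List<String> processors = new ArrayList<>();
        final List<String> stateStores = new ArrayList<>();
        private final UnaryOperator<SketchSupplier> wrapper;

        SketchTopologyBuilder(final UnaryOperator<SketchSupplier> wrapper) {
            this.wrapper = wrapper;
        }

        void addProcessor(final String name, final SketchSupplier supplier, final String... parents) {
            processors.add(name);
        }

        void addStateStore(final String storeName, final String processorName) {
            stateStores.add(storeName + "->" + processorName);
        }

        // Single entry point: wrap once, register the processor, then connect its stores.
        // Callers cannot forget the wrapper step because it happens here.
        void addStatefulProcessor(final String name, final SketchSupplier supplier, final String... parents) {
            final SketchSupplier wrapped = wrapper.apply(supplier);
            addProcessor(name, wrapped, parents);
            final List<String> stores = wrapped.stores();
            if (stores != null) {
                for (final String store : stores) {
                    addStateStore(store, name);
                }
            }
        }
    }

    public static void main(final String[] args) {
        // Identity wrapper, used here only to keep the demo small.
        final SketchTopologyBuilder builder = new SketchTopologyBuilder(s -> s);
        builder.addStatefulProcessor("agg", () -> List.of("counts"), "source");
        System.out.println(builder.processors + " " + builder.stateStores);
    }
}
```

With something along these lines, a graph node such as ProcessorParameters would make a single call instead of wrapping the supplier and iterating over its stores itself.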


