ableegoldman commented on code in PR #17892: URL: https://github.com/apache/kafka/pull/17892#discussion_r1853099939
########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java: ########## @@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() { public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) { if (processorSupplier != null) { - topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames); - if (processorSupplier.stores() != null) { - for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) { + ApiUtils.checkSupplier(processorSupplier); + + final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = Review Comment: Ultimately it might make sense to move the extraction of the stores from a ProcessorSupplier into a single InternalTopologyBuilder method (eg #addStatefulProcessor) that calls both #addProcessor and #addStateStore, and do the wrapping there. This will also help future-proof new DSL operators from being implemented incorrectly and missing the wrapper step. But I think it makes sense to wait until we've completed all the followup PRs for the remaining DSL operators, in case there are any weird edge cases we haven't thought of/seen yet. Then we can do one final PR to clean up and future-proof the wrapping process -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org