rodesai commented on code in PR #17892: URL: https://github.com/apache/kafka/pull/17892#discussion_r1855993900
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() { public synchronized Map<String, StoreFactory> stateStores() { return stateFactories; } + + public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier( + final String name, + final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier + ) { + return ProcessorWrapper.asWrappedFixedKey( + processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier) Review Comment: > Ah, yes, it's correct but I see how it's confusing -- basically the non-static #wrapFixedKeyProcessorSupplier is what does the actual wrapping via the configured ProcessorWrapper instance, whereas the static ProcessorWrapper#asWrapped method is what converts a regular ProcessorSupplier into the WrappedProcessorSupplier subclass that we use as a marker interface to distinguish between processors that are and aren't wrapped yet. I'm not following this. `wrappedFixedKeyProcessorSupplier` is already expected to return an instance that implements `WrappedProcessorSupplier` - so why does `InternalTopologyBuilder` also need to enclose it in a `WrappedProcessorSupplierImpl`? Do we explicitly check for the `WrappedProcessorSupplierImpl` class elsewhere? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org