ableegoldman commented on code in PR #17892: URL: https://github.com/apache/kafka/pull/17892#discussion_r1855366279
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() { public synchronized Map<String, StoreFactory> stateStores() { return stateFactories; } + + public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier( + final String name, + final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier + ) { + return ProcessorWrapper.asWrappedFixedKey( + processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier) Review Comment: Ah, yes, it's correct but I see how it's confusing -- basically the non-static `#wrapFixedKeyProcessorSupplier` is what does the actual wrapping via the configured ProcessorWrapper instance, whereas the static `ProcessorWrapper#asWrapped` method is what converts a regular ProcessorSupplier into the `WrappedProcessorSupplier` subclass that we use as a marker interface to distinguish between processors that are and aren't wrapped yet. I will add a comment to the code to clarify for future readers, but perhaps it's also worth choosing a better name for the static `#asWrapped` utility methods. Maybe `#toWrappedProcessorSupplier` or something like that? @guozhangwang @agavra thoughts? (btw I am going to merge the PR as-is since we can address this in a followup PR, I'll make sure to add the code comments and/or change the name in another PR) ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() { public synchronized Map<String, StoreFactory> stateStores() { return stateFactories; } + + public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier( + final String name, + final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier + ) { + return ProcessorWrapper.asWrappedFixedKey( + processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier) + ); + } + + public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier( + final String name, + final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier + ) { + return ProcessorWrapper.asWrapped( Review Comment: see above 🙂 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org