rodesai commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1855993900


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> 
wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn,  VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, 
processorSupplier)

Review Comment:
   > Ah, yes, it's correct but I see how it's confusing -- basically the 
non-static #wrapFixedKeyProcessorSupplier is what does the actual wrapping via 
the configured ProcessorWrapper instance, whereas the static 
ProcessorWrapper#asWrapped method is what converts a regular ProcessorSupplier 
into the WrappedProcessorSupplier subclass that we use as a marker interface to 
distinguish between processors that are and aren't wrapped yet.
   
   I'm not following this. `wrappedFixedKeyProcessorSupplier` is already 
expected to return an instance that implements `WrappedProcessorSupplier` - so 
why does `InternalTopologyBuilder` also need to enclose it in a 
`WrappedProcessorSupplierImpl`? Do we explicitly check for the 
`WrappedProcessorSupplierImpl` class elsewhere?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to