ableegoldman commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1855366279


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn,  VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)

Review Comment:
   Ah, yes, it's correct, but I see how it's confusing. Basically, the
   non-static `#wrapFixedKeyProcessorSupplier` is what does the actual wrapping
   via the configured ProcessorWrapper instance, whereas the static
   `ProcessorWrapper#asWrapped` method only converts a regular
   ProcessorSupplier into the `WrappedProcessorSupplier` subtype, which we use
   as a marker interface to distinguish between processors that are and aren't
   wrapped yet.
   
   I will add a comment to the code to clarify for future readers, but perhaps 
it's also worth choosing a better name for the static `#asWrapped` utility 
methods. Maybe `#toWrappedProcessorSupplier` or something like that?
   
   @guozhangwang @agavra thoughts?
   
   (btw I am going to merge the PR as-is, since we can address this in a
   follow-up PR. I'll make sure to add the code comments and/or change the
   name there.)
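
The two-step pattern described in the comment above can be sketched in isolation. This is only a minimal illustration, not the Kafka code: `ProcSupplier`, `WrappedProcSupplier`, `Wrapper`, and `WrapSketch` are hypothetical, simplified stand-ins for `ProcessorSupplier`, `WrappedProcessorSupplier`, the configured `ProcessorWrapper` instance, and the builder. The assumption is that the static `asWrapped`-style helper changes only the static type (the marker), while the instance method is what alters behavior.

```java
import java.util.function.Supplier;

// Hypothetical, simplified stand-ins; none of these are the real Kafka Streams types.
class WrapSketch {

    // Stand-in for ProcessorSupplier: here it just supplies a String "processor".
    interface ProcSupplier extends Supplier<String> {}

    // Marker subtype: adds no methods, only signals "this supplier is already wrapped".
    interface WrappedProcSupplier extends ProcSupplier {}

    // Stand-in for the user-configured wrapper instance that does the real wrapping.
    interface Wrapper {
        ProcSupplier wrap(String name, ProcSupplier supplier);
    }

    // Static conversion: adapts a supplier to the marker type without changing what it supplies.
    static WrappedProcSupplier asWrapped(ProcSupplier supplier) {
        if (supplier instanceof WrappedProcSupplier) {
            return (WrappedProcSupplier) supplier; // already marked, return as-is
        }
        return supplier::get; // same behavior, now typed as "wrapped"
    }

    // Builder-side method: step 1 is the actual wrapping, step 2 is the marker conversion.
    static WrappedProcSupplier wrapProcessorSupplier(Wrapper wrapper, String name, ProcSupplier supplier) {
        return asWrapped(wrapper.wrap(name, supplier));
    }

    public static void main(String[] args) {
        // A wrapper that decorates the supplied value with the node name.
        Wrapper tagging = (name, s) -> () -> "[" + name + "] " + s.get();
        ProcSupplier raw = () -> "processor";

        WrappedProcSupplier wrapped = wrapProcessorSupplier(tagging, "node-1", raw);
        System.out.println(wrapped.get());
        System.out.println(raw instanceof WrappedProcSupplier);
    }
}
```

In this sketch, a method that accepts only `WrappedProcSupplier` cannot be handed a supplier that skipped the wrapping step, which is the kind of compile-time distinction a marker type provides.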



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn,  VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)
+        );
+    }
+
+    public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(
+        final String name,
+        final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrapped(

Review Comment:
   see above 🙂 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
