vvcephei commented on a change in pull request #9221:
URL: https://github.com/apache/kafka/pull/9221#discussion_r485735152



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java
##########
@@ -26,22 +30,57 @@
  * Used by the Join nodes as there are several parameters, this abstraction 
helps
  * keep the number of arguments more reasonable.
  */
-public class ProcessorParameters<K, V> {
+public class ProcessorParameters<KIn, VIn, KOut, VOut> {
 
-    private final ProcessorSupplier<K, V> processorSupplier;
+    // During the transition to KIP-478, we capture arguments passed from the 
old API to simplify
+    // the performance of casts that we still need to perform. This will 
eventually be removed.
+    private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, 
VIn> oldProcessorSupplier;
+    private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier;
     private final String processorName;
 
-    public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier,
+    public ProcessorParameters(final 
org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
processorSupplier,
                                final String processorName) {
+        oldProcessorSupplier = processorSupplier;
+        this.processorSupplier = () -> 
ProcessorAdapter.adapt(processorSupplier.get());
+        this.processorName = processorName;
+    }
 
+    public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> 
processorSupplier,
+                               final String processorName) {
+        oldProcessorSupplier = null;
         this.processorSupplier = processorSupplier;
         this.processorName = processorName;
     }
 
-    public ProcessorSupplier<K, V> processorSupplier() {
+    public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() {
         return processorSupplier;
     }
 
+    public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> 
oldProcessorSupplier() {
+        return oldProcessorSupplier;
+    }
+
+    @SuppressWarnings("unchecked")
+    KTableSource<KIn, VIn> kTableSourceSupplier() {
+        // This cast always works because KTableSource hasn't been converted 
yet.
+        return oldProcessorSupplier == null
+            ? null
+            : !(oldProcessorSupplier instanceof KTableSource)
+              ? null
+              : (KTableSource<KIn, VIn>) oldProcessorSupplier;
+    }

Review comment:
       Thanks; yes, let's revisit it after the dust settles from KIP-478. These 
methods are for the most part temporary, since it's a real pain to do the cast 
when you have to deal with the current "dual interface" state in which 
processors might be old-style or new-style.
   
   I have a feeling I'll be able to eliminate these methods completely when I 
convert the relevant processors to the new API again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to