vvcephei commented on a change in pull request #9221: URL: https://github.com/apache/kafka/pull/9221#discussion_r485735152
########## File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java ########## @@ -26,22 +30,57 @@ * Used by the Join nodes as there are several parameters, this abstraction helps * keep the number of arguments more reasonable. */ -public class ProcessorParameters<K, V> { +public class ProcessorParameters<KIn, VIn, KOut, VOut> { - private final ProcessorSupplier<K, V> processorSupplier; + // During the transition to KIP-478, we capture arguments passed from the old API to simplify + // the performance of casts that we still need to perform. This will eventually be removed. + private final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier; + private final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier; private final String processorName; - public ProcessorParameters(final ProcessorSupplier<K, V> processorSupplier, + public ProcessorParameters(final org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> processorSupplier, final String processorName) { + oldProcessorSupplier = processorSupplier; + this.processorSupplier = () -> ProcessorAdapter.adapt(processorSupplier.get()); + this.processorName = processorName; + } + public ProcessorParameters(final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier, + final String processorName) { + oldProcessorSupplier = null; this.processorSupplier = processorSupplier; this.processorName = processorName; } - public ProcessorSupplier<K, V> processorSupplier() { + public ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier() { return processorSupplier; } + public org.apache.kafka.streams.processor.ProcessorSupplier<KIn, VIn> oldProcessorSupplier() { + return oldProcessorSupplier; + } + + @SuppressWarnings("unchecked") + KTableSource<KIn, VIn> kTableSourceSupplier() { + // This cast always works because KTableSource hasn't been converted yet. + return oldProcessorSupplier == null + ? null + : !(oldProcessorSupplier instanceof KTableSource) + ? null + : (KTableSource<KIn, VIn>) oldProcessorSupplier; + } Review comment: Thanks; yes, let's revisit it after the dust settles from KIP-478. These methods are for the most part temporary, since it's a real pain to do the cast when you have to deal with the current "dual interface" state in which processors might be old-style or new-style. I have a feeling I'll be able to eliminate these methods completely when I convert the relevant processors to the new API again. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org