agavra commented on code in PR #17892: URL: https://github.com/apache/kafka/pull/17892#discussion_r1853087162
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -61,16 +66,31 @@ import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST; import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST; import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE; +import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG; public class InternalTopologyBuilder { public InternalTopologyBuilder() { this.topologyName = null; + this.processorWrapper = new NoOpProcessorWrapper(); } public InternalTopologyBuilder(final TopologyConfig topologyConfigs) { this.topologyConfigs = topologyConfigs; this.topologyName = topologyConfigs.topologyName; + + try { + processorWrapper = topologyConfigs.getConfiguredInstance( + PROCESSOR_WRAPPER_CLASS_CONFIG, + ProcessorWrapper.class, + topologyConfigs.originals() + ); + } catch (final Exception e) { + log.error("Unable to instantiate ProcessorWrapper from value of config {}. " Review Comment: nit: does it make sense to also include the error in the logged message in case the user swallows this up higher in the stack somewhere? ########## streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java: ########## @@ -68,13 +72,23 @@ * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the * {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated) * will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the - * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method. + * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method + * for DSL applications, or when creating a PAPI topology via the {@link Topology#Topology(TopologyConfig)} constructor. + * <p> + * Note that some configs that are only defined in the TopologyConfig and not in the StreamsConfig, such as the {@code processor.wrapper.class}, Review Comment: Is there a possibility of having this be available (and respected) in `StreamsConfig`? People tend to forget to properly pipe things through TopolgoyConfig, though having it only there makes it a bit more obvious where it needs to be passed in. ########## streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java: ########## @@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> fixedKeyProcessorSupplier() { public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, final String[] parentNodeNames) { if (processorSupplier != null) { - topologyBuilder.addProcessor(processorName, processorSupplier, parentNodeNames); - if (processorSupplier.stores() != null) { - for (final StoreBuilder<?> storeBuilder : processorSupplier.stores()) { + ApiUtils.checkSupplier(processorSupplier); + + final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped = Review Comment: curious why we do this here instead of inside `topologyBuilder.addProcessor` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org