guozhangwang commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1855228593

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -61,16 +66,32 @@
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
 public class InternalTopologyBuilder {
     public InternalTopologyBuilder() {
         this.topologyName = null;
+        this.processorWrapper = new NoOpProcessorWrapper();
     }
     public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
         this.topologyConfigs = topologyConfigs;
         this.topologyName = topologyConfigs.topologyName;
+
+        try {
+            processorWrapper = topologyConfigs.getConfiguredInstance(
+                PROCESSOR_WRAPPER_CLASS_CONFIG,
+                ProcessorWrapper.class,
+                topologyConfigs.originals()
+            );
+        } catch (final Exception e) {
+            final String errorMessage = String.format(
+                "Unable to instantiate ProcessorWrapper from value of config %s. Please provide a valid class "
+                    + "that implements the ProcessorWrapper interface.", PROCESSOR_WRAPPER_CLASS_CONFIG);

Review Comment:
   nit: " that"?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)
+        );
+    }
+
+    public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(
+        final String name,
+        final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrapped(

Review Comment:
   Similar here.

##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -68,13 +72,26 @@
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
  * {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated)
  * will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
- * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method.
+ * topology builders via the {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor for DSL applications,
+ * or the {@link Topology#Topology(TopologyConfig)} for PAPI applications.
+ * <p>
+ * Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the
+ * topology is being built, which means they have to be passed in as a TopologyConfig to the
+ * {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the

Review Comment:
   Thanks for adding the docs here; they will be helpful to users.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn, VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)

Review Comment:
   Just checking whether this is correct: `wrapFixedKeyProcessorSupplier` itself already calls `return ProcessorWrapper.asWrappedFixedKey(processorSupplier);`, so this looks like a circular call?
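
To make the call shape behind the last comment concrete, here is a minimal standalone sketch. It does not use the Kafka classes: `Wrapped`, `asWrapped`, `wrapSupplier`, and `builderWrap` are hypothetical stand-ins, and the sketch assumes the static helper is a plain decorating factory (the diff does not show its body). Under that assumption the pattern produces two wrapper layers rather than a recursive call.

```java
import java.util.function.Supplier;

public class DoubleWrapSketch {

    /** Hypothetical stand-in for a wrapped supplier: remembers what it decorates. */
    static final class Wrapped<T> implements Supplier<T> {
        final Supplier<T> inner;

        Wrapped(final Supplier<T> inner) {
            this.inner = inner;
        }

        @Override
        public T get() {
            return inner.get();
        }
    }

    /** Hypothetical stand-in for the static helper, assumed to be a plain decorator factory. */
    static <T> Wrapped<T> asWrapped(final Supplier<T> supplier) {
        return new Wrapped<>(supplier);
    }

    /** Hypothetical no-op wrapper method that already calls the static helper itself. */
    static <T> Wrapped<T> wrapSupplier(final String name, final Supplier<T> supplier) {
        return asWrapped(supplier);
    }

    /** Mirrors the shape of the builder method in the diff: wraps the wrapper's result again. */
    static <T> Wrapped<T> builderWrap(final String name, final Supplier<T> supplier) {
        return asWrapped(wrapSupplier(name, supplier));
    }

    /** Counts how many Wrapped layers surround the original supplier. */
    static int depth(Supplier<?> supplier) {
        int layers = 0;
        while (supplier instanceof Wrapped) {
            layers++;
            supplier = ((Wrapped<?>) supplier).inner;
        }
        return layers;
    }

    public static void main(final String[] args) {
        final Wrapped<String> wrapped = DoubleWrapSketch.<String>builderWrap("node-1", () -> "processor");
        System.out.println(wrapped.get());  // prints "processor"
        System.out.println(depth(wrapped)); // prints 2
    }
}
```

In this sketch the outer `asWrapped` call terminates normally and only adds a redundant layer; whether the same holds for the real `ProcessorWrapper.asWrappedFixedKey` depends on its actual implementation in the PR.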