guozhangwang commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1855228593


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -61,16 +66,32 @@
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
+import static org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
 
 public class InternalTopologyBuilder {
 
     public InternalTopologyBuilder() {
         this.topologyName = null;
+        this.processorWrapper = new NoOpProcessorWrapper();
     }
 
     public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
         this.topologyConfigs = topologyConfigs;
         this.topologyName = topologyConfigs.topologyName;
+
+        try {
+            processorWrapper = topologyConfigs.getConfiguredInstance(
+                PROCESSOR_WRAPPER_CLASS_CONFIG,
+                ProcessorWrapper.class,
+                topologyConfigs.originals()
+            );
+        } catch (final Exception e) {
+            final String errorMessage = String.format(
+                "Unable to instantiate ProcessorWrapper from value of config %s. Please provide a valid class "
+                    + "that implements the ProcessorWrapper interface.", PROCESSOR_WRAPPER_CLASS_CONFIG);

Review Comment:
   nit: should this be `" that"`, with a leading space?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn,  VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)
+        );
+    }
+
+    public <KIn, VIn, KOut, VOut> WrappedProcessorSupplier<KIn, VIn, KOut, VOut> wrapProcessorSupplier(
+        final String name,
+        final ProcessorSupplier<KIn, VIn, KOut, VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrapped(

Review Comment:
   Similar here.



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -68,13 +72,26 @@
  * Streams configs that apply at the topology level. The values in the {@link StreamsConfig} parameter of the
  * {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link KafkaStreamsNamedTopologyWrapper} constructor (deprecated)
  * will determine the defaults, which can then be overridden for specific topologies by passing them in when creating the
- * topology builders via the {@link org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) StreamsBuilder(TopologyConfig)} method.
+ * topology builders via the {@link StreamsBuilder#StreamsBuilder(TopologyConfig)} constructor for DSL applications,
+ * or the {@link Topology#Topology(TopologyConfig)} for PAPI applications.
+ * <p>
+ * Note that some configs, such as the {@code processor.wrapper.class} config, can only take effect while the
+ * topology is being built, which means they have to be passed in as a TopologyConfig to the
+ * {@link Topology#Topology(TopologyConfig)} constructor (PAPI) or the

Review Comment:
   Thanks for adding the docs here; this will be helpful to users.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -2245,4 +2272,22 @@ public boolean hasNamedTopology() {
     public synchronized Map<String, StoreFactory> stateStores() {
         return stateFactories;
     }
+
+    public <KIn, VIn, VOut> WrappedFixedKeyProcessorSupplier<KIn, VIn, VOut> wrapFixedKeyProcessorSupplier(
+        final String name,
+        final FixedKeyProcessorSupplier<KIn, VIn,  VOut> processorSupplier
+    ) {
+        return ProcessorWrapper.asWrappedFixedKey(
+            processorWrapper.wrapFixedKeyProcessorSupplier(name, processorSupplier)

Review Comment:
   Just checking whether this is correct: the `wrapFixedKeyProcessorSupplier`
   function itself already calls
   `return ProcessorWrapper.asWrappedFixedKey(processorSupplier);`, so this
   looks like a circular call?
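
   For reference, a minimal sketch of the call pattern being asked about,
   using simplified stand-in types rather than the actual Kafka classes.
   `Wrapped`, `Wrapper`, `NoOpWrapper`, `builderWrap`, and the `depth()`
   counter are all hypothetical names, and the sketch assumes the static
   helper simply adapts whatever supplier it is given. Under that assumption
   the two calls nest rather than recurse:

```java
import java.util.function.Supplier;

public class WrapSketch {
    // Hypothetical stand-in for a "wrapped supplier" marker type.
    interface Wrapped<T> extends Supplier<T> {
        int depth(); // number of adapter layers; exists only for illustration
    }

    // Hypothetical stand-in for a static helper such as asWrapped.
    // ASSUMPTION: it just adapts its argument and never calls back into a wrapper.
    static <T> Wrapped<T> asWrapped(final Supplier<T> inner) {
        final int innerDepth = (inner instanceof Wrapped) ? ((Wrapped<?>) inner).depth() : 0;
        return new Wrapped<T>() {
            @Override
            public T get() {
                return inner.get();
            }

            @Override
            public int depth() {
                return innerDepth + 1;
            }
        };
    }

    // Hypothetical stand-in for the wrapper's instance method.
    interface Wrapper {
        <T> Wrapped<T> wrap(String name, Supplier<T> supplier);
    }

    // No-op wrapper: its instance method delegates to the static helper.
    static final class NoOpWrapper implements Wrapper {
        @Override
        public <T> Wrapped<T> wrap(final String name, final Supplier<T> supplier) {
            return asWrapped(supplier);
        }
    }

    // Builder-side method: calls the instance method, then the static helper again.
    static <T> Wrapped<T> builderWrap(final Wrapper wrapper, final String name, final Supplier<T> supplier) {
        return asWrapped(wrapper.wrap(name, supplier));
    }

    public static void main(final String[] args) {
        final Wrapped<String> wrapped = builderWrap(new NoOpWrapper(), "proc", () -> "hello");
        System.out.println(wrapped.get() + " depth=" + wrapped.depth());
    }
}
```

   In this sketch the call terminates and the supplier ends up behind two
   adapter layers. Whether the real helper behaves the same way depends on
   its implementation, which is not shown in this diff.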


