agavra commented on code in PR #17892:
URL: https://github.com/apache/kafka/pull/17892#discussion_r1853087162


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -61,16 +66,31 @@
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.EARLIEST;
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.LATEST;
 import static org.apache.kafka.clients.consumer.OffsetResetStrategy.NONE;
+import static 
org.apache.kafka.streams.StreamsConfig.PROCESSOR_WRAPPER_CLASS_CONFIG;
 
 public class InternalTopologyBuilder {
 
     public InternalTopologyBuilder() {
         this.topologyName = null;
+        this.processorWrapper = new NoOpProcessorWrapper();
     }
 
     public InternalTopologyBuilder(final TopologyConfig topologyConfigs) {
         this.topologyConfigs = topologyConfigs;
         this.topologyName = topologyConfigs.topologyName;
+
+        try {
+            processorWrapper = topologyConfigs.getConfiguredInstance(
+                PROCESSOR_WRAPPER_CLASS_CONFIG,
+                ProcessorWrapper.class,
+                topologyConfigs.originals()
+            );
+        } catch (final Exception e) {
+            log.error("Unable to instantiate ProcessorWrapper from value of 
config {}. "

Review Comment:
   nit: does it make sense to also include the error in the logged message in 
case the user swallows this up higher in the stack somewhere?



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -68,13 +72,23 @@
  * Streams configs that apply at the topology level. The values in the {@link 
StreamsConfig} parameter of the
  * {@link org.apache.kafka.streams.KafkaStreams} constructor or the {@link 
KafkaStreamsNamedTopologyWrapper} constructor (deprecated)
  * will determine the defaults, which can then be overridden for specific 
topologies by passing them in when creating the
- * topology builders via the {@link 
org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) 
StreamsBuilder(TopologyConfig)} method.
+ * topology builders via the {@link 
org.apache.kafka.streams.StreamsBuilder#StreamsBuilder(TopologyConfig) 
StreamsBuilder(TopologyConfig)} method
+ * for DSL applications, or when creating a PAPI topology via the {@link 
Topology#Topology(TopologyConfig)} constructor.
+ * <p>
+ * Note that some configs that are only defined in the TopologyConfig and not 
in the StreamsConfig, such as the {@code processor.wrapper.class},

Review Comment:
   Is there a possibility of having this be available (and respected) in 
`StreamsConfig`? People tend to forget to properly pipe things through 
TopolgoyConfig, though having it only there makes it a bit more obvious where 
it needs to be passed in.



##########
streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ProcessorParameters.java:
##########
@@ -78,18 +80,30 @@ public FixedKeyProcessorSupplier<KIn, VIn, VOut> 
fixedKeyProcessorSupplier() {
 
     public void addProcessorTo(final InternalTopologyBuilder topologyBuilder, 
final String[] parentNodeNames) {
         if (processorSupplier != null) {
-            topologyBuilder.addProcessor(processorName, processorSupplier, 
parentNodeNames);
-            if (processorSupplier.stores() != null) {
-                for (final StoreBuilder<?> storeBuilder : 
processorSupplier.stores()) {
+            ApiUtils.checkSupplier(processorSupplier);
+
+            final ProcessorSupplier<KIn, VIn, KOut, VOut> wrapped =

Review Comment:
   curious why we do this here instead of inside `topologyBuilder.addProcessor`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to