mjsax commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r432683735



##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
      * If {@code supplier} provides stores via {@link 
ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
      * will be added to the topology and connected to this processor 
automatically.
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link 
Processor} instance
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
-     * and process
-     * @return itself
+     * @param name          the unique name of the processor node
+     * @param supplier      the supplier used to construct this node's {@link 
Processor} instance; the implementation of supplier
+     *                      should return a newly constructed {@link 
Processor} instance inside the scope of the lambda expression.

Review comment:
       I think it's better to add this to the main JavaDocs above and keep the 
parameter description short. What about adding a sentence to the JavaDocs 
similar to the one suggested for the web-docs? (Same for the DSL operator on `KStream`.)
   
   Also, we should not talk about lambdas, as we don't know if lambdas are used 
or not.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
             nodeGroup.addAll(value);
         }
         nodeGroup.removeAll(globalNodeGroups());
-
+        for (final NodeFactory<?, ?> entry : nodeFactories.values()) {
+            if (entry instanceof ProcessorNodeFactory) {
+                ProcessorNodeFactory<?, ?> factory = (ProcessorNodeFactory<?, 
?>) entry;
+                if (factory.supplier.get() == factory.supplier.get()) {
+                    throw new TopologyException("topology has singleton result 
of ProcessorSupplier " + factory.name);
+                }
+            }
+        }

Review comment:
       It might be best to extract this into a helper method and call it directly 
when `addProcessor()` is called. That way, the corresponding stack trace is 
easier to map to the actual call to `addProcessor()` that passed in the 
incorrect supplier.
   
   Similarly, we should add this check to `KStreamImpl#process()` and others
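
A minimal sketch of such a helper, for illustration only. The class name `SupplierChecks` and the method `checkSupplier` are hypothetical, and `java.util.function.Supplier` and `IllegalArgumentException` stand in for `ProcessorSupplier` and `TopologyException` so that the snippet compiles without Kafka on the classpath. The comparison itself is the one from the diff above.

```java
import java.util.function.Supplier;

public class SupplierChecks {

    // Hypothetical helper: calls get() twice and rejects suppliers that
    // return the same object reference both times. Note that a supplier
    // returning null twice is rejected as well, because null == null.
    public static <T> void checkSupplier(final String nodeName, final Supplier<T> supplier) {
        if (supplier.get() == supplier.get()) {
            throw new IllegalArgumentException(
                "Supplier for node '" + nodeName + "' returned the same instance twice; "
                    + "get() must return a new instance on each call.");
        }
    }

    public static void main(final String[] args) {
        // Passes: a constructor reference creates a new object per call.
        checkSupplier("fresh", Object::new);

        // Fails: the lambda captures one object and returns it every time.
        final Object shared = new Object();
        try {
            checkSupplier("shared", () -> shared);
        } catch (final IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Calling the helper at the point where the supplier is passed in means the exception's stack trace includes the caller's frame, which is the motivation given above.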

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
             }
 
             final String sendTo = toInternal.child();
+            if (currentNode() == null) {
+                throw new StreamsException("Current node is unknown when 
forwarding to: " + key);

Review comment:
       Why do we add the `key`? It does not seem to be useful and could 
potentially leak data into logs.
   
   We should also add a sentence about a potential root cause (even if we add 
other guards for it already in this PR):
   ```
   throw new StreamsException("Current node is unknown. This can happen if 
`forward()` is called in an illegal scope. The root cause could be that a 
`Processor` or `Transformer` instance is shared. To avoid this error, make sure 
that your suppliers return new instances each time `get()` is called and do not 
return the same object reference multiple times.");
   ```

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,9 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
                 indicating that the state store cannot be found. If the state 
store is not associated with the processor
                 in the <code class="docutils literal"><span 
class="pre">Topology</span></code> code, accessing it in the processor&#8217;s 
<code class="docutils literal"><span class="pre">init()</span></code> method 
will also throw an exception at
                 runtime, indicating the state store is not accessible from 
this processor.</p>
+            <p>Note that there could be multiple <code class="docutils 
literal"><span class="pre">ProcessorContext</span></code> instances initialize 
your <code class="docutils literal"><span class="pre">Processor</span></code> 
during initialization.

Review comment:
       I think the first sentence might be too detailed? Maybe we can simplify:
   ```
   <p>Note that the `process()` function takes a `ProcessorSupplier` as an 
argument, and that the supplier pattern requires that a new `Processor` 
instance is returned each time `get()` is called. Creating a single `Processor` 
object and returning the same object reference in `get()` would be a violation of 
the supplier pattern and lead to runtime exceptions.</p>
   ```
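
To make the difference concrete, here is a small sketch contrasting a supplier that follows the pattern with one that violates it. All names (`SupplierPatternDemo`, `CountingProcessor`, `freshSupplier`, `sharedSupplier`) are hypothetical, and `CountingProcessor` is only a stand-in for a stateful `Processor`, not the Kafka Streams interface.

```java
import java.util.function.Supplier;

public class SupplierPatternDemo {

    // Stand-in for a stateful Processor; it counts how often process() ran.
    public static final class CountingProcessor {
        private int seen = 0;

        public int process() {
            return ++seen;
        }
    }

    // Follows the supplier pattern: every get() call constructs a new instance.
    public static Supplier<CountingProcessor> freshSupplier() {
        return CountingProcessor::new;
    }

    // Violates the supplier pattern: every get() call returns the same reference.
    public static Supplier<CountingProcessor> sharedSupplier() {
        final CountingProcessor shared = new CountingProcessor();
        return () -> shared;
    }

    public static void main(final String[] args) {
        final Supplier<CountingProcessor> fresh = freshSupplier();
        System.out.println("fresh instances differ: " + (fresh.get() != fresh.get()));

        // With the shared supplier, both callers see and mutate the same state.
        final Supplier<CountingProcessor> shared = sharedSupplier();
        final CountingProcessor first = shared.get();
        final CountingProcessor second = shared.get();
        first.process();
        System.out.println("count seen by second caller: " + second.process());
    }
}
```

In this sketch the shared instance only leaks a counter between callers; the review comments above describe the consequence in Kafka Streams as runtime exceptions.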

##########
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##########
@@ -2523,7 +2523,8 @@ void to(final TopicNameExtractor<K, V> topicExtractor,
      * for each input record, it is recommended to use {@link 
#flatTransform(TransformerSupplier, String...)
      * flatTransform()}.
      *
-     * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a {@link Transformer}
+     * @param transformerSupplier an instance of {@link TransformerSupplier} 
that generates a newly constructed
+     *                            {@link Transformer}

Review comment:
       This is nice!



