sneakyburro commented on a change in pull request #8752:
URL: https://github.com/apache/kafka/pull/8752#discussion_r437317697



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##########
@@ -842,7 +842,14 @@ public synchronized ProcessorTopology buildTopology() {
             nodeGroup.addAll(value);
         }
         nodeGroup.removeAll(globalNodeGroups());
-
+        for (final NodeFactory<?, ?> entry : nodeFactories.values()) {
+            if (entry instanceof ProcessorNodeFactory) {
+                ProcessorNodeFactory<?, ?> factory = (ProcessorNodeFactory<?, 
?>) entry;
+                if (factory.supplier.get() == factory.supplier.get()) {
+                    throw new TopologyException("topology has singleton result 
of ProcessorSupplier " + factory.name);
+                }
+            }
+        }

Review comment:
       That's a great idea. I introduced a util class `TopologyUtil` under 
processor package.

##########
File path: docs/streams/developer-guide/processor-api.html
##########
@@ -439,6 +439,9 @@ <h2><a class="toc-backref" href="#id8">Connecting 
Processors and State Stores</a
                 indicating that the state store cannot be found. If the state 
store is not associated with the processor
                 in the <code class="docutils literal"><span 
class="pre">Topology</span></code> code, accessing it in the processor&#8217;s 
<code class="docutils literal"><span class="pre">init()</span></code> method 
will also throw an exception at
                 runtime, indicating the state store is not accessible from 
this processor.</p>
+            <p>Note that there could be multiple <code class="docutils 
literal"><span class="pre">ProcessorContext</span></code> instances initialize 
your <code class="docutils literal"><span class="pre">Processor</span></code> 
during initialization.

Review comment:
       Adopted and revised a little. Thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/Topology.java
##########
@@ -648,11 +648,12 @@ public synchronized Topology addSink(final String name,
      * If {@code supplier} provides stores via {@link 
ConnectedStoreProvider#stores()}, the provided {@link StoreBuilder}s
      * will be added to the topology and connected to this processor 
automatically.
      *
-     * @param name the unique name of the processor node
-     * @param supplier the supplier used to obtain this node's {@link 
Processor} instance
-     * @param parentNames the name of one or more source or processor nodes 
whose output records this processor should receive
-     * and process
-     * @return itself
+     * @param name          the unique name of the processor node
+     * @param supplier      the supplier used to construct this node's {@link 
Processor} instance; the implementation of supplier
+     *                      should return a newly constructed {@link 
Processor} instance inside the scope of the lambda expression.

Review comment:
       I reverted java docs on parameter and added web-docs-like docs on main 
java docs for this function. 

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -200,6 +200,9 @@ public StateStore getStateStore(final String name) {
             }
 
             final String sendTo = toInternal.child();
+            if (currentNode() == null) {
+                throw new StreamsException("Current node is unknown when 
forwarding to: " + key);

Review comment:
       Made the change!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to