bbejeck commented on code in PR #18778:
URL: https://github.com/apache/kafka/pull/18778#discussion_r1943825510


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java:
##########
@@ -701,6 +679,8 @@ public final <KIn, VIn> void addGlobalStore(final String sourceName,
         nodeGrouper.add(processorName);
         nodeGrouper.unite(processorName, predecessors);
         globalStateBuilders.put(storeFactory.storeName(), storeFactory);
+        // connect source topic as (read-only) changelog topic for fault-tolerance

Review Comment:
   ```suggestion
           // connect the source topic as (read-only) changelog topic for fault-tolerance
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -1016,82 +775,86 @@ public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder
         );
         internalTopologyBuilder.addProcessor(processorName, stateUpdateSupplier, sourceName);
         internalTopologyBuilder.addStateStore(storeBuilder, processorName);
+
+        // connect source topic as (read-only) changelog topic for fault-tolerance

Review Comment:
   ```suggestion
           // connect the source topic as (read-only) changelog topic for fault-tolerance
   ```



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -119,10 +119,14 @@ private static AutoOffsetResetInternal convertOldToNew(final Topology.AutoOffset
      * @return itself
      *
      * @throws TopologyException
-     *         if the provided source name is not unique, or
-     *         if topics have already been registered by another source,
+     *         if the provided source name is not unique,
+     *         no topics are specified, or
+     *         a topic has already been registered by another source,
      *         {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or
      *         {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store}
+     * @throws NullPointerException
+     *         if {@code name} or {@code topics} is {@code null}, or
+     *         {@code topics} contains a {@code null} topic

Review Comment:
   I'm ok with it



##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -677,10 +662,10 @@ public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String na
      *
      * @throws TopologyException
      *         if the {@link StoreBuilder#name() state store} was already added, or
-     *         if a processor name is unknown
+     *         if a processor name is unknown or specifies a source or sink

Review Comment:
   Good call



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.