bbejeck commented on code in PR #18778: URL: https://github.com/apache/kafka/pull/18778#discussion_r1943825510
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java: ########## @@ -701,6 +679,8 @@ public final <KIn, VIn> void addGlobalStore(final String sourceName, nodeGrouper.add(processorName); nodeGrouper.unite(processorName, predecessors); globalStateBuilders.put(storeFactory.storeName(), storeFactory); + // connect source topic as (read-only) changelog topic for fault-tolerance Review Comment: ```suggestion // connect the source topic as (read-only) changelog topic for fault-tolerance ``` ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -1016,82 +775,86 @@ public synchronized <KIn, VIn> Topology addReadOnlyStateStore(final StoreBuilder ); internalTopologyBuilder.addProcessor(processorName, stateUpdateSupplier, sourceName); internalTopologyBuilder.addStateStore(storeBuilder, processorName); + + // connect source topic as (read-only) changelog topic for fault-tolerance Review Comment: ```suggestion // connect the source topic as (read-only) changelog topic for fault-tolerance ``` ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -119,10 +119,14 @@ private static AutoOffsetResetInternal convertOldToNew(final Topology.AutoOffset * @return itself * * @throws TopologyException - * if the provided source name is not unique, or - * if topics have already been registered by another source, + * if the provided source name is not unique, + * no topics are specified, or + * a topic has already been registered by another source, * {@link #addReadOnlyStateStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) read-only state store}, or * {@link #addGlobalStore(StoreBuilder, String, Deserializer, Deserializer, String, String, ProcessorSupplier) global state store} + * @throws NullPointerException + * if {@code name} or {@code topics} is {@code null}, or + * {@code topics} contains a {@code null} topic Review Comment: I'm ok with it ########## streams/src/main/java/org/apache/kafka/streams/Topology.java: ########## @@ -677,10 +662,10 @@ public synchronized <KIn, VIn, KOut, VOut> Topology addProcessor(final String na * * @throws TopologyException * if the {@link StoreBuilder#name() state store} was already added, or - * if a processor name is unknown + * if a processor name is unknown or specifies a source or sink Review Comment: Good call -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org