ableegoldman commented on a change in pull request #8540:
URL: https://github.com/apache/kafka/pull/8540#discussion_r414822929



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -701,18 +703,33 @@ private KafkaStreams(final InternalTopologyBuilder 
internalTopologyBuilder,
                 internalTopologyBuilder,
                 
parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
+        final int numStreamThreads;
+        if (internalTopologyBuilder.hasNoNonGlobalTopology()) {
+            log.warn("Overriding number of StreamThreads to zero for 
global-only topology");
+            numStreamThreads = 0;
+        } else {
+            numStreamThreads = 
config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG);
+        }
+
         // create the stream thread, global update thread, and cleanup thread
-        threads = new 
StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
+        threads = new StreamThread[numStreamThreads];
+
+        final ProcessorTopology globalTaskTopology = 
internalTopologyBuilder.buildGlobalStateTopology();
+        final boolean hasGlobalTopology = globalTaskTopology != null;
+
+        if (numStreamThreads == 0 && !hasGlobalTopology) {
+            log.error("Must subscribe to at least one source topic or global 
table");
+            throw new IllegalArgumentException("Topology has no stream threads 
and no global threads");

Review comment:
       ~~Also, do we have an `InvalidTopologyException` or similar exception 
already? Or were you proposing to add a new type~~ 
   edit: Nevermind it's just `TopologyException`, found it




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to