mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r464752987

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -50,6 +50,14 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * <p>
+ * It is a requirement that the processing logic ({@link Topology}) be defined in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different KafkaStream instances of the same application build different topologies the result may be

Review comment:
{@link KafkaStreams}

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -50,6 +50,14 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify a Kafka Streams topology.
  *
+ * <p>
+ * It is a requirement that the processing logic ({@link Topology}) be defined in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different KafkaStream instances of the same application build different topologies the result may be
+ * incompatible runtimes and unexpected results.

Review comment:
`incompatible runtimes and unexpected results` -> `incompatible runtime code and unexpected results or errors.`

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1916,6 +1917,40 @@ public void shouldAlwaysSuspendRunningTasks() {
         assertThat(task.state(), equalTo(SUSPENDED));
     }

+    @Test
+    public void szTest() {
+        final InternalProcessorContext context = new ProcessorContextImpl(
+            taskId,
+            createConfig(false, "100"),
+            stateManager,
+            streamsMetrics,
+            null
+        );
+        final StreamsMetricsImpl metrics = new StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST);
+        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.replay(stateManager);
+
+        // The processor topology is missing the topics
+        final ProcessorTopology topology = withSources(asList(), mkMap());
+
+        assertThrows(TopologyException.class, () ->

Review comment:
nit: formatting (we should also capture the exception and verify the error message):
```
final TopologyException exception = assertThrows(
    TopologyException.class,
    () -> new StreamTask( ... )
);
assertThat(exception.getMessage(), equalTo("..."));
```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1916,6 +1917,40 @@ public void shouldAlwaysSuspendRunningTasks() {
         assertThat(task.state(), equalTo(SUSPENDED));
     }

+    @Test
+    public void szTest() {

Review comment:
`szTest` is a terrible test name. Suggestion: `shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic`
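
Side note on the determinism requirement in the `StreamsBuilder` Javadoc under review: a minimal sketch of why insertion order matters, assuming (as I understand the DSL) that default operator names are derived from a running counter. `TopologyOrderSketch`, `build`, and the `OP-` prefix are hypothetical stand-ins for illustration and are not Kafka Streams API.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a topology builder; this is NOT the Kafka Streams API.
public class TopologyOrderSketch {

    // Names each operator from a running counter, in the order the operators are added.
    public static Map<String, String> build(List<String> operators) {
        Map<String, String> named = new LinkedHashMap<>();
        int index = 0;
        for (String op : operators) {
            named.put(String.format("OP-%010d", index++), op);
        }
        return named;
    }

    public static void main(String[] args) {
        // Two instances of the "same" application add the same operators in a different order.
        Map<String, String> instanceA = build(List.of("source:orders", "source:payments"));
        Map<String, String> instanceB = build(List.of("source:payments", "source:orders"));

        System.out.println(instanceA);
        System.out.println(instanceB);
        // The generated names now point at different operators on each instance.
        System.out.println("identical: " + instanceA.equals(instanceB)); // prints "identical: false"
    }
}
```

In this sketch the same generated name refers to a different operator on each instance, which is the kind of mismatch the Javadoc warns about. One common way to end up here would be building the topology while iterating over an unordered collection.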