mjsax commented on a change in pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#discussion_r464752987



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -50,6 +50,14 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * <p>
+ * It is a requirement that the processing logic ({@link Topology}) be defined 
in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and 
the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different KafkaStream instances of the same application build different 
topologies the result may be

Review comment:
       {@link KafkaStreams}

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
##########
@@ -50,6 +50,14 @@
 /**
  * {@code StreamsBuilder} provide the high-level Kafka Streams DSL to specify 
a Kafka Streams topology.
  *
+ * <p>
+ * It is a requirement that the processing logic ({@link Topology}) be defined 
in a deterministic way,
+ * as in, the order in which all operators are added must be predictable and 
the same across all application
+ * instances.
+ * Topologies are only identical if all operators are added in the same order.
+ * If different KafkaStream instances of the same application build different 
topologies the result may be
+ * incompatible runtimes and unexpected results.

Review comment:
       `incompatible runtimes and unexpected results` -> `incompatible runtime 
code and unexpected results or errors.`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1916,6 +1917,40 @@ public void shouldAlwaysSuspendRunningTasks() {
         assertThat(task.state(), equalTo(SUSPENDED));
     }
 
+    @Test
+    public void szTest() {
+        final InternalProcessorContext context = new ProcessorContextImpl(
+                taskId,
+                createConfig(false, "100"),
+                stateManager,
+                streamsMetrics,
+                null
+        );
+        final StreamsMetricsImpl metrics = new 
StreamsMetricsImpl(this.metrics, "test", StreamsConfig.METRICS_LATEST);
+        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        EasyMock.replay(stateManager);
+
+        // The processor topology is missing the topics
+        final ProcessorTopology topology = withSources(asList(), mkMap());
+
+        assertThrows(TopologyException.class, () ->

Review comment:
       nit: formatting: (we should also get the exception an verify the error 
message)
   ```
   final TopologyException  exception = assertThrows(
       TopologyException.class,
       () -> new StreamTask(
           ...
       )
   );
   
   assertThat(exception.getMessage(), equalTo("..."));
   ```
   
   

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -1916,6 +1917,40 @@ public void shouldAlwaysSuspendRunningTasks() {
         assertThat(task.state(), equalTo(SUSPENDED));
     }
 
+    @Test
+    public void szTest() {

Review comment:
       `szTest` is a terrible test name: 
`shouldThrowTopologyExceptionIfTaskCreatedForUnknownTopic`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to