FrankYang0529 commented on code in PR #19457:
URL: https://github.com/apache/kafka/pull/19457#discussion_r2056032099


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -3573,6 +3582,329 @@ public void shouldTimeOutOnProducerInstanceId(final 
boolean stateUpdaterEnabled,
         );
     }
 
+    @Test
+    public void testNamedTopologyWithStreamsProtocol() {
+        final Properties props = configProps(false, false, false);
+        props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.toString());
+        final StreamsConfig config = new StreamsConfig(props);
+        final InternalTopologyBuilder topologyBuilder = new 
InternalTopologyBuilder(
+            new TopologyConfig(
+                "my-topology",
+                config,
+                new Properties())
+        );
+
+        final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
+            metrics,
+            APPLICATION_ID,
+            PROCESS_ID.toString(),
+            mockTime
+        );
+
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(topologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+
+        stateDirectory = new StateDirectory(config, mockTime, true, false);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(
+            new TopologyMetadata(internalTopologyBuilder, config),
+            StreamsMetadataState.UNKNOWN_HOST,
+            new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+        );
+        final IllegalStateException exception = 
assertThrows(IllegalStateException.class, () ->
+            StreamThread.create(
+                topologyMetadata,
+                config,
+                clientSupplier,
+                clientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)),
+                PROCESS_ID,
+                CLIENT_ID,
+                streamsMetrics,
+                mockTime,
+                streamsMetadataState,
+                0,
+                stateDirectory,
+                new MockStateRestoreListener(),
+                new MockStandbyUpdateListener(),
+                threadIdx,
+                null,
+                HANDLER
+            )
+        );
+        assertEquals("Named topologies and the CONSUMER protocol cannot be 
used at the same time.", exception.getMessage());

Review Comment:
   Addressed it. Thanks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to