FrankYang0529 commented on code in PR #19457: URL: https://github.com/apache/kafka/pull/19457#discussion_r2056032099
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ########## @@ -3573,6 +3582,329 @@ public void shouldTimeOutOnProducerInstanceId(final boolean stateUpdaterEnabled, ); } + @Test + public void testNamedTopologyWithStreamsProtocol() { + final Properties props = configProps(false, false, false); + props.setProperty(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.toString()); + final StreamsConfig config = new StreamsConfig(props); + final InternalTopologyBuilder topologyBuilder = new InternalTopologyBuilder( + new TopologyConfig( + "my-topology", + config, + new Properties()) + ); + + final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl( + metrics, + APPLICATION_ID, + PROCESS_ID.toString(), + mockTime + ); + + final TopologyMetadata topologyMetadata = new TopologyMetadata(topologyBuilder, config); + topologyMetadata.buildAndRewriteTopology(); + + stateDirectory = new StateDirectory(config, mockTime, true, false); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState( + new TopologyMetadata(internalTopologyBuilder, config), + StreamsMetadataState.UNKNOWN_HOST, + new LogContext(String.format("stream-client [%s] ", CLIENT_ID)) + ); + final IllegalStateException exception = assertThrows(IllegalStateException.class, () -> + StreamThread.create( + topologyMetadata, + config, + clientSupplier, + clientSupplier.getAdmin(config.getAdminConfigs(CLIENT_ID)), + PROCESS_ID, + CLIENT_ID, + streamsMetrics, + mockTime, + streamsMetadataState, + 0, + stateDirectory, + new MockStateRestoreListener(), + new MockStandbyUpdateListener(), + threadIdx, + null, + HANDLER + ) + ); + assertEquals("Named topologies and the CONSUMER protocol cannot be used at the same time.", exception.getMessage()); Review Comment: Addressed it. Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org