bbejeck commented on code in PR #19438:
URL: https://github.com/apache/kafka/pull/19438#discussion_r2038214625


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/topics/ChangelogTopicsTest.java:
##########
@@ -149,14 +149,14 @@ public void shouldContainNonSourceBasedChangelogs() {
     }
 
     @Test
-    public void shouldNotContainSourceBasedChangelogs() {
+    public void shouldContainSourceBasedChangelogs() {

Review Comment:
   The test is more or less the same, why the name change and the assertions?  
I understand some underlying behavior changed - just curious.



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java:
##########
@@ -446,6 +449,19 @@ public Properties getLogConfig(final String topic) {
         }
     }
 
+    public void setStandbyReplicas(final String groupId, final int 
numStandbyReplicas) {

Review Comment:
   NM my comment on dynamic behavior I see this now



##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StandbyTaskCreationIntegrationTest.java:
##########
@@ -87,19 +90,25 @@ public void after() {
         client2.close(Duration.ofSeconds(60));
     }
 
-    private Properties streamsConfiguration() {
+    private Properties streamsConfiguration(final boolean 
streamsProtocolEnabled) {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" + 
safeTestName);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
-        streamsConfiguration.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);
+        if (streamsProtocolEnabled) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+            CLUSTER.setStandbyReplicas("app-" + safeTestName, 1);

Review Comment:
   This is dynamic behavior? I thought this needed to be set on startup.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to