bbejeck commented on code in PR #19700:
URL: https://github.com/apache/kafka/pull/19700#discussion_r2093435225


##########
tests/kafkatest/tests/streams/streams_eos_test.py:
##########
@@ -160,10 +168,16 @@ def abort_streams(self, keep_alive_processor1, 
keep_alive_processor2, processor_
         self.wait_for_startup(monitor1, keep_alive_processor1)
 
     def wait_for_startup(self, monitor, processor):
-        self.wait_for(monitor, processor, "StateChange: REBALANCING -> 
RUNNING")
+        if self.group_protocol == "classic":
+            self.wait_for(monitor, processor, "StateChange: REBALANCING -> 
RUNNING")
+        else:
+            # In the streams group protocol, not all members will take part in 
the rebalance.
+            # We can indirectly observe the progress of the group by seeing 
the member epoch being bumped.
+            self.wait_for(monitor, processor, "MemberEpochBump")
         self.wait_for(monitor, processor, "processed [0-9]* records from 
topic")
 
-    def wait_for(self, monitor, processor, output):
+    @staticmethod
+    def wait_for(monitor, processor, output):
         monitor.wait_until(output,
-                           timeout_sec=480,
+                           timeout_sec=60,

Review Comment:
   I agree that `480` seconds is a long time to wait, but are we sure `60` is 
enough to avoid flaky results? Although I guess it really should be enough.



##########
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java:
##########
@@ -248,21 +250,33 @@ public static void verify(final String kafka, final 
boolean withRepartitioning)
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final Admin adminClient) {
-
+    private static void ensureStreamsApplicationDown(final Admin adminClient, 
final String groupProtocol) {
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        ConsumerGroupDescription description;
-        do {
-            description = getConsumerGroupDescription(adminClient);
-
-            if (System.currentTimeMillis() > maxWaitTime && 
!description.members().isEmpty()) {
-                throw new RuntimeException(
-                    "Streams application not down after " + (MAX_IDLE_TIME_MS 
/ 1000L) + " seconds. " +
-                        "Group: " + description
-                );
-            }
-            sleep(1000L);
-        } while (!description.members().isEmpty());
+        if (Objects.equals(groupProtocol, "streams")) {
+            StreamsGroupDescription description;
+            do {
+                description = getStreamsGroupDescription(adminClient);
+                if (System.currentTimeMillis() > maxWaitTime && 
!description.members().isEmpty()) {
+                    throw new RuntimeException(
+                        "Streams application not down after " + 
MAX_IDLE_TIME_MS / 1000L + " seconds. " +
+                            "Group: " + description
+                    );
+                }
+                sleep(1000L);
+            } while (!description.members().isEmpty());
+        } else {
+            ConsumerGroupDescription description;
+            do {
+                description = getConsumerGroupDescription(adminClient);
+                if (System.currentTimeMillis() > maxWaitTime && 
!description.members().isEmpty()) {
+                    throw new RuntimeException(
+                        "Streams application not down after " + 
MAX_IDLE_TIME_MS / 1000L + " seconds. " +
+                            "Group: " + description
+                    );
+                }
+                sleep(1000L);
+            } while (!description.members().isEmpty());
+        }

Review Comment:
   nit: I think this could be refactored into a single `do-while` loop, with 
the `if-else` only separating out the `StreamsGroupDescription` vs. the 
`ConsumerGroupDescription`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to