lucasbru commented on code in PR #19700:
URL: https://github.com/apache/kafka/pull/19700#discussion_r2093593120


##########
streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java:
##########
@@ -248,21 +250,33 @@ public static void verify(final String kafka, final boolean withRepartitioning)
         System.out.flush();
     }
 
-    private static void ensureStreamsApplicationDown(final Admin adminClient) {
-
+    private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) {
         final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        ConsumerGroupDescription description;
-        do {
-            description = getConsumerGroupDescription(adminClient);
-
-            if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
-                throw new RuntimeException(
-                    "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. " +
-                        "Group: " + description
-                );
-            }
-            sleep(1000L);
-        } while (!description.members().isEmpty());
+        if (Objects.equals(groupProtocol, "streams")) {
+            StreamsGroupDescription description;
+            do {
+                description = getStreamsGroupDescription(adminClient);
+                if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
+                    throw new RuntimeException(
+                        "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " +
+                            "Group: " + description
+                    );
+                }
+                sleep(1000L);
+            } while (!description.members().isEmpty());
+        } else {
+            ConsumerGroupDescription description;
+            do {
+                description = getConsumerGroupDescription(adminClient);
+                if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) {
+                    throw new RuntimeException(
+                        "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " +
+                            "Group: " + description
+                    );
+                }
+                sleep(1000L);
+            } while (!description.members().isEmpty());
+        }

Review Comment:
   This refactoring is a bit awkward: `description` is used in the `while` 
condition, and the two description types share only `Object` as a common 
ancestor, so the loop is duplicated for each group protocol. I tried my best.
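
   For illustration only, one way the duplication could be avoided is to pass the describe call and the `members()` accessor in as functions, so the loop never needs a shared supertype. This is a sketch, not the code in this PR: `GroupShutdownWait`, `waitUntilEmpty`, and the two `Fake...Description` classes are hypothetical stand-ins, and it assumes both real description types expose `members()` as a collection, as the `isEmpty()` calls in the diff suggest.

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

public class GroupShutdownWait {

    // Hypothetical stand-ins for the two description types; like the real
    // ones, they share no common supertype other than Object.
    static final class FakeConsumerGroupDescription {
        private final List<String> members;
        FakeConsumerGroupDescription(final List<String> members) { this.members = members; }
        List<String> members() { return members; }
    }

    static final class FakeStreamsGroupDescription {
        private final List<Integer> members;
        FakeStreamsGroupDescription(final List<Integer> members) { this.members = members; }
        List<Integer> members() { return members; }
    }

    // Polls describe.get() until the group has no members. Throws if members
    // are still present once maxWaitMs has elapsed. D is the description type,
    // so the loop is written once for both protocols.
    static <D> D waitUntilEmpty(final Supplier<D> describe,
                                final Function<? super D, ? extends Collection<?>> members,
                                final long maxWaitMs,
                                final long pollMs) {
        final long deadline = System.currentTimeMillis() + maxWaitMs;
        D description;
        do {
            description = describe.get();
            if (System.currentTimeMillis() > deadline && !members.apply(description).isEmpty()) {
                throw new RuntimeException(
                    "Streams application not down after " + maxWaitMs / 1000L + " seconds. " +
                        "Group: " + description
                );
            }
            sleep(pollMs);
        } while (!members.apply(description).isEmpty());
        return description;
    }

    // Sleep helper that converts the checked exception, restoring the interrupt flag.
    private static void sleep(final long ms) {
        try {
            Thread.sleep(ms);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    public static void main(final String[] args) {
        final AtomicInteger polls = new AtomicInteger();
        // The fake group reports one member for the first two polls, then none.
        final FakeStreamsGroupDescription last = waitUntilEmpty(
            () -> new FakeStreamsGroupDescription(
                polls.incrementAndGet() < 3 ? List.of(1) : List.<Integer>of()),
            FakeStreamsGroupDescription::members,
            5_000L,
            10L
        );
        System.out.println("members left: " + last.members().size() + ", polls: " + polls.get());
    }
}
```

   With something like this, each branch in `ensureStreamsApplicationDown` would shrink to a single call that supplies its own describe method and `members` accessor. Whether the extra indirection is worth it for two call sites in a test driver is a judgment call.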



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
