bbejeck commented on code in PR #19700: URL: https://github.com/apache/kafka/pull/19700#discussion_r2093435225
########## tests/kafkatest/tests/streams/streams_eos_test.py: ########## @@ -160,10 +168,16 @@ def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_ self.wait_for_startup(monitor1, keep_alive_processor1) def wait_for_startup(self, monitor, processor): - self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING") + if self.group_protocol == "classic": + self.wait_for(monitor, processor, "StateChange: REBALANCING -> RUNNING") + else: + # In the streams group protocol, not all members will take part in the rebalance. + # We can indirectly observe the progress of the group by seeing the member epoch being bumped. + self.wait_for(monitor, processor, "MemberEpochBump") self.wait_for(monitor, processor, "processed [0-9]* records from topic") - def wait_for(self, monitor, processor, output): + @staticmethod + def wait_for(monitor, processor, output): monitor.wait_until(output, - timeout_sec=480, + timeout_sec=60, Review Comment: I agree that `480` seconds is a long time to wait, but are we sure 60 is enough to avoid flaky results? Although I guess it really should be enough. ########## streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java: ########## @@ -248,21 +250,33 @@ public static void verify(final String kafka, final boolean withRepartitioning) System.out.flush(); } - private static void ensureStreamsApplicationDown(final Admin adminClient) { - + private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) { final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS; - ConsumerGroupDescription description; - do { - description = getConsumerGroupDescription(adminClient); - - if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) { - throw new RuntimeException( - "Streams application not down after " + (MAX_IDLE_TIME_MS / 1000L) + " seconds. 
" + - "Group: " + description - ); - } - sleep(1000L); - } while (!description.members().isEmpty()); + if (Objects.equals(groupProtocol, "streams")) { + StreamsGroupDescription description; + do { + description = getStreamsGroupDescription(adminClient); + if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) { + throw new RuntimeException( + "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " + + "Group: " + description + ); + } + sleep(1000L); + } while (!description.members().isEmpty()); + } else { + ConsumerGroupDescription description; + do { + description = getConsumerGroupDescription(adminClient); + if (System.currentTimeMillis() > maxWaitTime && !description.members().isEmpty()) { + throw new RuntimeException( + "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + " seconds. " + + "Group: " + description + ); + } + sleep(1000L); + } while (!description.members().isEmpty()); + } Review Comment: nit: I think this could be refactored into one `do-while` loop with the `if-else` separating out the `StreamsGroupDescription` vs. the `ConsumerGroupDescription` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org