chia7712 commented on code in PR #20199: URL: https://github.com/apache/kafka/pull/20199#discussion_r2229040252
##########
server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -180,8 +159,8 @@ public void testHighWatermarkShouldNotAdvanceIfUnderMinIsr() throws ExecutionExc
         producer.send(new ProducerRecord<>(testTopicName, "0", "0")).get();
         waitUntilOneMessageIsConsumed(consumer);
-        killBroker(initialReplicas.get(0).id());
-        killBroker(initialReplicas.get(1).id());
+        clusterInstance.shutdownBroker(initialReplicas.get(0).id());
+        clusterInstance.shutdownBroker(initialReplicas.get(1).id());
         waitForIsrAndElr((isrSize, elrSize) -> {

Review Comment:
   `waitForIsrAndElr((isrSize, elrSize) -> isrSize == 0 && elrSize == 3);`

##########
server/src/test/java/org/apache/kafka/server/EligibleLeaderReplicasIntegrationTest.java:
##########
@@ -331,17 +310,16 @@ public void testElrMemberShouldBeKickOutWhenUncleanShutdown() throws ExecutionEx
                 .allTopicNames().get().get(testTopicName).partitions().get(0);
         int brokerToBeUncleanShutdown = topicPartitionInfo.elr().get(0).id();
-        KafkaBroker broker = brokers().find(b -> {
-            return b.config().brokerId() == brokerToBeUncleanShutdown;
-        }).get();
+        KafkaBroker broker = clusterInstance.brokers().values().stream().filter(b -> b.config().brokerId() == brokerToBeUncleanShutdown)
+            .findFirst().get();
         Seq<File> dirs = broker.logManager().liveLogDirs();
         assertEquals(1, dirs.size());
         CleanShutdownFileHandler handler = new CleanShutdownFileHandler(dirs.apply(0).toString());
         assertTrue(handler.exists());
         assertDoesNotThrow(() -> handler.delete());

Review Comment:
   `assertDoesNotThrow(handler::delete);`

##########
test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java:
##########
@@ -444,6 +444,16 @@ default List<Integer> boundPorts() {
             .map(KafkaBroker::socketServer)
             .map(s -> s.boundPort(clientListener()))
             .toList();
+    }
+    default void restartDeadBrokers() throws InterruptedException {
+        if (brokers().isEmpty()) {
+            throw new IllegalArgumentException("Must supply at least one server config.");
+        }
+        brokers().entrySet().forEach(entry -> {
+            if (!entry.getValue().isShutdown()) {

Review Comment:
   it seems the method is used to restart "dead" brokers, so the condition should be `if (entry.getValue().isShutdown())`, right?

##########
checkstyle/import-control-server.xml:
##########
@@ -32,9 +32,8 @@
     <allow pkg="javax.net.ssl" />
     <allow pkg="javax.security" />
     <allow pkg="net.jqwik.api" />
-
-    <!-- no one depends on the server -->
-    <disallow pkg="kafka" />
+    <allow pkg="scala" />

Review Comment:
   Please use `var` to avoid importing those dependencies explicitly

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org