chia7712 commented on code in PR #15715: URL: https://github.com/apache/kafka/pull/15715#discussion_r1589861459
########## core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java: ########## @@ -27,6 +27,23 @@ @Target({ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ClusterConfigProperty { + /** + * The config applies to the controller/broker with specified id. Default is -1, indicating the property applied to + * all controller/broker servers. Note that the "controller" here refers to the KRaft quorum controller. + * The id can vary depending on the different {@link kafka.test.annotation.Type}. + * <ul> + * <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts from 0 and increases by 1 + * with each additional broker, and there is no controller server under this mode. </li> + * <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id starts from 0, the controller id + * starts from 3000 and increases by 1 with each addition broker/controller.</li> + * <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id and controller id both start from 0 + * and increases by 1 with each additional broker/controller.</li> + * </ul> + * + * If the id doesn't correspond to any broker/controller server, the config will be ignored. Review Comment: Maybe we should throw exception to do fast fail. WDTY? ########## core/src/test/java/kafka/testkit/TestKitNodes.java: ########## @@ -148,7 +151,7 @@ private int startControllerId() { if (combined) { return startBrokerId(); } - return startBrokerId() + 3000; + return startBrokerId() + CONTROLLER_ID_OFFSET; Review Comment: Maybe we should replace `startBrokerId` by a constant variable too. ########## core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java: ########## @@ -27,6 +27,23 @@ @Target({ElementType.ANNOTATION_TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface ClusterConfigProperty { + /** + * The config applies to the controller/broker with specified id. Default is -1, indicating the property applied to + * all controller/broker servers. Note that the "controller" here refers to the KRaft quorum controller. + * The id can vary depending on the different {@link kafka.test.annotation.Type}. + * <ul> + * <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts from 0 and increases by 1 + * with each additional broker, and there is no controller server under this mode. </li> + * <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id starts from 0, the controller id + * starts from 3000 and increases by 1 with each addition broker/controller.</li> Review Comment: `3000` can be replaced by the `{@link }` ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -80,28 +84,55 @@ public void testClusterTemplate() { @ClusterTests({ @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = { @ClusterConfigProperty(key = "foo", value = "bar"), - @ClusterConfigProperty(key = "spam", value = "eggs") + @ClusterConfigProperty(key = "spam", value = "eggs"), + @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400 }), @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"), + @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300") }), @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200") }) }) - public void testClusterTests() { + public void testClusterTests() throws ExecutionException, InterruptedException { if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) { Assertions.assertEquals("bar", clusterInstance.config().serverProperties().get("foo")); Assertions.assertEquals("eggs", clusterInstance.config().serverProperties().get("spam")); Assertions.assertEquals("default.value", clusterInstance.config().serverProperties().get("default.key")); + + try (Admin admin = clusterInstance.createAdminClient()) { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, "0"); + Map<ConfigResource, Config> configs = admin.describeConfigs(Collections.singletonList(configResource)).all().get(); + Assertions.assertEquals(1, configs.size()); + Assertions.assertEquals("100", configs.get(configResource).get("queued.max.requests").value()); + } } else if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) { Review Comment: this can be replaced by `clusterInstance.isKRaftTest()` ########## core/src/test/java/kafka/test/ClusterTestExtensionsTest.java: ########## @@ -80,28 +84,55 @@ public void testClusterTemplate() { @ClusterTests({ @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, serverProperties = { @ClusterConfigProperty(key = "foo", value = "bar"), - @ClusterConfigProperty(key = "spam", value = "eggs") + @ClusterConfigProperty(key = "spam", value = "eggs"), + @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // this one will be ignored as there is no broker id is 86400 }), @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200"), + @ClusterConfigProperty(id = 3000, key = "queued.max.requests", value = "300") }), @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = { @ClusterConfigProperty(key = "foo", value = "baz"), @ClusterConfigProperty(key = "spam", value = "eggz"), - @ClusterConfigProperty(key = "default.key", value = "overwrite.value") + @ClusterConfigProperty(key = "default.key", value = "overwrite.value"), + @ClusterConfigProperty(id = 0, key = "queued.max.requests", value = "200") }) }) - public void testClusterTests() { + public void testClusterTests() throws ExecutionException, InterruptedException { if (clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) { Review Comment: this can be repalced by `!clusterInstance.isKRaftTest()` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org