chia7712 commented on code in PR #15715:
URL: https://github.com/apache/kafka/pull/15715#discussion_r1589861459


##########
core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java:
##########
@@ -27,6 +27,23 @@
 @Target({ElementType.ANNOTATION_TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ClusterConfigProperty {
+    /**
+     * The config applies to the controller/broker with specified id. Default 
is -1, indicating the property applied to
+     * all controller/broker servers. Note that the "controller" here refers 
to the KRaft quorum controller.
+     * The id can vary depending on the different {@link 
kafka.test.annotation.Type}.
+     * <ul>
+     *  <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts 
from 0 and increases by 1
+     *  with each additional broker, and there is no controller server under 
this mode. </li>
+     *  <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id 
starts from 0, the controller id
+     *  starts from 3000 and increases by 1 with each addition 
broker/controller.</li>
+     *  <li> Under {@link kafka.test.annotation.Type#CO_KRAFT}, the broker id 
and controller id both start from 0
+     *  and increases by 1 with each additional broker/controller.</li>
+     * </ul>
+     *
+     * If the id doesn't correspond to any broker/controller server, the 
config will be ignored.

Review Comment:
   Maybe we should throw exception to do fast fail. WDTY?



##########
core/src/test/java/kafka/testkit/TestKitNodes.java:
##########
@@ -148,7 +151,7 @@ private int startControllerId() {
             if (combined) {
                 return startBrokerId();
             }
-            return startBrokerId() + 3000;
+            return startBrokerId() + CONTROLLER_ID_OFFSET;

Review Comment:
   Maybe we should replace `startBrokerId` by a constant variable too. 



##########
core/src/test/java/kafka/test/annotation/ClusterConfigProperty.java:
##########
@@ -27,6 +27,23 @@
 @Target({ElementType.ANNOTATION_TYPE})
 @Retention(RetentionPolicy.RUNTIME)
 public @interface ClusterConfigProperty {
+    /**
+     * The config applies to the controller/broker with specified id. Default 
is -1, indicating the property applied to
+     * all controller/broker servers. Note that the "controller" here refers 
to the KRaft quorum controller.
+     * The id can vary depending on the different {@link 
kafka.test.annotation.Type}.
+     * <ul>
+     *  <li> Under {@link kafka.test.annotation.Type#ZK}, the broker id starts 
from 0 and increases by 1
+     *  with each additional broker, and there is no controller server under 
this mode. </li>
+     *  <li> Under {@link kafka.test.annotation.Type#KRAFT}, the broker id 
starts from 0, the controller id
+     *  starts from 3000 and increases by 1 with each addition 
broker/controller.</li>

Review Comment:
   `3000` can be replaced by the `{@link }`



##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -80,28 +84,55 @@ public void testClusterTemplate() {
     @ClusterTests({
         @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs")
+            @ClusterConfigProperty(key = "spam", value = "eggs"),
+            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // 
this one will be ignored as there is no broker id is 86400
         }),
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200"),
+            @ClusterConfigProperty(id = 3000, key = "queued.max.requests", 
value = "300")
         }),
         @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200")
         })
     })
-    public void testClusterTests() {
+    public void testClusterTests() throws ExecutionException, 
InterruptedException {
         if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {
             Assertions.assertEquals("bar", 
clusterInstance.config().serverProperties().get("foo"));
             Assertions.assertEquals("eggs", 
clusterInstance.config().serverProperties().get("spam"));
             Assertions.assertEquals("default.value", 
clusterInstance.config().serverProperties().get("default.key"));
+
+            try (Admin admin = clusterInstance.createAdminClient()) {
+                ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, "0");
+                Map<ConfigResource, Config> configs = 
admin.describeConfigs(Collections.singletonList(configResource)).all().get();
+                Assertions.assertEquals(1, configs.size());
+                Assertions.assertEquals("100", 
configs.get(configResource).get("queued.max.requests").value());
+            }
         } else if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.RAFT)) {

Review Comment:
   this can be replaced by `clusterInstance.isKRaftTest()`



##########
core/src/test/java/kafka/test/ClusterTestExtensionsTest.java:
##########
@@ -80,28 +84,55 @@ public void testClusterTemplate() {
     @ClusterTests({
         @ClusterTest(name = "cluster-tests-1", clusterType = Type.ZK, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "bar"),
-            @ClusterConfigProperty(key = "spam", value = "eggs")
+            @ClusterConfigProperty(key = "spam", value = "eggs"),
+            @ClusterConfigProperty(id = 86400, key = "baz", value = "qux"), // 
this one will be ignored as there is no broker id is 86400
         }),
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200"),
+            @ClusterConfigProperty(id = 3000, key = "queued.max.requests", 
value = "300")
         }),
         @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, 
serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz"),
-            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value")
+            @ClusterConfigProperty(key = "default.key", value = 
"overwrite.value"),
+            @ClusterConfigProperty(id = 0, key = "queued.max.requests", value 
= "200")
         })
     })
-    public void testClusterTests() {
+    public void testClusterTests() throws ExecutionException, 
InterruptedException {
         if 
(clusterInstance.clusterType().equals(ClusterInstance.ClusterType.ZK)) {

Review Comment:
   this can be repalced by `!clusterInstance.isKRaftTest()`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to