chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1591747966


##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -172,72 +138,210 @@ public void 
testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
 
         String brokerId = "1";
         adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
+        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
 
         // Add config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "120000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "110000"));
+        alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "120000"));
 
         // Change config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "140000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "130000"));
+        alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "140000"));
 
         // Delete config
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.of(brokerId));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.empty());
+        deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("message.max.size"));
+        deleteAndVerifyConfig(zkClient, Optional.empty(), 
singleton("message.max.size"));
 
         // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.of(brokerId));
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), 
Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("ssl.keystore.location", "/tmp/test.jks")));
 
         // Per-broker config configured at default cluster-level should fail
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.empty()));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("listener.name.external.ssl.keystore.location"), 
Optional.of(brokerId));
+                () -> alterConfigWithZk(zkClient, Optional.empty(), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+        deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("listener.name.external.ssl.keystore.location"));
 
         // Password config update without encoder secret should fail
         assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.password", 
"secret"), Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
         // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
         Map<String, String> configs = new HashMap<>();
         configs.put("listener.name.external.ssl.keystore.password", "secret");
         configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = 
Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        alterConfigWithZk(zkClient, configs, Optional.of(brokerId), 
encoderConfigs);
+        Map<String, String> encoderConfigs = 
singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        alterConfigWithZk(zkClient, Optional.of(brokerId), configs, 
encoderConfigs);
         Properties brokerConfigs = zkClient.getEntityConfigs("brokers", 
brokerId);
-        
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
 "Encoder secret stored in ZooKeeper");
+        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), 
"Encoder secret stored in ZooKeeper");
         assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); 
// not encoded
         String encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
         PasswordEncoder passwordEncoder = 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
         assertEquals("secret", 
passwordEncoder.decode(encodedPassword).value());
         assertEquals(configs.size(), brokerConfigs.size());
 
         // Password config update with overrides for encoder parameters
-        Map<String, String> configs2 = 
Collections.singletonMap("listener.name.internal.ssl.keystore.password", 
"secret2");
-        Map<String, String> encoderConfigs2 = new HashMap<>();
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
 "DES/CBC/PKCS5Padding");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, 
"1024");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG,
 "PBKDF2WithHmacSHA1");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, 
"64");
-        alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), 
encoderConfigs2);
+        Map<String, String> configs2 = 
singletonMap("listener.name.internal.ssl.keystore.password", "secret2");
+        Map<String, String> encoderConfigs2 = generateEncodeConfig();
+        alterConfigWithZk(zkClient, Optional.of(brokerId), configs2, 
encoderConfigs2);
         Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", 
brokerId);
         String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
         assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
         assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
 
         // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
configs, Optional.empty(), encoderConfigs));
+        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
Optional.empty(), configs, encoderConfigs));
 
         // Dynamic config updates using ZK should fail if broker is running.
         registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"210000"), Optional.of(brokerId)));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"220000"), Optional.empty()));
+        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "210000")));
+        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Optional.empty(), singletonMap("message.max.size", 
"220000")));
 
         // Dynamic config updates using ZK should for a different broker that 
is not running should succeed
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+        alterAndVerifyConfig(zkClient, Optional.of("2"), 
singletonMap("message.max.size", "230000"));
+    }
+
+    private void alterAndVerifyConfig(KafkaZkClient zkClient, Optional<String> 
brokerId, Map<String, String>... configs) {

Review Comment:
   We should avoid using varargs for collection types. The callers should 
merge those collections into a single one.



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -172,72 +138,210 @@ public void 
testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
 
         String brokerId = "1";
         adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
+        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
 
         // Add config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "120000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "110000"));
+        alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "120000"));
 
         // Change config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "140000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "130000"));
+        alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "140000"));
 
         // Delete config
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.of(brokerId));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.empty());
+        deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("message.max.size"));
+        deleteAndVerifyConfig(zkClient, Optional.empty(), 
singleton("message.max.size"));
 
         // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.of(brokerId));
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), 
Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("ssl.keystore.location", "/tmp/test.jks")));
 
         // Per-broker config configured at default cluster-level should fail
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.empty()));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("listener.name.external.ssl.keystore.location"), 
Optional.of(brokerId));
+                () -> alterConfigWithZk(zkClient, Optional.empty(), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+        deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("listener.name.external.ssl.keystore.location"));
 
         // Password config update without encoder secret should fail
         assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.password", 
"secret"), Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
         // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
         Map<String, String> configs = new HashMap<>();
         configs.put("listener.name.external.ssl.keystore.password", "secret");
         configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = 
Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        alterConfigWithZk(zkClient, configs, Optional.of(brokerId), 
encoderConfigs);
+        Map<String, String> encoderConfigs = 
singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        alterConfigWithZk(zkClient, Optional.of(brokerId), configs, 
encoderConfigs);
         Properties brokerConfigs = zkClient.getEntityConfigs("brokers", 
brokerId);
-        
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
 "Encoder secret stored in ZooKeeper");
+        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), 
"Encoder secret stored in ZooKeeper");
         assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); 
// not encoded
         String encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
         PasswordEncoder passwordEncoder = 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
         assertEquals("secret", 
passwordEncoder.decode(encodedPassword).value());
         assertEquals(configs.size(), brokerConfigs.size());
 
         // Password config update with overrides for encoder parameters
-        Map<String, String> configs2 = 
Collections.singletonMap("listener.name.internal.ssl.keystore.password", 
"secret2");
-        Map<String, String> encoderConfigs2 = new HashMap<>();
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
 "DES/CBC/PKCS5Padding");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, 
"1024");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG,
 "PBKDF2WithHmacSHA1");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, 
"64");
-        alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), 
encoderConfigs2);
+        Map<String, String> configs2 = 
singletonMap("listener.name.internal.ssl.keystore.password", "secret2");
+        Map<String, String> encoderConfigs2 = generateEncodeConfig();
+        alterConfigWithZk(zkClient, Optional.of(brokerId), configs2, 
encoderConfigs2);
         Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", 
brokerId);
         String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
         assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
         assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
 
         // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
configs, Optional.empty(), encoderConfigs));
+        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
Optional.empty(), configs, encoderConfigs));
 
         // Dynamic config updates using ZK should fail if broker is running.
         registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"210000"), Optional.of(brokerId)));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"220000"), Optional.empty()));
+        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "210000")));
+        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Optional.empty(), singletonMap("message.max.size", 
"220000")));
 
         // Dynamic config updates using ZK should for a different broker that 
is not running should succeed
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+        alterAndVerifyConfig(zkClient, Optional.of("2"), 
singletonMap("message.max.size", "230000"));
+    }
+
+    private void alterAndVerifyConfig(KafkaZkClient zkClient, Optional<String> 
brokerId, Map<String, String>... configs) {
+        alterConfigWithZk(zkClient, brokerId, configs);
+        verifyConfig(zkClient, brokerId, configs);
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testDynamicBrokerConfigUpdateUsingKraft() {
+        alterOpts = generateKraftAlterOpts();
+        Admin client = cluster.createAdminClient();

Review Comment:
   Could you use `try-with-resources` to make sure the `Admin` client gets closed?



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -283,8 +436,26 @@ private static String captureStandardStream(boolean isErr, 
Runnable runnable) {
                 System.setErr(currentStream);
             else
                 System.setOut(currentStream);
+        }
+    }
+
+    private static class BrokerCommand implements Runnable {

Review Comment:
   This can be simplified to a lambda function. For example:
   ```java
       private static Runnable run(Stream<String> command) {
           return () -> {
               try {
                   ConfigCommand.main(command.toArray(String[]::new));
               } catch (RuntimeException e) {
                   // do nothing.
               } finally {
                   Exit.resetExitProcedure();
               }
           };
       }
   ```



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -248,26 +352,75 @@ private void registerBrokerInZk(KafkaZkClient zkClient, 
int id) {
         zkClient.registerBroker(brokerInfo);
     }
 
+    private List<String> generateKraftAlterOpts() {
+        return asList("--bootstrap-server", cluster.bootstrapServers(),
+            "--entity-type", "brokers",
+            "--entity-name", "0",
+            "--alter");
+    }
+
+    private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, 
Map<String, String>... configs) {
+        alterConfigWithKraft(client, brokerId, configs);
+        verifyConfig(client, brokerId, configs);
+    }
+
+    private void alterConfigWithKraft(Admin client, Optional<String> brokerId, 
Map<String, String>... configs) {
+        String configStr = transferConfigMapsToString(configs);
+        ConfigCommand.ConfigCommandOptions addOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId), asList("--add-config", configStr)));
+        ConfigCommand.alterConfig(client, addOpts);
+    }
+
+    private void verifyConfig(Admin client, Optional<String> brokerId, 
Map<String, String>[] configs) throws RuntimeException {
+        ConfigResource configResource = new 
ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId));
+        client.describeConfigs(singletonList(configResource))
+                .all()
+                .whenComplete((c, e) -> {
+                    if (e != null) {
+                        throw new RuntimeException(e);
+                    }
+                    for (Map<String, String> config : configs) {
+                        assertEquals(config, c.get(configResource).entries());
+                    }
+                });
+    }
+
+    private void deleteAndVerifyConfig(Admin client, Optional<String> 
brokerId, Set<String> singleton) {
+        ConfigCommand.ConfigCommandOptions deleteOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId),
+                        asList("--delete-config", String.join(",", 
singleton))));
+        ConfigCommand.alterConfig(client, deleteOpts);
+        verifyConfig(client, brokerId, new Map[]{Collections.emptyMap()});
+    }
+
     @SafeVarargs
-    static <T> Seq<T> seq(T...seq) {
-        return seq(Arrays.asList(seq));
+    static <T> Seq<T> seq(T... seq) {

Review Comment:
   It seems we only need this function to pass a single `endpoint` when 
creating a `Broker`. Maybe we can add a new constructor to `Broker` that 
accepts a single `endpoint`. WDYT?



##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -172,72 +138,210 @@ public void 
testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
 
         String brokerId = "1";
         adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
+        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
 
         // Add config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "120000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "110000"));
+        alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "120000"));
 
         // Change config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "140000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "130000"));
+        alterAndVerifyConfig(zkClient, Optional.empty(), 
singletonMap("message.max.size", "140000"));
 
         // Delete config
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.of(brokerId));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.empty());
+        deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("message.max.size"));
+        deleteAndVerifyConfig(zkClient, Optional.empty(), 
singleton("message.max.size"));
 
         // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.of(brokerId));
+        alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), 
Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("ssl.keystore.location", "/tmp/test.jks")));
 
         // Per-broker config configured at default cluster-level should fail
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.empty()));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("listener.name.external.ssl.keystore.location"), 
Optional.of(brokerId));
+                () -> alterConfigWithZk(zkClient, Optional.empty(), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+        deleteAndVerifyConfig(zkClient, Optional.of(brokerId), 
singleton("listener.name.external.ssl.keystore.location"));
 
         // Password config update without encoder secret should fail
         assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.password", 
"secret"), Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
         // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
         Map<String, String> configs = new HashMap<>();
         configs.put("listener.name.external.ssl.keystore.password", "secret");
         configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = 
Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        alterConfigWithZk(zkClient, configs, Optional.of(brokerId), 
encoderConfigs);
+        Map<String, String> encoderConfigs = 
singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        alterConfigWithZk(zkClient, Optional.of(brokerId), configs, 
encoderConfigs);
         Properties brokerConfigs = zkClient.getEntityConfigs("brokers", 
brokerId);
-        
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
 "Encoder secret stored in ZooKeeper");
+        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), 
"Encoder secret stored in ZooKeeper");
         assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); 
// not encoded
         String encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
         PasswordEncoder passwordEncoder = 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
         assertEquals("secret", 
passwordEncoder.decode(encodedPassword).value());
         assertEquals(configs.size(), brokerConfigs.size());
 
         // Password config update with overrides for encoder parameters
-        Map<String, String> configs2 = 
Collections.singletonMap("listener.name.internal.ssl.keystore.password", 
"secret2");
-        Map<String, String> encoderConfigs2 = new HashMap<>();
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
 "DES/CBC/PKCS5Padding");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, 
"1024");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG,
 "PBKDF2WithHmacSHA1");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, 
"64");
-        alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), 
encoderConfigs2);
+        Map<String, String> configs2 = 
singletonMap("listener.name.internal.ssl.keystore.password", "secret2");
+        Map<String, String> encoderConfigs2 = generateEncodeConfig();
+        alterConfigWithZk(zkClient, Optional.of(brokerId), configs2, 
encoderConfigs2);
         Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", 
brokerId);
         String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
         assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
         assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
 
         // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
configs, Optional.empty(), encoderConfigs));
+        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
Optional.empty(), configs, encoderConfigs));
 
         // Dynamic config updates using ZK should fail if broker is running.
         registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"210000"), Optional.of(brokerId)));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"220000"), Optional.empty()));
+        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Optional.of(brokerId), 
singletonMap("message.max.size", "210000")));
+        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Optional.empty(), singletonMap("message.max.size", 
"220000")));
 
         // Dynamic config updates using ZK for a different broker that is not running should succeed
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+        alterAndVerifyConfig(zkClient, Optional.of("2"), 
singletonMap("message.max.size", "230000"));
+    }
+
+    private void alterAndVerifyConfig(KafkaZkClient zkClient, Optional<String> 
brokerId, Map<String, String>... configs) {
+        alterConfigWithZk(zkClient, brokerId, configs);
+        verifyConfig(zkClient, brokerId, configs);
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testDynamicBrokerConfigUpdateUsingKraft() {
+        alterOpts = generateKraftAlterOpts();
+        Admin client = cluster.createAdminClient();
+
+        // Add config
+        alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("message.max.size", "110000"));
+        alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("message.max.size", "120000"));
+
+        // Change config
+        alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("message.max.size", "130000"));
+        alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("message.max.size", "140000"));
+
+        // Delete config
+        deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singleton("message.max.size"));
+
+        // Listener configs: should work only with listener name
+        alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
+
+        alterConfigWithKraft(client, Optional.empty(), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
+        deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singleton("listener.name.external.ssl.keystore.location"));
+        alterConfigWithKraft(client, Optional.of(defaultBrokerId), 
singletonMap("listener.name.external.ssl.keystore.password", "secret"));
+
+        // Prepare password and encoder-secret configs for the cluster-level update check below
+        Map<String, String> configs = new HashMap<>();
+        configs.put("listener.name.external.ssl.keystore.password", "secret");
+        configs.put("log.cleaner.threads", "2");
+        Map<String, String> encoderConfigs = 
singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+
+        // Password config update at default cluster-level should fail
+        assertThrows(ExecutionException.class, () -> 
alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs, 
encoderConfigs));
+
+        client.close();
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
+        alterOpts = generateKraftAlterOpts();
+        Admin client = cluster.createAdminClient();
+
+        assertThrows(ExecutionException.class, () -> 
alterConfigWithKraft(client, Optional.of(defaultBrokerId), 
singletonMap("auto.create.topics.enable", "false")));
+        assertThrows(ExecutionException.class, () -> 
alterConfigWithKraft(client, Optional.of(defaultBrokerId), 
singletonMap("auto.leader.rebalance.enable", "false")));
+        assertThrows(ExecutionException.class, () -> 
alterConfigWithKraft(client, Optional.of(defaultBrokerId), 
singletonMap("broker.id", "1")));
+
+        client.close();
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() {
+        alterOpts = generateKraftAlterOpts();
+        Admin client = cluster.createAdminClient();
+
+        alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("log.flush.interval.messages", "100"));
+        alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("log.retention.bytes", "20"));
+        alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("log.retention.ms", "2"));
+
+        client.close();
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testUpdatePerBrokerConfigInKRaftThenShouldSuccessful() {
+        alterOpts = generateKraftAlterOpts();
+        Admin client = cluster.createAdminClient();
+
+        alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("ssl.truststore.type", "PKCS12"));
+        alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("ssl.truststore.location", "/temp/test.jks"));
+        alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("ssl.truststore.password", "password"));
+
+        client.close();
+    }
+
+    private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> 
checkErrOut) {
+        AtomicReference<Integer> exitStatus = new AtomicReference<>();
+        Exit.setExitProcedure((status, __) -> {
+            exitStatus.set(status);
+            throw new RuntimeException();
+        });
+
+        String errOut = captureStandardMsg(new BrokerCommand(args));
+
+        checkErrOut.accept(errOut);
+        assertNotNull(exitStatus.get());
+        assertEquals(1, exitStatus.get());
+    }
+
+    private Stream<String> quorumArgs() {
+        return cluster.isKRaftTest()
+                ? Stream.of("--bootstrap-server", cluster.bootstrapServers())
+                : Stream.of("--zookeeper", 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkConnect());
+    }
+
+    private void verifyConfig(KafkaZkClient zkClient, Optional<String> 
brokerId, Map<String, String>... configs) {
+        Properties entityConfigs = zkClient.getEntityConfigs("brokers", 
brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING));
+        for (Map<String, String> config : configs) {
+            assertEquals(config, entityConfigs);
+        }
+    }
+
+    private void alterConfigWithZk(KafkaZkClient zkClient, Optional<String> 
brokerId, Map<String, String>... configs) {
+        String configStr = transferConfigMapsToString(configs);
+        ConfigCommand.ConfigCommandOptions addOpts =
+                new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, 
entityOp(brokerId), asList("--add-config", configStr)));
+        ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient);

Review Comment:
   Could we convert `adminZkClient` to a local variable here? A class-level 
`adminZkClient` is error-prone, since it can cause an NPE if a test uses it 
before it has been initialized.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to