chia7712 commented on code in PR #15840:
URL: https://github.com/apache/kafka/pull/15840#discussion_r1591853150


##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -171,120 +144,352 @@ public void 
testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception {
         KafkaZkClient zkClient = 
((ZkClusterInvocationContext.ZkClusterInstance) 
cluster).getUnderlying().zkClient();
 
         String brokerId = "1";
-        adminZkClient = new AdminZkClient(zkClient, scala.None$.empty());
-        alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
+        AdminZkClient adminZkClient = new AdminZkClient(zkClient, 
scala.None$.empty());
+        alterOpts = asList("--zookeeper", zkConnect, "--entity-type", 
"brokers", "--alter");
 
         // Add config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "120000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("message.max.size", "110000"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+                singletonMap("message.max.size", "120000"));
 
         // Change config
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId));
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "140000"), Optional.empty());
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("message.max.size", "130000"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+                singletonMap("message.max.size", "140000"));
 
         // Delete config
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.of(brokerId));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("message.max.size"), Optional.empty());
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singleton("message.max.size"));
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(),
+                singleton("message.max.size"));
 
         // Listener configs: should work only with listener name
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.of(brokerId));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"));
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), 
Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(brokerId),
+                        singletonMap("ssl.keystore.location", 
"/tmp/test.jks")));
 
         // Per-broker config configured at default cluster-level should fail
         assertThrows(ConfigException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.location", 
"/tmp/test.jks"), Optional.empty()));
-        deleteAndVerifyConfig(zkClient, 
Collections.singleton("listener.name.external.ssl.keystore.location"), 
Optional.of(brokerId));
+                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.empty(),
+                        
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")));
+        deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId),
+                singleton("listener.name.external.ssl.keystore.location"));
 
         // Password config update without encoder secret should fail
         assertThrows(IllegalArgumentException.class,
-            () -> alterConfigWithZk(zkClient, 
Collections.singletonMap("listener.name.external.ssl.keystore.password", 
"secret"), Optional.of(brokerId)));
+                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.of(brokerId),
+                        
singletonMap("listener.name.external.ssl.keystore.password", "secret")));
 
         // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
         Map<String, String> configs = new HashMap<>();
         configs.put("listener.name.external.ssl.keystore.password", "secret");
         configs.put("log.cleaner.threads", "2");
-        Map<String, String> encoderConfigs = 
Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        alterConfigWithZk(zkClient, configs, Optional.of(brokerId), 
encoderConfigs);
+        Map<String, String> encoderConfigs = new HashMap<>(configs);
+        encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), 
encoderConfigs);
         Properties brokerConfigs = zkClient.getEntityConfigs("brokers", 
brokerId);
-        
assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG),
 "Encoder secret stored in ZooKeeper");
+        assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), 
"Encoder secret stored in ZooKeeper");
         assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); 
// not encoded
         String encodedPassword = 
brokerConfigs.getProperty("listener.name.external.ssl.keystore.password");
         PasswordEncoder passwordEncoder = 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs));
         assertEquals("secret", 
passwordEncoder.decode(encodedPassword).value());
         assertEquals(configs.size(), brokerConfigs.size());
 
         // Password config update with overrides for encoder parameters
-        Map<String, String> configs2 = 
Collections.singletonMap("listener.name.internal.ssl.keystore.password", 
"secret2");
-        Map<String, String> encoderConfigs2 = new HashMap<>();
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, 
"encoder-secret");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG,
 "DES/CBC/PKCS5Padding");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, 
"1024");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG,
 "PBKDF2WithHmacSHA1");
-        
encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, 
"64");
-        alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), 
encoderConfigs2);
+        Map<String, String> encoderConfigs2 = generateEncodeConfig();
+        alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), 
encoderConfigs2);
         Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", 
brokerId);
         String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password");
-        assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
-        assertEquals("secret2", 
ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
+        assertEquals("secret2", ConfigCommand.createPasswordEncoder(
+                
JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value());
+        assertEquals("secret2", ConfigCommand.createPasswordEncoder(
+                
JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value());
 
         // Password config update at default cluster-level should fail
-        assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, 
configs, Optional.empty(), encoderConfigs));
+        assertThrows(ConfigException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient, 
Optional.empty(), encoderConfigs));
 
         // Dynamic config updates using ZK should fail if broker is running.
         registerBrokerInZk(zkClient, Integer.parseInt(brokerId));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"210000"), Optional.of(brokerId)));
-        assertThrows(IllegalArgumentException.class, () -> 
alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", 
"220000"), Optional.empty()));
+        assertThrows(IllegalArgumentException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient,
+                        Optional.of(brokerId), 
singletonMap("message.max.size", "210000")));
+        assertThrows(IllegalArgumentException.class,
+                () -> alterConfigWithZk(zkClient, adminZkClient,
+                        Optional.empty(), singletonMap("message.max.size", 
"220000")));
 
         // Dynamic config updates using ZK should for a different broker that 
is not running should succeed
-        alterAndVerifyConfig(zkClient, 
Collections.singletonMap("message.max.size", "230000"), Optional.of("2"));
+        alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), 
singletonMap("message.max.size", "230000"));
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testDynamicBrokerConfigUpdateUsingKraft() {
+        alterOpts = generateKraftAlterOpts();
+
+        try (Admin client = cluster.createAdminClient()) {
+            // Add config
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("message.max.size", "110000"));
+            alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("message.max.size", "120000"));
+
+            // Change config
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singletonMap("message.max.size", "130000"));
+            alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("message.max.size", "140000"));
+
+            // Delete config
+            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), 
singleton("message.max.size"));
+
+            // Listener configs: should work only with listener name
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
+            alterConfigWithKraft(client, Optional.empty(),
+                    
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"));
+            deleteAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singleton("listener.name.external.ssl.keystore.location"));
+            alterConfigWithKraft(client, Optional.of(defaultBrokerId),
+                    
singletonMap("listener.name.external.ssl.keystore.password", "secret"));
+
+            // Password config update with encoder secret should succeed and 
encoded password must be stored in ZK
+            Map<String, String> configs = new HashMap<>();
+            configs.put("listener.name.external.ssl.keystore.password", 
"secret");
+            configs.put("log.cleaner.threads", "2");
+            // Password encoder configs
+            configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret");
+
+            // Password config update at default cluster-level should fail
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId), configs));
+        }
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testAlterReadOnlyConfigInKRaftThenShouldFail() {
+        alterOpts = generateKraftAlterOpts();
+
+        try (Admin client = cluster.createAdminClient()) {
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                            singletonMap("auto.create.topics.enable", 
"false")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                            singletonMap("auto.leader.rebalance.enable", 
"false")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                            singletonMap("broker.id", "1")));
+        }
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() {
+        alterOpts = generateKraftAlterOpts();
+
+        try (Admin client = cluster.createAdminClient()) {
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("log.flush.interval.messages", "100"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("log.retention.bytes", "20"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap("log.retention.ms", "2"));
+        }
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void 
testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() {
+        alterOpts = generateKraftAlterOpts();
+        String listenerName = "listener.name.internal.";
+
+        try (Admin client = cluster.createAdminClient()) {
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap(listenerName + "ssl.truststore.type", 
"PKCS12"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap(listenerName + "ssl.truststore.location", 
"/temp/test.jks"));
+            alterAndVerifyConfig(client, Optional.of(defaultBrokerId),
+                    singletonMap(listenerName + "ssl.truststore.password", 
"password"));
+        }
+    }
+
+    @ClusterTests({
+            @ClusterTest(clusterType = Type.KRAFT),
+            @ClusterTest(clusterType = Type.CO_KRAFT)
+    })
+    public void testUpdatePerBrokerConfigInKRaftThenShouldFail() {
+        alterOpts = generateKraftAlterOpts();
+
+        try (Admin client = cluster.createAdminClient()) {
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                            singletonMap("ssl.truststore.type", "PKCS12")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                            singletonMap("ssl.truststore.location", 
"/temp/test.jks")));
+            assertThrows(ExecutionException.class,
+                    () -> alterConfigWithKraft(client, 
Optional.of(defaultBrokerId),
+                            singletonMap("ssl.truststore.password", 
"password")));
+        }
+    }
+
+    private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> 
checkErrOut) {
+        AtomicReference<Integer> exitStatus = new AtomicReference<>();
+        Exit.setExitProcedure((status, __) -> {
+            exitStatus.set(status);
+            throw new RuntimeException();
+        });
+
+        String errOut = captureStandardMsg(run(args));
+
+        checkErrOut.accept(errOut);
+        assertNotNull(exitStatus.get());
+        assertEquals(1, exitStatus.get());
+    }
+
+    private Stream<String> quorumArgs() {
+        return cluster.isKRaftTest()

Review Comment:
   We need to test `--bootstrap-server` even though the cluster is in ZK mode



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to