chia7712 commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1591747966
########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -172,72 +138,210 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { String brokerId = "1"; adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); - alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); + alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty()); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "110000")); + alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "120000")); // Change config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty()); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "130000")); + alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "140000")); // Delete config - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); + deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("message.max.size")); + deleteAndVerifyConfig(zkClient, Optional.empty(), singleton("message.max.size")); // Listener configs: should work only with listener name - alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); - deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); + () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("listener.name.external.ssl.keystore.location")); // Password config update without encoder secret should fail assertThrows(IllegalArgumentException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map<String, String> configs = new HashMap<>(); configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); - Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); + Map<String, String> encoderConfigs = singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + alterConfigWithZk(zkClient, 
Optional.of(brokerId), configs, encoderConfigs); Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId); - assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); + assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password"); PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)); assertEquals("secret", passwordEncoder.decode(encodedPassword).value()); assertEquals(configs.size(), brokerConfigs.size()); // Password config update with overrides for encoder parameters - Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); - Map<String, String> encoderConfigs2 = new HashMap<>(); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64"); - alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2); + Map<String, String> configs2 = singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); + Map<String, String> encoderConfigs2 = generateEncodeConfig(); + alterConfigWithZk(zkClient, Optional.of(brokerId), configs2, encoderConfigs2); Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId); String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password"); assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value()); assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value()); // Password config update at default cluster-level should fail - assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs)); + assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, Optional.empty(), configs, encoderConfigs)); // Dynamic config updates using ZK should fail if broker is running. registerBrokerInZk(zkClient, Integer.parseInt(brokerId)); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId))); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty())); + assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "210000"))); + assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("message.max.size", "220000"))); // Dynamic config updates using ZK should for a different broker that is not running should succeed - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2")); + alterAndVerifyConfig(zkClient, Optional.of("2"), singletonMap("message.max.size", "230000")); + } + + private void alterAndVerifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String>... configs) { Review Comment: We should avoid using varargs for collection types. The callers should merge those collections into a single one. 
########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -172,72 +138,210 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { String brokerId = "1"; adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); - alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); + alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty()); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "110000")); + alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "120000")); // Change config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty()); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "130000")); + alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "140000")); // Delete config - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); + deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("message.max.size")); + deleteAndVerifyConfig(zkClient, Optional.empty(), singleton("message.max.size")); // Listener configs: should work only with listener name - alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); - deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); + () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("listener.name.external.ssl.keystore.location")); // Password config update without encoder secret should fail assertThrows(IllegalArgumentException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map<String, String> configs = new HashMap<>(); configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); - Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); + Map<String, String> encoderConfigs = singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + alterConfigWithZk(zkClient, 
Optional.of(brokerId), configs, encoderConfigs); Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId); - assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); + assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password"); PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)); assertEquals("secret", passwordEncoder.decode(encodedPassword).value()); assertEquals(configs.size(), brokerConfigs.size()); // Password config update with overrides for encoder parameters - Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); - Map<String, String> encoderConfigs2 = new HashMap<>(); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64"); - alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2); + Map<String, String> configs2 = singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); + Map<String, String> encoderConfigs2 = generateEncodeConfig(); + alterConfigWithZk(zkClient, Optional.of(brokerId), configs2, encoderConfigs2); Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId); String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password"); assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value()); assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value()); // Password config update at default cluster-level should fail - assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs)); + assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, Optional.empty(), configs, encoderConfigs)); // Dynamic config updates using ZK should fail if broker is running. registerBrokerInZk(zkClient, Integer.parseInt(brokerId)); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId))); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty())); + assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "210000"))); + assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("message.max.size", "220000"))); // Dynamic config updates using ZK should for a different broker that is not running should succeed - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2")); + alterAndVerifyConfig(zkClient, Optional.of("2"), singletonMap("message.max.size", "230000")); + } + + private void alterAndVerifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String>... 
configs) { + alterConfigWithZk(zkClient, brokerId, configs); + verifyConfig(zkClient, brokerId, configs); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testDynamicBrokerConfigUpdateUsingKraft() { + alterOpts = generateKraftAlterOpts(); + Admin client = cluster.createAdminClient(); Review Comment: Could you use `try-with-resources` to make sure `admin` gets closed? ########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -283,8 +436,26 @@ private static String captureStandardStream(boolean isErr, Runnable runnable) { System.setErr(currentStream); else System.setOut(currentStream); + } + } + + private static class BrokerCommand implements Runnable { Review Comment: This can be simplified to a lambda function. For example: ```java private static Runnable run(Stream<String> command) { return () -> { try { ConfigCommand.main(command.toArray(String[]::new)); } catch (RuntimeException e) { // do nothing. } finally { Exit.resetExitProcedure(); } }; } ``` ########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -248,26 +352,75 @@ private void registerBrokerInZk(KafkaZkClient zkClient, int id) { zkClient.registerBroker(brokerInfo); } + private List<String> generateKraftAlterOpts() { + return asList("--bootstrap-server", cluster.bootstrapServers(), + "--entity-type", "brokers", + "--entity-name", "0", + "--alter"); + } + + private void alterAndVerifyConfig(Admin client, Optional<String> brokerId, Map<String, String>... configs) { + alterConfigWithKraft(client, brokerId, configs); + verifyConfig(client, brokerId, configs); + } + + private void alterConfigWithKraft(Admin client, Optional<String> brokerId, Map<String, String>... 
configs) { + String configStr = transferConfigMapsToString(configs); + ConfigCommand.ConfigCommandOptions addOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr))); + ConfigCommand.alterConfig(client, addOpts); + } + + private void verifyConfig(Admin client, Optional<String> brokerId, Map<String, String>[] configs) throws RuntimeException { + ConfigResource configResource = new ConfigResource(ConfigResource.Type.BROKER, brokerId.orElse(defaultBrokerId)); + client.describeConfigs(singletonList(configResource)) + .all() + .whenComplete((c, e) -> { + if (e != null) { + throw new RuntimeException(e); + } + for (Map<String, String> config : configs) { + assertEquals(config, c.get(configResource).entries()); + } + }); + } + + private void deleteAndVerifyConfig(Admin client, Optional<String> brokerId, Set<String> singleton) { + ConfigCommand.ConfigCommandOptions deleteOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), + asList("--delete-config", String.join(",", singleton)))); + ConfigCommand.alterConfig(client, deleteOpts); + verifyConfig(client, brokerId, new Map[]{Collections.emptyMap()}); + } + @SafeVarargs - static <T> Seq<T> seq(T...seq) { - return seq(Arrays.asList(seq)); + static <T> Seq<T> seq(T... seq) { Review Comment: It seems we need this function to pass single `endpoint` to create `Broker`. Maybe we can add a new constructor to `Broker` to accept single `endpoint`. WDYT? 
########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -172,72 +138,210 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { String brokerId = "1"; adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); - alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); + alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty()); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "110000")); + alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "120000")); // Change config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty()); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "130000")); + alterAndVerifyConfig(zkClient, Optional.empty(), singletonMap("message.max.size", "140000")); // Delete config - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); + deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("message.max.size")); + deleteAndVerifyConfig(zkClient, Optional.empty(), singleton("message.max.size")); // Listener configs: should work only with listener name - alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); + alterAndVerifyConfig(zkClient, Optional.of(brokerId), 
singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); - deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); + () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfig(zkClient, Optional.of(brokerId), singleton("listener.name.external.ssl.keystore.location")); // Password config update without encoder secret should fail assertThrows(IllegalArgumentException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map<String, String> configs = new HashMap<>(); configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); - Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); + Map<String, String> encoderConfigs = singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + alterConfigWithZk(zkClient, 
Optional.of(brokerId), configs, encoderConfigs); Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId); - assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); + assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password"); PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)); assertEquals("secret", passwordEncoder.decode(encodedPassword).value()); assertEquals(configs.size(), brokerConfigs.size()); // Password config update with overrides for encoder parameters - Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); - Map<String, String> encoderConfigs2 = new HashMap<>(); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64"); - alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2); + Map<String, String> configs2 = singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); + Map<String, String> encoderConfigs2 = generateEncodeConfig(); + alterConfigWithZk(zkClient, Optional.of(brokerId), configs2, encoderConfigs2); Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId); String encodedPassword2 = 
brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password"); assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value()); assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value()); // Password config update at default cluster-level should fail - assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs)); + assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, Optional.empty(), configs, encoderConfigs)); // Dynamic config updates using ZK should fail if broker is running. registerBrokerInZk(zkClient, Integer.parseInt(brokerId)); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId))); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty())); + assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Optional.of(brokerId), singletonMap("message.max.size", "210000"))); + assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Optional.empty(), singletonMap("message.max.size", "220000"))); // Dynamic config updates using ZK should for a different broker that is not running should succeed - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2")); + alterAndVerifyConfig(zkClient, Optional.of("2"), singletonMap("message.max.size", "230000")); + } + + private void alterAndVerifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String>... 
configs) { + alterConfigWithZk(zkClient, brokerId, configs); + verifyConfig(zkClient, brokerId, configs); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testDynamicBrokerConfigUpdateUsingKraft() { + alterOpts = generateKraftAlterOpts(); + Admin client = cluster.createAdminClient(); + + // Add config + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.size", "110000")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.size", "120000")); + + // Change config + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.size", "130000")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.size", "140000")); + + // Delete config + deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.size")); + + // Listener configs: should work only with listener name + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); + + alterConfigWithKraft(client, Optional.empty(), singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); + deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("listener.name.external.ssl.keystore.location")); + alterConfigWithKraft(client, Optional.of(defaultBrokerId), singletonMap("listener.name.external.ssl.keystore.password", "secret")); + + // Password config update with encoder secret should succeed and encoded password must be stored in ZK + Map<String, String> configs = new HashMap<>(); + configs.put("listener.name.external.ssl.keystore.password", "secret"); + configs.put("log.cleaner.threads", "2"); + Map<String, String> encoderConfigs = singletonMap(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + + // Password config update at default cluster-level should fail + assertThrows(ExecutionException.class, () -> 
alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs, encoderConfigs)); + + client.close(); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testAlterReadOnlyConfigInKRaftThenShouldFail() { + alterOpts = generateKraftAlterOpts(); + Admin client = cluster.createAdminClient(); + + assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), singletonMap("auto.create.topics.enable", "false"))); + assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), singletonMap("auto.leader.rebalance.enable", "false"))); + assertThrows(ExecutionException.class, () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), singletonMap("broker.id", "1"))); + + client.close(); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() { + alterOpts = generateKraftAlterOpts(); + Admin client = cluster.createAdminClient(); + + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("log.flush.interval.messages", "100")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("log.retention.bytes", "20")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("log.retention.ms", "2")); + + client.close(); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testUpdatePerBrokerConfigInKRaftThenShouldSuccessful() { + alterOpts = generateKraftAlterOpts(); + Admin client = cluster.createAdminClient(); + + alterAndVerifyConfig(client, Optional.empty(), singletonMap("ssl.truststore.type", "PKCS12")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("ssl.truststore.location", "/temp/test.jks")); + alterAndVerifyConfig(client, Optional.empty(), 
singletonMap("ssl.truststore.password", "password")); + + client.close(); + } + + private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) { + AtomicReference<Integer> exitStatus = new AtomicReference<>(); + Exit.setExitProcedure((status, __) -> { + exitStatus.set(status); + throw new RuntimeException(); + }); + + String errOut = captureStandardMsg(new BrokerCommand(args)); + + checkErrOut.accept(errOut); + assertNotNull(exitStatus.get()); + assertEquals(1, exitStatus.get()); + } + + private Stream<String> quorumArgs() { + return cluster.isKRaftTest() + ? Stream.of("--bootstrap-server", cluster.bootstrapServers()) + : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); + } + + private void verifyConfig(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String>... configs) { + Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); + for (Map<String, String> config : configs) { + assertEquals(config, entityConfigs); + } + } + + private void alterConfigWithZk(KafkaZkClient zkClient, Optional<String> brokerId, Map<String, String>... configs) { + String configStr = transferConfigMapsToString(configs); + ConfigCommand.ConfigCommandOptions addOpts = + new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), asList("--add-config", configStr))); + ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); Review Comment: Could we convert `adminZkClient` to a local variable here? A class-level `adminZkClient` is error-prone because it can cause an NPE. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org