chia7712 commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1591853150
########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -171,120 +144,352 @@ public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); String brokerId = "1"; - adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); - alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty()); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("message.max.size", "110000")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singletonMap("message.max.size", "120000")); // Change config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty()); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("message.max.size", "130000")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singletonMap("message.max.size", "140000")); // Delete config - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singleton("message.max.size")); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singleton("message.max.size")); // Listener configs: should work only with listener name - alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); - deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), + singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singleton("listener.name.external.ssl.keystore.location")); // Password config update without encoder secret should fail assertThrows(IllegalArgumentException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.external.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map<String, String> configs = new HashMap<>(); configs.put("listener.name.external.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); - Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); + Map<String, String> encoderConfigs = new HashMap<>(configs); + encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs); Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId); - assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); + assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password"); PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)); assertEquals("secret", passwordEncoder.decode(encodedPassword).value()); assertEquals(configs.size(), brokerConfigs.size()); // Password config update with overrides for encoder parameters - Map<String, String> configs2 = Collections.singletonMap("listener.name.internal.ssl.keystore.password", "secret2"); - Map<String, String> encoderConfigs2 = new HashMap<>(); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_CIPHER_ALGORITHM_CONFIG, "DES/CBC/PKCS5Padding"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_ITERATIONS_CONFIG, "1024"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEYFACTORY_ALGORITHM_CONFIG, "PBKDF2WithHmacSHA1"); - encoderConfigs2.put(PasswordEncoderConfigs.PASSWORD_ENCODER_KEY_LENGTH_CONFIG, "64"); - alterConfigWithZk(zkClient, configs2, Optional.of(brokerId), encoderConfigs2); + Map<String, String> encoderConfigs2 = generateEncodeConfig(); + alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs2); Properties brokerConfigs2 = zkClient.getEntityConfigs("brokers", brokerId); String encodedPassword2 = brokerConfigs2.getProperty("listener.name.internal.ssl.keystore.password"); - assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value()); - assertEquals("secret2", ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value()); + assertEquals("secret2", ConfigCommand.createPasswordEncoder( + JavaConverters.mapAsScalaMap(encoderConfigs)).decode(encodedPassword2).value()); + assertEquals("secret2", ConfigCommand.createPasswordEncoder( + JavaConverters.mapAsScalaMap(encoderConfigs2)).decode(encodedPassword2).value()); // Password config update at default cluster-level should fail - assertThrows(ConfigException.class, () -> alterConfigWithZk(zkClient, configs, Optional.empty(), encoderConfigs)); + assertThrows(ConfigException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), encoderConfigs)); // Dynamic config updates using ZK should fail if broker is running. registerBrokerInZk(zkClient, Integer.parseInt(brokerId)); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "210000"), Optional.of(brokerId))); - assertThrows(IllegalArgumentException.class, () -> alterConfigWithZk(zkClient, Collections.singletonMap("message.max.size", "220000"), Optional.empty())); + assertThrows(IllegalArgumentException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, + Optional.of(brokerId), singletonMap("message.max.size", "210000"))); + assertThrows(IllegalArgumentException.class, + () -> alterConfigWithZk(zkClient, adminZkClient, + Optional.empty(), singletonMap("message.max.size", "220000"))); // Dynamic config updates using ZK should for a different broker that is not running should succeed - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "230000"), Optional.of("2")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of("2"), singletonMap("message.max.size", "230000")); + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testDynamicBrokerConfigUpdateUsingKraft() { + alterOpts = generateKraftAlterOpts(); + + try (Admin client = cluster.createAdminClient()) { + // Add config + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.size", "110000")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.size", "120000")); + + // Change config + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), singletonMap("message.max.size", "130000")); + alterAndVerifyConfig(client, Optional.empty(), singletonMap("message.max.size", "140000")); + + // Delete config + deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), singleton("message.max.size")); + + // Listener configs: should work only with listener name + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); + alterConfigWithKraft(client, Optional.empty(), + singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks")); + deleteAndVerifyConfig(client, Optional.of(defaultBrokerId), + singleton("listener.name.external.ssl.keystore.location")); + alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("listener.name.external.ssl.keystore.password", "secret")); + + // Password config update with encoder secret should succeed and encoded password must be stored in ZK + Map<String, String> configs = new HashMap<>(); + configs.put("listener.name.external.ssl.keystore.password", "secret"); + configs.put("log.cleaner.threads", "2"); + // Password encoder configs + configs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + + // Password config update at default cluster-level should fail + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), configs)); + } + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testAlterReadOnlyConfigInKRaftThenShouldFail() { + alterOpts = generateKraftAlterOpts(); + + try (Admin client = cluster.createAdminClient()) { + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("auto.create.topics.enable", "false"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("auto.leader.rebalance.enable", "false"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("broker.id", "1"))); + } + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testUpdateClusterWideConfigInKRaftThenShouldSuccessful() { + alterOpts = generateKraftAlterOpts(); + + try (Admin client = cluster.createAdminClient()) { + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("log.flush.interval.messages", "100")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("log.retention.bytes", "20")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap("log.retention.ms", "2")); + } + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testUpdatePerBrokerConfigWithListenerNameInKRaftThenShouldSuccessful() { + alterOpts = generateKraftAlterOpts(); + String listenerName = "listener.name.internal."; + + try (Admin client = cluster.createAdminClient()) { + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap(listenerName + "ssl.truststore.type", "PKCS12")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap(listenerName + "ssl.truststore.location", "/temp/test.jks")); + alterAndVerifyConfig(client, Optional.of(defaultBrokerId), + singletonMap(listenerName + "ssl.truststore.password", "password")); + } + } + + @ClusterTests({ + @ClusterTest(clusterType = Type.KRAFT), + @ClusterTest(clusterType = Type.CO_KRAFT) + }) + public void testUpdatePerBrokerConfigInKRaftThenShouldFail() { + alterOpts = generateKraftAlterOpts(); + + try (Admin client = cluster.createAdminClient()) { + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.type", "PKCS12"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.location", "/temp/test.jks"))); + assertThrows(ExecutionException.class, + () -> alterConfigWithKraft(client, Optional.of(defaultBrokerId), + singletonMap("ssl.truststore.password", "password"))); + } + } + + private void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) { + AtomicReference<Integer> exitStatus = new AtomicReference<>(); + Exit.setExitProcedure((status, __) -> { + exitStatus.set(status); + throw new RuntimeException(); + }); + + String errOut = captureStandardMsg(run(args)); + + checkErrOut.accept(errOut); + assertNotNull(exitStatus.get()); + assertEquals(1, exitStatus.get()); + } + + private Stream<String> quorumArgs() { + return cluster.isKRaftTest() Review Comment: We need to test `--bootstrap-server` Even though the cluster is zk mode -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org