m1a2st commented on code in PR #15840: URL: https://github.com/apache/kafka/pull/15840#discussion_r1615857895
########## core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java: ########## @@ -65,219 +75,494 @@ @ExtendWith(value = ClusterTestExtensions.class) @Tag("integration") public class ConfigCommandIntegrationTest { - AdminZkClient adminZkClient; - List<String> alterOpts; + private List<String> alterOpts; + private final String defaultBrokerId = "0"; private final ClusterInstance cluster; + private static Runnable run(Stream<String> command) { + return () -> { + try { + ConfigCommand.main(command.toArray(String[]::new)); + } catch (RuntimeException e) { + // do nothing. + } finally { + Exit.resetExitProcedure(); + } + }; + } + public ConfigCommandIntegrationTest(ClusterInstance cluster) { this.cluster = cluster; } - @ClusterTest(types = {Type.ZK, Type.KRAFT}) + @ClusterTest(types = {Type.ZK, Type.KRAFT, Type.CO_KRAFT}) public void testExitWithNonZeroStatusOnUpdatingUnallowedConfig() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-name", cluster.isKRaftTest() ? 
"0" : "1", "--entity-type", "brokers", "--alter", "--add-config", "security.inter.broker.protocol=PLAINTEXT")), - errOut -> - assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); + errOut -> assertTrue(errOut.contains("Cannot update these configs dynamically: Set(security.inter.broker.protocol)"), errOut)); } - @ClusterTest(types = {Type.ZK}) public void testExitWithNonZeroStatusOnZkCommandAlterUserQuota() { assertNonZeroStatusExit(Stream.concat(quorumArgs(), Stream.of( "--entity-type", "users", "--entity-name", "admin", "--alter", "--add-config", "consumer_byte_rate=20000")), - errOut -> - assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); + errOut -> assertTrue(errOut.contains("User configuration updates using ZooKeeper are only supported for SCRAM credential updates."), errOut)); } - public static void assertNonZeroStatusExit(Stream<String> args, Consumer<String> checkErrOut) { - AtomicReference<Integer> exitStatus = new AtomicReference<>(); - Exit.setExitProcedure((status, __) -> { - exitStatus.set(status); - throw new RuntimeException(); - }); - - String errOut = captureStandardErr(() -> { - try { - ConfigCommand.main(args.toArray(String[]::new)); - } catch (RuntimeException e) { - // do nothing. - } finally { - Exit.resetExitProcedure(); - } - }); - - checkErrOut.accept(errOut); - assertNotNull(exitStatus.get()); - assertEquals(1, exitStatus.get()); - } - - private Stream<String> quorumArgs() { - return cluster.isKRaftTest() - ? 
Stream.of("--bootstrap-server", cluster.bootstrapServers()) - : Stream.of("--zookeeper", ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect()); - } - - public List<String> entityOp(Optional<String> brokerId) { - return brokerId.map(id -> Arrays.asList("--entity-name", id)).orElse(Collections.singletonList("--entity-default")); - } - - public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception { - alterConfigWithZk(zkClient, configs, brokerId, Collections.emptyMap()); - } - - public void alterConfigWithZk(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId, Map<String, String> encoderConfigs) { - String configStr = Stream.of(configs.entrySet(), encoderConfigs.entrySet()) - .flatMap(Set::stream) - .map(e -> e.getKey() + "=" + e.getValue()) - .collect(Collectors.joining(",")); - ConfigCommand.ConfigCommandOptions addOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--add-config", configStr))); - ConfigCommand.alterConfigWithZk(zkClient, addOpts, adminZkClient); - } - - void verifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) { - Properties entityConfigs = zkClient.getEntityConfigs("brokers", brokerId.orElse(ZooKeeperInternals.DEFAULT_STRING)); - assertEquals(configs, entityConfigs); - } - - void alterAndVerifyConfig(KafkaZkClient zkClient, Map<String, String> configs, Optional<String> brokerId) throws Exception { - alterConfigWithZk(zkClient, configs, brokerId); - verifyConfig(zkClient, configs, brokerId); - } + @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}) + public void testNullStatusOnKraftCommandAlterUserQuota() { + Stream<String> command = Stream.concat(quorumArgs(), Stream.of( + "--entity-type", "users", + "--entity-name", "admin", + "--alter", "--add-config", "consumer_byte_rate=20000")); + String message = captureStandardMsg(run(command)); - 
void deleteAndVerifyConfig(KafkaZkClient zkClient, Set<String> configNames, Optional<String> brokerId) { - ConfigCommand.ConfigCommandOptions deleteOpts = new ConfigCommand.ConfigCommandOptions(toArray(alterOpts, entityOp(brokerId), Arrays.asList("--delete-config", String.join(",", configNames)))); - ConfigCommand.alterConfigWithZk(zkClient, deleteOpts, adminZkClient); - verifyConfig(zkClient, Collections.emptyMap(), brokerId); + assertTrue(StringUtils.isBlank(message), message); } - @ClusterTest(types = {Type.ZK}) + @ClusterTest(types = Type.ZK) public void testDynamicBrokerConfigUpdateUsingZooKeeper() throws Exception { cluster.shutdownBroker(0); String zkConnect = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkConnect(); KafkaZkClient zkClient = ((ZkClusterInvocationContext.ZkClusterInstance) cluster).getUnderlying().zkClient(); String brokerId = "1"; - adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); - alterOpts = Arrays.asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); + AdminZkClient adminZkClient = new AdminZkClient(zkClient, scala.None$.empty()); + alterOpts = asList("--zookeeper", zkConnect, "--entity-type", "brokers", "--alter"); // Add config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "110000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "120000"), Optional.empty()); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("message.max.bytes", "110000")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singletonMap("message.max.bytes", "120000")); // Change config - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "130000"), Optional.of(brokerId)); - alterAndVerifyConfig(zkClient, Collections.singletonMap("message.max.size", "140000"), Optional.empty()); + alterAndVerifyConfig(zkClient, adminZkClient, 
Optional.of(brokerId), + singletonMap("message.max.bytes", "130000")); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singletonMap("message.max.bytes", "140000")); // Delete config - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.of(brokerId)); - deleteAndVerifyConfig(zkClient, Collections.singleton("message.max.size"), Optional.empty()); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singleton("message.max.bytes")); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.empty(), + singleton("message.max.bytes")); // Listener configs: should work only with listener name - alterAndVerifyConfig(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId)); + alterAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks")); assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("ssl.keystore.location", "/tmp/test.jks"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("ssl.keystore.location", "/tmp/test.jks"))); // Per-broker config configured at default cluster-level should fail assertThrows(ConfigException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.location", "/tmp/test.jks"), Optional.empty())); - deleteAndVerifyConfig(zkClient, Collections.singleton("listener.name.external.ssl.keystore.location"), Optional.of(brokerId)); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.empty(), + singletonMap("listener.name.internal.ssl.keystore.location", "/tmp/test.jks"))); + deleteAndVerifyConfig(zkClient, adminZkClient, Optional.of(brokerId), + singleton("listener.name.internal.ssl.keystore.location")); // Password config update without encoder secret should fail 
assertThrows(IllegalArgumentException.class, - () -> alterConfigWithZk(zkClient, Collections.singletonMap("listener.name.external.ssl.keystore.password", "secret"), Optional.of(brokerId))); + () -> alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), + singletonMap("listener.name.internal.ssl.keystore.password", "secret"))); // Password config update with encoder secret should succeed and encoded password must be stored in ZK Map<String, String> configs = new HashMap<>(); - configs.put("listener.name.external.ssl.keystore.password", "secret"); + configs.put("listener.name.internal.ssl.keystore.password", "secret"); configs.put("log.cleaner.threads", "2"); - Map<String, String> encoderConfigs = Collections.singletonMap(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); - alterConfigWithZk(zkClient, configs, Optional.of(brokerId), encoderConfigs); + Map<String, String> encoderConfigs = new HashMap<>(configs); + encoderConfigs.put(PASSWORD_ENCODER_SECRET_CONFIG, "encoder-secret"); + alterConfigWithZk(zkClient, adminZkClient, Optional.of(brokerId), encoderConfigs); Properties brokerConfigs = zkClient.getEntityConfigs("brokers", brokerId); - assertFalse(brokerConfigs.contains(PasswordEncoderConfigs.PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); + assertFalse(brokerConfigs.contains(PASSWORD_ENCODER_SECRET_CONFIG), "Encoder secret stored in ZooKeeper"); assertEquals("2", brokerConfigs.getProperty("log.cleaner.threads")); // not encoded - String encodedPassword = brokerConfigs.getProperty("listener.name.external.ssl.keystore.password"); + String encodedPassword = brokerConfigs.getProperty("listener.name.internal.ssl.keystore.password"); PasswordEncoder passwordEncoder = ConfigCommand.createPasswordEncoder(JavaConverters.mapAsScalaMap(encoderConfigs)); Review Comment: Sorry, I don't understand why converting a Java type to a Scala type is considered weird.
If I change the input type of `createPasswordEncoder` in `ConfigCommand.scala`, I would have to use `java.util.Map` in Scala code. Isn't that strange? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org