nizhikov commented on code in PR #15873: URL: https://github.com/apache/kafka/pull/15873#discussion_r1594336755
########## core/src/test/java/kafka/admin/ConfigCommandUnitTest.java: ########## @@ -410,6 +448,432 @@ public void testOptionEntityTypeNames() { doTestOptionEntityTypeNames(false); } + @Test + public void shouldFailIfUnrecognisedEntityTypeUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"}); + assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); + } + + @Test + public void shouldFailIfUnrecognisedEntityType() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", + "--entity-name", "client", "--entity-type", "not-recognised", "--alter", "--add-config", "a=b,c=d"}); + assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); + } + + @Test + public void shouldFailIfBrokerEntityTypeIsNotAnIntegerUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"}); + assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); + } + + @Test + public void shouldFailIfBrokerEntityTypeIsNotAnInteger() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", + "--entity-name", "A", "--entity-type", "brokers", "--alter", "--add-config", "a=b,c=d"}); + assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); + } + + @Test + public 
void shouldFailIfShortBrokerEntityTypeIsNotAnIntegerUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--broker", "A", "--alter", "--add-config", "a=b,c=d"}); + assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfigWithZk(null, createOpts, DUMMY_ADMIN_ZK_CLIENT)); + } + + @Test + public void shouldFailIfShortBrokerEntityTypeIsNotAnInteger() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", + "--broker", "A", "--alter", "--add-config", "a=b,c=d"}); + assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(new DummyAdminClient(new Node(1, "localhost", 9092)), createOpts)); + } + + @Test + public void shouldFailIfMixedEntityTypeFlagsUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"}); + assertThrows(IllegalArgumentException.class, createOpts::checkArgs); + } + + @Test + public void shouldFailIfMixedEntityTypeFlags() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", + "--entity-name", "A", "--entity-type", "users", "--client", "B", "--describe"}); + assertThrows(IllegalArgumentException.class, createOpts::checkArgs); + } + + @Test + public void shouldFailIfInvalidHost() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", + "--entity-name", "A,B", "--entity-type", "ips", "--describe"}); + assertThrows(IllegalArgumentException.class, createOpts::checkArgs); + } + + @Test + public void shouldFailIfInvalidHostUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new 
ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "A,B", "--entity-type", "ips", "--describe"}); + assertThrows(IllegalArgumentException.class, createOpts::checkArgs); + } + + @Test + public void shouldFailIfUnresolvableHost() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--bootstrap-server", "localhost:9092", + "--entity-name", "RFC2606.invalid", "--entity-type", "ips", "--describe"}); + assertThrows(IllegalArgumentException.class, createOpts::checkArgs); + } + + @Test + public void shouldFailIfUnresolvableHostUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "RFC2606.invalid", "--entity-type", "ips", "--describe"}); + assertThrows(IllegalArgumentException.class, createOpts::checkArgs); + } + + @Test + public void shouldAddClientConfigUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "my-client-id", + "--entity-type", "clients", + "--alter", + "--add-config", "a=b,c=d"}); + + KafkaZkClient zkClient = mock(KafkaZkClient.class); + when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new Properties()); + + class TestAdminZkClient extends AdminZkClient { + public TestAdminZkClient(KafkaZkClient zkClient) { + super(zkClient, scala.None$.empty()); + } + + @Override + public void changeClientIdConfig(String clientId, Properties configChange) { + assertEquals("my-client-id", clientId); + assertEquals("b", configChange.get("a")); + assertEquals("d", configChange.get("c")); + } + } + + // Changing USER configs don't use `KafkaZkClient` so it safe to pass `null`. 
+ ConfigCommand.alterConfigWithZk(null, createOpts, new TestAdminZkClient(zkClient)); + } + + @Test + public void shouldAddIpConfigsUsingZookeeper() { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(new String[]{"--zookeeper", ZK_CONNECT, + "--entity-name", "1.2.3.4", + "--entity-type", "ips", + "--alter", + "--add-config", "a=b,c=d"}); + + KafkaZkClient zkClient = mock(KafkaZkClient.class); + when(zkClient.getEntityConfigs(anyString(), anyString())).thenReturn(new Properties()); + + class TestAdminZkClient extends AdminZkClient { + public TestAdminZkClient(KafkaZkClient zkClient) { + super(zkClient, scala.None$.empty()); + } + + @Override + public void changeIpConfig(String ip, Properties configChange) { + assertEquals("1.2.3.4", ip); + assertEquals("b", configChange.get("a")); + assertEquals("d", configChange.get("c")); + } + } + + // Changing USER configs don't use `KafkaZkClient` so it safe to pass `null`. + ConfigCommand.alterConfigWithZk(null, createOpts, new TestAdminZkClient(zkClient)); + } + + private Map.Entry<List<String>, Map<String, String>> toValues(Optional<String> entityName, String entityType) { + String command; + switch (entityType) { + case ClientQuotaEntity.USER: + command = "users"; + break; + case ClientQuotaEntity.CLIENT_ID: + command = "clients"; + break; + case ClientQuotaEntity.IP: + command = "ips"; + break; + default: + throw new IllegalArgumentException("Unknown command: " + entityType); + } + + return entityName.map(name -> { + if (name.isEmpty()) + return new AbstractMap.SimpleImmutableEntry<>(Arrays.asList("--entity-type", command, "--entity-default"), Collections.singletonMap(entityType, (String) null)); + return new AbstractMap.SimpleImmutableEntry<>(Arrays.asList("--entity-type", command, "--entity-name", name), Collections.singletonMap(entityType, name)); + }).orElse(new AbstractMap.SimpleImmutableEntry<>(Collections.emptyList(), Collections.emptyMap())); + } + + private void 
verifyAlterCommandFails(String expectedErrorMessage, List<String> alterOpts) { + Admin mockAdminClient = mock(Admin.class); + ConfigCommand.ConfigCommandOptions opts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList("--bootstrap-server", "localhost:9092", + "--alter"), alterOpts)); + IllegalArgumentException e = assertThrows(IllegalArgumentException.class, () -> ConfigCommand.alterConfig(mockAdminClient, opts)); + assertTrue(e.getMessage().contains(expectedErrorMessage), "Unexpected exception: " + e); + } + + @Test + public void shouldNotAlterNonQuotaIpConfigsUsingBootstrapServer() { + // when using --bootstrap-server, it should be illegal to alter anything that is not a connection quota + // for ip entities + List<String> ipEntityOpts = Arrays.asList("--entity-type", "ips", "--entity-name", "127.0.0.1"); + String invalidProp = "some_config"; + verifyAlterCommandFails(invalidProp, concat(ipEntityOpts, Arrays.asList("--add-config", "connection_creation_rate=10000,some_config=10"))); + verifyAlterCommandFails(invalidProp, concat(ipEntityOpts, Arrays.asList("--add-config", "some_config=10"))); + verifyAlterCommandFails(invalidProp, concat(ipEntityOpts, Arrays.asList("--delete-config", "connection_creation_rate=10000,some_config=10"))); + verifyAlterCommandFails(invalidProp, concat(ipEntityOpts, Arrays.asList("--delete-config", "some_config=10"))); + } + + private void verifyDescribeQuotas(List<String> describeArgs, ClientQuotaFilter expectedFilter) { + ConfigCommand.ConfigCommandOptions describeOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList("--bootstrap-server", "localhost:9092", + "--describe"), describeArgs)); + KafkaFutureImpl<Map<ClientQuotaEntity, Map<String, Double>>> describeFuture = new KafkaFutureImpl<>(); + describeFuture.complete(Collections.emptyMap()); + DescribeClientQuotasResult describeResult = mock(DescribeClientQuotasResult.class); + when(describeResult.entities()).thenReturn(describeFuture); + + AtomicBoolean 
describedConfigs = new AtomicBoolean(); + Node node = new Node(1, "localhost", 9092); + MockAdminClient mockAdminClient = new MockAdminClient(Collections.singletonList(node), node) { + @Override + public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) { + assertTrue(filter.strict()); + assertEquals(new HashSet<>(expectedFilter.components()), new HashSet<>(filter.components())); + describedConfigs.set(true); + return describeResult; + } + }; + ConfigCommand.describeConfig(mockAdminClient, describeOpts); + assertTrue(describedConfigs.get()); + } + + @Test + public void testDescribeIpConfigs() { + String entityType = ClientQuotaEntity.IP; + String knownHost = "1.2.3.4"; + ClientQuotaFilter defaultIpFilter = ClientQuotaFilter.containsOnly(Collections.singletonList(ClientQuotaFilterComponent.ofDefaultEntity(entityType))); + ClientQuotaFilter singleIpFilter = ClientQuotaFilter.containsOnly(Collections.singletonList(ClientQuotaFilterComponent.ofEntity(entityType, knownHost))); + ClientQuotaFilter allIpsFilter = ClientQuotaFilter.containsOnly(Collections.singletonList(ClientQuotaFilterComponent.ofEntityType(entityType))); + verifyDescribeQuotas(Arrays.asList("--entity-default", "--entity-type", "ips"), defaultIpFilter); + verifyDescribeQuotas(Collections.singletonList("--ip-defaults"), defaultIpFilter); + verifyDescribeQuotas(Arrays.asList("--entity-type", "ips", "--entity-name", knownHost), singleIpFilter); + verifyDescribeQuotas(Arrays.asList("--ip", knownHost), singleIpFilter); + verifyDescribeQuotas(Arrays.asList("--entity-type", "ips"), allIpsFilter); + } + + public void verifyAlterQuotas(List<String> alterOpts, ClientQuotaEntity expectedAlterEntity, + Map<String, Double> expectedProps, Set<ClientQuotaAlteration.Op> expectedAlterOps) { + ConfigCommand.ConfigCommandOptions createOpts = new ConfigCommand.ConfigCommandOptions(toArray(Arrays.asList("--bootstrap-server", "localhost:9092", + "--alter"), 
alterOpts)); + + AtomicBoolean describedConfigs = new AtomicBoolean(); + KafkaFutureImpl<Map<ClientQuotaEntity, Map<String, Double>>> describeFuture = new KafkaFutureImpl<>(); + describeFuture.complete(Collections.singletonMap(expectedAlterEntity, expectedProps)); + DescribeClientQuotasResult describeResult = mock(DescribeClientQuotasResult.class); + when(describeResult.entities()).thenReturn(describeFuture); + + Set<ClientQuotaFilterComponent> expectedFilterComponents = expectedAlterEntity.entries().entrySet().stream().map(e -> { + String entityType = e.getKey(); + String entityName = e.getValue(); + return entityName == null + ? ClientQuotaFilterComponent.ofDefaultEntity(e.getKey()) + : ClientQuotaFilterComponent.ofEntity(entityType, entityName); + }).collect(Collectors.toSet()); + + AtomicBoolean alteredConfigs = new AtomicBoolean(); + KafkaFutureImpl<Void> alterFuture = new KafkaFutureImpl<>(); + alterFuture.complete(null); + AlterClientQuotasResult alterResult = mock(AlterClientQuotasResult.class); + when(alterResult.all()).thenReturn(alterFuture); + + Node node = new Node(1, "localhost", 9092); + MockAdminClient mockAdminClient = new MockAdminClient(Collections.singletonList(node), node) { + @Override + public DescribeClientQuotasResult describeClientQuotas(ClientQuotaFilter filter, DescribeClientQuotasOptions options) { + assertTrue(filter.strict()); + assertEquals(expectedFilterComponents, new HashSet<>(filter.components())); + describedConfigs.set(true); + return describeResult; + } + + @Override + public AlterClientQuotasResult alterClientQuotas(Collection<ClientQuotaAlteration> entries, AlterClientQuotasOptions options) { + assertFalse(options.validateOnly()); + assertEquals(1, entries.size()); + ClientQuotaAlteration alteration = entries.iterator().next(); + assertEquals(expectedAlterEntity, alteration.entity()); + Collection<ClientQuotaAlteration.Op> ops = alteration.ops(); + assertEquals(expectedAlterOps, new HashSet<>(ops)); + 
alteredConfigs.set(true); + return alterResult; + } + }; + ConfigCommand.alterConfig(mockAdminClient, createOpts); + assertTrue(describedConfigs.get()); + assertTrue(alteredConfigs.get()); + } + + @Test + public void testAlterIpConfig() { + Map.Entry<List<String>, Map<String, String>> t = toValues(Optional.of("1.2.3.4"), ClientQuotaEntity.IP); + List<String> singleIpArgs = t.getKey(); + Map<String, String> singleIpEntry = t.getValue(); + ClientQuotaEntity singleIpEntity = new ClientQuotaEntity(singleIpEntry); + t = toValues(Optional.of(""), ClientQuotaEntity.IP); + List<String> defaultIpArgs = t.getKey(); + Map<String, String> defaultIpEntry = t.getValue(); + ClientQuotaEntity defaultIpEntity = new ClientQuotaEntity(defaultIpEntry); + + List<String> deleteArgs = Arrays.asList("--delete-config", "connection_creation_rate"); + Set<ClientQuotaAlteration.Op> deleteAlterationOps = new HashSet<>(Collections.singletonList(new ClientQuotaAlteration.Op("connection_creation_rate", null))); + Map<String, Double> propsToDelete = Collections.singletonMap("connection_creation_rate", 50.0); + + List<String> addArgs = Arrays.asList("--add-config", "connection_creation_rate=100"); + Set<ClientQuotaAlteration.Op> addAlterationOps = new HashSet<>(Collections.singletonList(new ClientQuotaAlteration.Op("connection_creation_rate", 100.0))); + + verifyAlterQuotas(concat(singleIpArgs, deleteArgs), singleIpEntity, propsToDelete, deleteAlterationOps); + verifyAlterQuotas(concat(singleIpArgs, addArgs), singleIpEntity, Collections.emptyMap(), addAlterationOps); + verifyAlterQuotas(concat(defaultIpArgs, deleteArgs), defaultIpEntity, propsToDelete, deleteAlterationOps); + verifyAlterQuotas(concat(defaultIpArgs, addArgs), defaultIpEntity, Collections.emptyMap(), addAlterationOps); + } + + @Test + public void shouldAddClientConfig() { + List<String> alterArgs = Arrays.asList("--add-config", "consumer_byte_rate=20000,producer_byte_rate=10000", + "--delete-config", "request_percentage"); + 
Map<String, Double> propsToDelete = Collections.singletonMap("request_percentage", 50.0); + + Set<ClientQuotaAlteration.Op> alterationOps = new HashSet<>(Arrays.asList( + new ClientQuotaAlteration.Op("consumer_byte_rate", 20000d), + new ClientQuotaAlteration.Op("producer_byte_rate", 10000d), + new ClientQuotaAlteration.Op("request_percentage", null) + )); + + BiConsumer<Optional<String>, Optional<String>> verifyAlterUserClientQuotas = (userOpt, clientOpt) -> { + Map.Entry<List<String>, Map<String, String>> t = toValues(userOpt, ClientQuotaEntity.USER); + List<String> userArgs = t.getKey(); + Map<String, String> userEntry = t.getValue(); + t = toValues(clientOpt, ClientQuotaEntity.CLIENT_ID); + List<String> clientArgs = t.getKey(); + Map<String, String> clientEntry = t.getValue(); + + List<String> commandArgs = concat(alterArgs, userArgs, clientArgs); + ClientQuotaEntity clientQuotaEntity = new ClientQuotaEntity(concat(userEntry, clientEntry)); + try { + verifyAlterQuotas(commandArgs, clientQuotaEntity, propsToDelete, alterationOps); + } catch (Exception e) { + throw new RuntimeException(e); + } + }; + + verifyAlterUserClientQuotas.accept(Optional.of("test-user-1"), Optional.of("test-client-1")); Review Comment: CI looks OK to me. Please take a look. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org