dajac commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1862489630
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -80,6 +82,11 @@ object ConfigCommand extends Logging {
System.err.println(e.getMessage)
Exit.exit(1)
+ case e: UnsupportedVersionException =>
+ logger.debug(s"Unsupported API encountered in server when executing
config command with args '${args.mkString(" ")}'")
+ e.printStackTrace(System.err)
Review Comment:
nit: It may be better to use `System.err.println(e.getMessage)`. What do you
think?
##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG,
"PKCS12"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG,
"/temp/test.jks"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG,
"password"), alterOpts));
}
}
+ @ClusterTest
+ public void testUpdateInvalidBrokerConfigs() {
+ updateAndCheckInvalidBrokerConfig(Optional.empty());
+
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+ }
+
+ private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
+ List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
+ try (Admin client = cluster.admin()) {
+ alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+
+ Stream<String> describeCommand = Stream.concat(
+ Stream.concat(
+ Stream.of("--bootstrap-server",
cluster.bootstrapServers()),
+ Stream.of(entityOp(brokerIdOrDefault).toArray(new
String[0]))),
+ Stream.of("--entity-type", "brokers", "--describe"));
+ String describeResult = captureStandardStream(false,
run(describeCommand));
+
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"),
describeResult);
+ // Sensitive config will not return
+ assertTrue(describeResult.contains("invalid=null"),
describeResult);
+ }
+ }
+
+ @ClusterTest
+ public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
+ List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+ try (Admin client = cluster.admin()) {
+ client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ assertInstanceOf(
+ InvalidConfigurationException.class,
+ assertThrows(
+ ExecutionException.class,
+ () -> ConfigCommand.alterConfig(
+ client,
+ new ConfigCommand.ConfigCommandOptions(
+ toArray(alterOpts,
+ asList("--add-config",
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+ ).getCause()
+ );
+ }
+ }
+
+ // Test case from KAFKA-13788
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size",
value = "2097154"),
+ })
+ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+ try (Admin client = cluster.admin()) {
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threadzz=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+ kafka.utils.TestUtils.waitUntilTrue(
+ () ->
cluster.brokerSocketServers().stream().allMatch(broker ->
broker.config().getInt("log.cleaner.threads") == 2),
+ () -> "Timeout waiting for topic config propagating to
broker",
+ org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+ 100L);
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {@ClusterConfigProperty(key =
"log.cleaner.dedupe.buffer.size", value = "2097154")},
+ // Zk code has been removed, use kraft and mockito to mock this
situation
+ metadataVersion = MetadataVersion.IBP_3_3_IV0
+ )
+ public void testFallbackToDeprecatedAlterConfigs() throws
NoSuchMethodException, InvocationTargetException, InstantiationException,
IllegalAccessException {
Review Comment:
nit: The name is misleading because it suggests that we have a fallback
mechanism.
##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG,
"PKCS12"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG,
"/temp/test.jks"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG,
"password"), alterOpts));
}
}
+ @ClusterTest
+ public void testUpdateInvalidBrokerConfigs() {
+ updateAndCheckInvalidBrokerConfig(Optional.empty());
+
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+ }
+
+ private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
+ List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
+ try (Admin client = cluster.admin()) {
+ alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+
+ Stream<String> describeCommand = Stream.concat(
+ Stream.concat(
+ Stream.of("--bootstrap-server",
cluster.bootstrapServers()),
+ Stream.of(entityOp(brokerIdOrDefault).toArray(new
String[0]))),
+ Stream.of("--entity-type", "brokers", "--describe"));
+ String describeResult = captureStandardStream(false,
run(describeCommand));
+
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"),
describeResult);
+ // Sensitive config will not return
+ assertTrue(describeResult.contains("invalid=null"),
describeResult);
+ }
+ }
+
+ @ClusterTest
+ public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
+ List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+ try (Admin client = cluster.admin()) {
+ client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ assertInstanceOf(
+ InvalidConfigurationException.class,
+ assertThrows(
+ ExecutionException.class,
+ () -> ConfigCommand.alterConfig(
+ client,
+ new ConfigCommand.ConfigCommandOptions(
+ toArray(alterOpts,
+ asList("--add-config",
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+ ).getCause()
+ );
+ }
+ }
+
+ // Test case from KAFKA-13788
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size",
value = "2097154"),
Review Comment:
nit: We usually indent with 4 spaces.
##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG,
"PKCS12"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG,
"/temp/test.jks"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG,
"password"), alterOpts));
}
}
+ @ClusterTest
+ public void testUpdateInvalidBrokerConfigs() {
+ updateAndCheckInvalidBrokerConfig(Optional.empty());
+
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+ }
+
+ private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
+ List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
+ try (Admin client = cluster.admin()) {
+ alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+
+ Stream<String> describeCommand = Stream.concat(
+ Stream.concat(
+ Stream.of("--bootstrap-server",
cluster.bootstrapServers()),
+ Stream.of(entityOp(brokerIdOrDefault).toArray(new
String[0]))),
+ Stream.of("--entity-type", "brokers", "--describe"));
+ String describeResult = captureStandardStream(false,
run(describeCommand));
+
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"),
describeResult);
+ // Sensitive config will not return
+ assertTrue(describeResult.contains("invalid=null"),
describeResult);
+ }
+ }
+
+ @ClusterTest
+ public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
+ List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+ try (Admin client = cluster.admin()) {
+ client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ assertInstanceOf(
+ InvalidConfigurationException.class,
+ assertThrows(
+ ExecutionException.class,
+ () -> ConfigCommand.alterConfig(
+ client,
+ new ConfigCommand.ConfigCommandOptions(
+ toArray(alterOpts,
+ asList("--add-config",
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+ ).getCause()
+ );
+ }
+ }
+
+ // Test case from KAFKA-13788
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size",
value = "2097154"),
Review Comment:
nit: Could also put a comment explaining why we need to change this config.
It is not obvious.
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
- case ConfigType.TOPIC =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER |
ConfigType.GROUP =>
+ val configResourceType = entityTypeHead match {
+ case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+ case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+ case ConfigType.BROKER => ConfigResource.Type.BROKER
+ case ConfigType.GROUP => ConfigResource.Type.GROUP
+ }
+ try {
+ alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, configResourceType)
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case cause: UnsupportedVersionException if entityTypeHead ==
ConfigType.BROKER =>
Review Comment:
I wonder if we could remove the `entityTypeHead == ConfigType.BROKER`
condition to cover all the types. Is there a reason not to?
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
- case ConfigType.TOPIC =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER |
ConfigType.GROUP =>
+ val configResourceType = entityTypeHead match {
+ case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+ case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+ case ConfigType.BROKER => ConfigResource.Type.BROKER
+ case ConfigType.GROUP => ConfigResource.Type.GROUP
+ }
+ try {
+ alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, configResourceType)
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case cause: UnsupportedVersionException if entityTypeHead ==
ConfigType.BROKER =>
+ throw new UnsupportedVersionException(s"Could not update
broker config $entityNameHead, because brokers don't support api
${ApiKeys.INCREMENTAL_ALTER_CONFIGS},"
+ + " You can upgrade your brokers to version 2.3.0 or newer to
avoid this error.", cause)
Review Comment:
nit: I would say something like this: `The
${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The
API is supported starting from version 2.3.0. You may want to use an older
version of this tool to interact with your cluster.`. It seems better to me to
suggest using an older tool, because I suspect the user won't rush to upgrade
their cluster.
##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG,
"PKCS12"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG,
"/temp/test.jks"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG,
"password"), alterOpts));
}
}
+ @ClusterTest
+ public void testUpdateInvalidBrokerConfigs() {
+ updateAndCheckInvalidBrokerConfig(Optional.empty());
+
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+ }
+
+ private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
+ List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
+ try (Admin client = cluster.admin()) {
+ alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+
+ Stream<String> describeCommand = Stream.concat(
+ Stream.concat(
+ Stream.of("--bootstrap-server",
cluster.bootstrapServers()),
+ Stream.of(entityOp(brokerIdOrDefault).toArray(new
String[0]))),
+ Stream.of("--entity-type", "brokers", "--describe"));
+ String describeResult = captureStandardStream(false,
run(describeCommand));
+
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"),
describeResult);
+ // Sensitive config will not return
+ assertTrue(describeResult.contains("invalid=null"),
describeResult);
+ }
+ }
+
+ @ClusterTest
+ public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
+ List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+ try (Admin client = cluster.admin()) {
+ client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ assertInstanceOf(
+ InvalidConfigurationException.class,
+ assertThrows(
+ ExecutionException.class,
+ () -> ConfigCommand.alterConfig(
+ client,
+ new ConfigCommand.ConfigCommandOptions(
+ toArray(alterOpts,
+ asList("--add-config",
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+ ).getCause()
+ );
+ }
+ }
+
+ // Test case from KAFKA-13788
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size",
value = "2097154"),
+ })
+ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+ try (Admin client = cluster.admin()) {
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threadzz=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+ kafka.utils.TestUtils.waitUntilTrue(
+ () ->
cluster.brokerSocketServers().stream().allMatch(broker ->
broker.config().getInt("log.cleaner.threads") == 2),
+ () -> "Timeout waiting for topic config propagating to
broker",
+ org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+ 100L);
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {@ClusterConfigProperty(key =
"log.cleaner.dedupe.buffer.size", value = "2097154")},
+ // Zk code has been removed, use kraft and mockito to mock this
situation
+ metadataVersion = MetadataVersion.IBP_3_3_IV0
+ )
+ public void testFallbackToDeprecatedAlterConfigs() throws
NoSuchMethodException, InvocationTargetException, InstantiationException,
IllegalAccessException {
+ try (Admin client = cluster.admin()) {
+ Admin spyAdmin = Mockito.spy(client);
+
+ AlterConfigsResult mockResult;
+ {
+ // Create a mock result of unsupported version error
+ KafkaFutureImpl<Void> future = new KafkaFutureImpl<>();
+ future.completeExceptionally(new
UnsupportedVersionException("simulated error"));
+ Constructor<AlterConfigsResult> constructor =
AlterConfigsResult.class.getDeclaredConstructor(java.util.Map.class);
+ constructor.setAccessible(true);
+ mockResult =
constructor.newInstance(Collections.singletonMap(new
ConfigResource(ConfigResource.Type.BROKER, ""), future));
+ constructor.setAccessible(false);
+ }
Review Comment:
nit: You may be able to use AdminClientTestUtils.alterConfigsResult.
##########
core/src/test/java/kafka/admin/ConfigCommandIntegrationTest.java:
##########
@@ -290,17 +303,125 @@ public void
testUpdatePerBrokerConfigInKRaftThenShouldFail() {
try (Admin client = cluster.admin()) {
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_TYPE_CONFIG,
"PKCS12"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_LOCATION_CONFIG,
"/temp/test.jks"), alterOpts));
assertThrows(ExecutionException.class,
- () -> alterConfigWithKraft(client,
Optional.of(defaultBrokerId),
+ () -> alterConfigWithAdmin(client,
Optional.of(defaultBrokerId),
singletonMap(SSL_TRUSTSTORE_PASSWORD_CONFIG,
"password"), alterOpts));
}
}
+ @ClusterTest
+ public void testUpdateInvalidBrokerConfigs() {
+ updateAndCheckInvalidBrokerConfig(Optional.empty());
+
updateAndCheckInvalidBrokerConfig(Optional.of(cluster.anyBrokerSocketServer().config().brokerId()
+ ""));
+ }
+
+ private void updateAndCheckInvalidBrokerConfig(Optional<String>
brokerIdOrDefault) {
+ List<String> alterOpts =
generateDefaultAlterOpts(cluster.bootstrapServers());
+ try (Admin client = cluster.admin()) {
+ alterConfigWithAdmin(client, brokerIdOrDefault,
Collections.singletonMap("invalid", "2"), alterOpts);
+
+ Stream<String> describeCommand = Stream.concat(
+ Stream.concat(
+ Stream.of("--bootstrap-server",
cluster.bootstrapServers()),
+ Stream.of(entityOp(brokerIdOrDefault).toArray(new
String[0]))),
+ Stream.of("--entity-type", "brokers", "--describe"));
+ String describeResult = captureStandardStream(false,
run(describeCommand));
+
+ // We will treat unknown config as sensitive
+ assertTrue(describeResult.contains("sensitive=true"),
describeResult);
+ // Sensitive config will not return
+ assertTrue(describeResult.contains("invalid=null"),
describeResult);
+ }
+ }
+
+ @ClusterTest
+ public void testUpdateInvalidTopicConfigs() throws ExecutionException,
InterruptedException {
+ List<String> alterOpts = asList("--bootstrap-server",
cluster.bootstrapServers(), "--entity-type", "topics", "--alter");
+ try (Admin client = cluster.admin()) {
+ client.createTopics(Collections.singletonList(new
NewTopic("test-config-topic", 1, (short) 1))).all().get();
+ assertInstanceOf(
+ InvalidConfigurationException.class,
+ assertThrows(
+ ExecutionException.class,
+ () -> ConfigCommand.alterConfig(
+ client,
+ new ConfigCommand.ConfigCommandOptions(
+ toArray(alterOpts,
+ asList("--add-config",
"invalid=2", "--entity-type", "topics", "--entity-name", "test-config-topic"))))
+ ).getCause()
+ );
+ }
+ }
+
+ // Test case from KAFKA-13788
+ @ClusterTest(serverProperties = {
+ @ClusterConfigProperty(key = "log.cleaner.dedupe.buffer.size",
value = "2097154"),
+ })
+ public void testUpdateBrokerConfigNotAffectedByInvalidConfig() {
+ try (Admin client = cluster.admin()) {
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threadzz=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+
+ ConfigCommand.alterConfig(client, new
ConfigCommand.ConfigCommandOptions(
+ toArray(asList("--bootstrap-server",
cluster.bootstrapServers(),
+ "--alter",
+ "--add-config", "log.cleaner.threads=2",
+ "--entity-type", "brokers",
+ "--entity-default"))));
+ kafka.utils.TestUtils.waitUntilTrue(
+ () ->
cluster.brokerSocketServers().stream().allMatch(broker ->
broker.config().getInt("log.cleaner.threads") == 2),
+ () -> "Timeout waiting for topic config propagating to
broker",
+ org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+ 100L);
+ }
+ }
+
+ @ClusterTest(
+ serverProperties = {@ClusterConfigProperty(key =
"log.cleaner.dedupe.buffer.size", value = "2097154")},
+ // Zk code has been removed, use kraft and mockito to mock this
situation
+ metadataVersion = MetadataVersion.IBP_3_3_IV0
Review Comment:
ditto.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]