dengziming commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1865129743
##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
val configsToBeDeleted = parseConfigsToBeDeleted(opts)
entityTypeHead match {
- case ConfigType.TOPIC =>
- alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
- case ConfigType.BROKER =>
- val oldConfig = getResourceConfig(adminClient, entityTypeHead,
entityNameHead, includeSynonyms = false, describeAll = false)
- .map { entry => (entry.name, entry) }.toMap
-
- // fail the command if any of the configs to be deleted does not exist
- val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
- if (invalidConfigs.nonEmpty)
- throw new InvalidConfigurationException(s"Invalid config(s):
${invalidConfigs.mkString(",")}")
-
- val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
- val sensitiveEntries = newEntries.filter(_._2.value == null)
- if (sensitiveEntries.nonEmpty)
- throw new InvalidConfigurationException(s"All sensitive broker
config entries must be specified for --alter, missing entries:
${sensitiveEntries.keySet}")
- val newConfig = new JConfig(newEntries.asJava.values)
-
- val configResource = new ConfigResource(ConfigResource.Type.BROKER,
entityNameHead)
- val alterOptions = new
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
- adminClient.alterConfigs(Map(configResource -> newConfig).asJava,
alterOptions).all().get(60, TimeUnit.SECONDS)
+ case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER |
ConfigType.GROUP =>
+ val configResourceType = entityTypeHead match {
+ case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+ case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+ case ConfigType.BROKER => ConfigResource.Type.BROKER
+ case ConfigType.GROUP => ConfigResource.Type.GROUP
+ }
+ try {
+ alterResourceConfig(adminClient, entityTypeHead, entityNameHead,
configsToBeDeleted, configsToBeAdded, configResourceType)
+ } catch {
+ case e: ExecutionException =>
+ e.getCause match {
+ case _: UnsupportedVersionException =>
+ throw new UnsupportedVersionException(s"The
${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The
API is supported starting from version 2.3.0."
Review Comment:
Hi @chia7712, although this KIP was originally intended to land in release 3.8, it
was delayed to 4.0, so whether to keep this compatibility (deprecation) period
has become an open question.
In the KIP we declared that we would fall back to `alterConfigs`
and that this fallback logic would be removed in 4.0, so we now simply throw an
`UnsupportedVersionException`, since we are already targeting 4.0.
Here is the discussion thread; I will add it to the KIP docs today. Sorry
for the delay. https://lists.apache.org/thread/m9pvhgfwtr05rf9sdhn22x11cvtbgv6w
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]