chia7712 commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1864859978


##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +178,25 @@ object ConfigCommand extends Logging {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | 
ConfigType.GROUP =>
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS => ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+          case ConfigType.GROUP => ConfigResource.Type.GROUP
+        }
+        try {
+          alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, configResourceType)
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case _: UnsupportedVersionException =>
+                throw new UnsupportedVersionException(s"The 
${ApiKeys.INCREMENTAL_ALTER_CONFIGS} API is not supported by the cluster. The 
API is supported starting from version 2.3.0."

Review Comment:
   Does this implementation align with 
[KIP-1011](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1011%3A+Use+incrementalAlterConfigs+when+updating+broker+configs+by+kafka-configs.sh)?
   Here’s a relevant quote from KIP-1011 for reference.
   
   > When updating broker config, instead of using Admin.alterConfigs, we will 
use Admin.incrementalAlterConfigs and fallback to use alterConfigs 
automatically if incrementalAlterConfigs is not supported
   
   Please let me know if I’ve missed any important discussions in this PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to