dengziming commented on code in PR #15304:
URL: https://github.com/apache/kafka/pull/15304#discussion_r1861624180


##########
core/src/test/scala/unit/kafka/utils/TestUtils.scala:
##########
@@ -333,6 +333,9 @@ object TestUtils extends Logging {
     if 
(!props.containsKey(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG))
       
props.put(GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "0")
     rack.foreach(props.put(ServerConfigs.BROKER_RACK_CONFIG, _))
+    // reduce log cleaner offset map memory usage, must be at greater than 1MB 
per cleaner thread, set to 2M+2 so that
+    // we can set 2 cleaner threads.
+    props.put(CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP, "2097154")

Review Comment:
   This approach is more reasonable, I reverted the changes in TestUtils and 
KafkaClusterTestKit, but we need to change `props.put` to `props.putIfAbsent` 
in  KafkaClusterTestKit since it will overwrite the original value.
   Outdated



##########
core/src/main/scala/kafka/admin/ConfigCommand.scala:
##########
@@ -172,27 +173,26 @@ object ConfigCommand extends Logging {
     val configsToBeDeleted = parseConfigsToBeDeleted(opts)
 
     entityTypeHead match {
-      case ConfigType.TOPIC =>
-        alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, ConfigResource.Type.TOPIC)
-
-      case ConfigType.BROKER =>
-        val oldConfig = getResourceConfig(adminClient, entityTypeHead, 
entityNameHead, includeSynonyms = false, describeAll = false)
-          .map { entry => (entry.name, entry) }.toMap
-
-        // fail the command if any of the configs to be deleted does not exist
-        val invalidConfigs = configsToBeDeleted.filterNot(oldConfig.contains)
-        if (invalidConfigs.nonEmpty)
-          throw new InvalidConfigurationException(s"Invalid config(s): 
${invalidConfigs.mkString(",")}")
-
-        val newEntries = oldConfig ++ configsToBeAdded -- configsToBeDeleted
-        val sensitiveEntries = newEntries.filter(_._2.value == null)
-        if (sensitiveEntries.nonEmpty)
-          throw new InvalidConfigurationException(s"All sensitive broker 
config entries must be specified for --alter, missing entries: 
${sensitiveEntries.keySet}")
-        val newConfig = new JConfig(newEntries.asJava.values)
-
-        val configResource = new ConfigResource(ConfigResource.Type.BROKER, 
entityNameHead)
-        val alterOptions = new 
AlterConfigsOptions().timeoutMs(30000).validateOnly(false)
-        adminClient.alterConfigs(Map(configResource -> newConfig).asJava, 
alterOptions).all().get(60, TimeUnit.SECONDS)
+      case ConfigType.TOPIC | ConfigType.CLIENT_METRICS | ConfigType.BROKER | 
ConfigType.GROUP =>
+        val configResourceType = entityTypeHead match {
+          case ConfigType.TOPIC => ConfigResource.Type.TOPIC
+          case ConfigType.CLIENT_METRICS =>ConfigResource.Type.CLIENT_METRICS
+          case ConfigType.BROKER => ConfigResource.Type.BROKER
+          case ConfigType.GROUP => ConfigResource.Type.GROUP
+        }
+        try {
+          alterResourceConfig(adminClient, entityTypeHead, entityNameHead, 
configsToBeDeleted, configsToBeAdded, configResourceType)
+        } catch {
+          case e: ExecutionException =>
+            e.getCause match {
+              case _: UnsupportedVersionException if entityTypeHead == 
ConfigType.BROKER =>
+                System.err.println(s"Could not update broker config 
$entityNameHead, because brokers don't support api 
${ApiKeys.INCREMENTAL_ALTER_CONFIGS},"
+                + " You can upgrade your brokers to version 2.3.0 or newer to 
avoid this error.")
+                return

Review Comment:
   Catching it here is more accurate just in case of other unsupported-error 
from the server, and it should be handled in the main function, I wrapped a new 
Exception and rethrow to main causing the process to exit with code 1, this 
seems more reasonalbe.
   
![image](https://github.com/user-attachments/assets/9816353a-2fbe-4689-89c6-b3f4e30a89d4)
     



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to