[ https://issues.apache.org/jira/browse/KAFKA-13417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jason Gustafson updated KAFKA-13417:
------------------------------------
    Description:
`DynamicBrokerConfig.updateCurrentConfig` includes the following logic to update the current configuration and to let each `Reconfigurable` process the update:

{code}
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false)
if (newConfig ne currentConfig) {
  currentConfig = newConfig
  kafkaConfig.updateCurrentConfig(newConfig)
  // Process BrokerReconfigurable updates after current config is updated
  brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
}
{code}

The problem here is that `currentConfig` is initialized to `kafkaConfig`, which means that the first call to `kafkaConfig.updateCurrentConfig(newConfig)` ends up mutating `currentConfig` and consequently `oldConfig`. This matters because some of the `reconfigure` implementations only apply a new configuration if the value in `oldConfig` does not match the value in `newConfig`. For example, here is the logic to update thread pools dynamically:

{code}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
  if (newConfig.numIoThreads != oldConfig.numIoThreads)
    server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
  if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
    server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
  if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
    server.replicaManager.replicaFetcherManager.resizeThreadPool(newConfig.numReplicaFetchers)
  if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
    server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
  if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
    server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}
{code}

Because of this, the dynamic update will not get applied the first time it is made. I believe subsequent updates would work correctly, though, because we would have lost the indirect reference to `kafkaConfig`. Other than the `DynamicThreadPool` configurations, it looks like the config to update unclean leader election may also be affected by this bug.

NOTE: This bug only affects KRaft, which is missing the call to `DynamicBrokerConfig.initialize()`.
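
To make the aliasing concrete, here is a minimal Scala sketch. `MiniConfig` and `MiniDynamicConfig` are hypothetical stand-ins, not Kafka classes, and the delegation inside `MiniConfig` is an assumption about how `updateCurrentConfig` makes the original object report the new values. The `copyOnInit` flag is also an illustrative assumption showing one way to break the alias; it is not necessarily the fix applied for this issue.

```scala
// Hypothetical stand-in for KafkaConfig: getters delegate to whatever
// config was last installed via updateCurrentConfig (assumed behaviour).
final class MiniConfig(ownIoThreads: Int) {
  @volatile private var current: MiniConfig = this

  def numIoThreads: Int =
    if (current eq this) ownIoThreads else current.numIoThreads

  def updateCurrentConfig(newConfig: MiniConfig): Unit = current = newConfig
}

// Hypothetical stand-in for DynamicBrokerConfig, reduced to one setting.
final class MiniDynamicConfig(kafkaConfig: MiniConfig, copyOnInit: Boolean) {
  // Without the copy, currentConfig is the very same object as kafkaConfig.
  private var currentConfig: MiniConfig =
    if (copyOnInit) new MiniConfig(kafkaConfig.numIoThreads) else kafkaConfig

  // Records every resize that reconfigure actually performs.
  var resizeCalls: List[Int] = Nil

  private def reconfigure(oldConfig: MiniConfig, newConfig: MiniConfig): Unit =
    if (newConfig.numIoThreads != oldConfig.numIoThreads)
      resizeCalls = resizeCalls :+ newConfig.numIoThreads

  // Same shape as the update logic quoted in the description.
  def update(newIoThreads: Int): Unit = {
    val oldConfig = currentConfig
    val newConfig = new MiniConfig(newIoThreads)
    if (newConfig ne currentConfig) {
      currentConfig = newConfig
      kafkaConfig.updateCurrentConfig(newConfig) // mutates oldConfig when aliased
      reconfigure(oldConfig, newConfig)
    }
  }
}

object AliasingDemo {
  def main(args: Array[String]): Unit = {
    val aliased = new MiniDynamicConfig(new MiniConfig(8), copyOnInit = false)
    aliased.update(16) // skipped: oldConfig already reports 16
    aliased.update(4)  // applied: oldConfig is now an independent object
    println(aliased.resizeCalls) // List(4)

    val copied = new MiniDynamicConfig(new MiniConfig(8), copyOnInit = true)
    copied.update(16)
    copied.update(4)
    println(copied.resizeCalls) // List(16, 4)
  }
}
```

With the alias in place, the first resize is skipped and only the second goes through, which matches the behaviour described above. Starting from an independent copy lets both updates reach `reconfigure` with distinct old and new values.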

> Dynamic thread pool re-configurations may not get processed
> -----------------------------------------------------------
>
>                 Key: KAFKA-13417
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13417
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Jason Gustafson
>            Assignee: Jason Gustafson
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.1#820001)