hachikuji commented on code in PR #12063:
URL: https://github.com/apache/kafka/pull/12063#discussion_r852470161


##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -906,17 +906,22 @@ class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
   }
 }
 
-class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+class DynamicListenerConfig(
+  broker: KafkaBroker,
+) extends BrokerReconfigurable with Logging {
 
   override def reconfigurableConfigs: Set[String] = {
     DynamicListenerConfig.ReconfigurableConfigs
   }
 
-  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-    val oldConfig = server.config
-    if (!oldConfig.requiresZookeeper) {
+  def checkIsNotKRaft(): Unit = {
+    if (!broker.config.requiresZookeeper) {
      throw new ConfigException("Dynamic reconfiguration of listeners is not yet supported when using a Raft-based metadata quorum")

Review Comment:
   It seems a little strange to throw such a specific exception for a generically named method like `checkIsNotKRaft`. How about `verifyDynamicListenersSupported()` or something like that?
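
   For illustration, a sketch of the suggested rename (the check itself is unchanged from the diff above):

   ```scala
   // Hypothetical rename: the name now states what is being verified.
   def verifyDynamicListenersSupported(): Unit = {
     if (!broker.config.requiresZookeeper) {
       throw new ConfigException("Dynamic reconfiguration of listeners is not " +
         "yet supported when using a Raft-based metadata quorum")
     }
   }
   ```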



##########
core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala:
##########
@@ -142,4 +157,80 @@ class BrokerMetadataPublisherTest {
     new TopicsImage(idsMap.asJava, namesMap.asJava)
   }
 
+  class TestBrokerMetadataPublisher(
+    conf: KafkaConfig,
+    metadataCache: KRaftMetadataCache,
+    logManager: LogManager,
+    replicaManager: ReplicaManager,
+    groupCoordinator: GroupCoordinator,
+    txnCoordinator: TransactionCoordinator,
+    clientQuotaMetadataManager: ClientQuotaMetadataManager,
+    featureCache: FinalizedFeatureCache,
+    dynamicConfigHandlers: Map[String, ConfigHandler],
+    private val _authorizer: Option[Authorizer]
+  ) extends BrokerMetadataPublisher(conf,
+      metadataCache,
+      logManager,
+      replicaManager,
+      groupCoordinator,
+      txnCoordinator,
+      clientQuotaMetadataManager,
+      featureCache,
+      dynamicConfigHandlers,
+      _authorizer)
+  {
+    val numTimesReloadCalled = new AtomicInteger(0)
+
+    override def reloadUpdatedFilesWithoutConfigChange(props: Properties): Unit = {

Review Comment:
   It feels like we could probably do this with a Mockito spy.
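
   For example, a rough sketch of the spy approach, assuming the same constructor arguments as the subclass above:

   ```scala
   import java.util.Properties
   import org.mockito.{ArgumentMatchers, Mockito}

   // Spy on a real publisher so calls pass through but can still be verified.
   val publisher = Mockito.spy(new BrokerMetadataPublisher(conf,
     metadataCache, logManager, replicaManager, groupCoordinator,
     txnCoordinator, clientQuotaMetadataManager, featureCache,
     dynamicConfigHandlers, None))

   // ... exercise the publisher ...

   // Verify the reload hook fired exactly once, replacing the hand-rolled counter.
   Mockito.verify(publisher, Mockito.times(1)).
     reloadUpdatedFilesWithoutConfigChange(ArgumentMatchers.any[Properties]())
   ```

   That would avoid maintaining the test-only subclass and its `AtomicInteger` counter.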



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala:
##########
@@ -248,6 +248,24 @@ class BrokerMetadataListener(
     }
   }
 
+  // This is used in tests to alter the publisher that is in use by the broker.
+  def alterPublisher(publisher: MetadataPublisher): CompletableFuture[Void] = {
+    val event = new AlterPublisherEvent(publisher)
+    eventQueue.append(event)
+    event.future
+  }
+
+  class AlterPublisherEvent(publisher: MetadataPublisher)
+    extends EventQueue.FailureLoggingEvent(log) {
+    val future = new CompletableFuture[Void]()
+
+    override def run(): Unit = {
+      _publisher = Some(publisher)
+      log.info(s"Set publisher to ${publisher}")

Review Comment:
   Why info? Seems more like a debug level message. 
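
   i.e. something like:

   ```scala
   log.debug(s"Set publisher to $publisher")
   ```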



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -950,16 +963,17 @@ class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable wi
    val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
 
     // Clear SASL login cache to force re-login
-    if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
-      LoginManager.closeAll()
-
-    server.socketServer.removeListeners(listenersRemoved)
-    if (listenersAdded.nonEmpty)
-      server.socketServer.addListeners(listenersAdded)
-
-    server match {
-      case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
-      case _ =>
+    LoginManager.closeAll()

Review Comment:
   Do we still want to do this even if we reject the configuration change?
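
   e.g. restoring the previous conditional (from the removed lines above), so the cache is only cleared when the listener set actually changed:

   ```scala
   // Only force SASL re-login when listeners were actually added or removed.
   if (listenersAdded.nonEmpty || listenersRemoved.nonEmpty)
     LoginManager.closeAll()
   ```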



##########
core/src/main/scala/kafka/server/DynamicBrokerConfig.scala:
##########
@@ -906,17 +906,22 @@ class DynamicClientQuotaCallback(server: KafkaBroker) extends Reconfigurable {
   }
 }
 
-class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
+class DynamicListenerConfig(
+  broker: KafkaBroker,
+) extends BrokerReconfigurable with Logging {
 
   override def reconfigurableConfigs: Set[String] = {
     DynamicListenerConfig.ReconfigurableConfigs
   }
 
-  def validateReconfiguration(newConfig: KafkaConfig): Unit = {
-    val oldConfig = server.config
-    if (!oldConfig.requiresZookeeper) {
+  def checkIsNotKRaft(): Unit = {

Review Comment:
   nit: could this be private?
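
   i.e., assuming no callers outside this class:

   ```scala
   private def checkIsNotKRaft(): Unit = {
   ```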



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                 toLoggableProps(resource, props).mkString(","))
               dynamicConfigHandlers(ConfigType.Topic).
                 processConfigChanges(resource.name(), props)
-              conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-            case BROKER => if (resource.name().isEmpty) {
-              // Apply changes to "cluster configs" (also known as default BROKER configs).
-              // These are stored in KRaft with an empty name field.
-              info(s"Updating cluster configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(ConfigEntityName.Default, props)
-            } else if (resource.name().equals(brokerId.toString)) {
-              // Apply changes to this broker's dynamic configuration.
-              info(s"Updating broker ${brokerId} with new configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(resource.name(), props)
-            }
+            case BROKER =>
+              if (resource.name().isEmpty) {
+                // Apply changes to "cluster configs" (also known as default BROKER configs).
+                // These are stored in KRaft with an empty name field.
+                info(s"Updating cluster configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Broker).
+                  processConfigChanges(ConfigEntityName.Default, props)
+              } else if (resource.name().equals(brokerId.toString)) {

Review Comment:
   nit: in scala, we can use `==`
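
   e.g. (Scala's `==` is null-safe and delegates to `equals`):

   ```scala
   } else if (resource.name() == brokerId.toString) {
   ```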



##########
core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala:
##########
@@ -187,21 +189,26 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
                 toLoggableProps(resource, props).mkString(","))
               dynamicConfigHandlers(ConfigType.Topic).
                 processConfigChanges(resource.name(), props)
-              conf.dynamicConfig.reloadUpdatedFilesWithoutConfigChange(props)
-            case BROKER => if (resource.name().isEmpty) {
-              // Apply changes to "cluster configs" (also known as default BROKER configs).
-              // These are stored in KRaft with an empty name field.
-              info(s"Updating cluster configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(ConfigEntityName.Default, props)
-            } else if (resource.name().equals(brokerId.toString)) {
-              // Apply changes to this broker's dynamic configuration.
-              info(s"Updating broker ${brokerId} with new configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Broker).
-                processConfigChanges(resource.name(), props)
-            }
+            case BROKER =>
+              if (resource.name().isEmpty) {
+                // Apply changes to "cluster configs" (also known as default BROKER configs).
+                // These are stored in KRaft with an empty name field.
+                info(s"Updating cluster configuration : " +

Review Comment:
   nit: either drop the `s` prefix before the string or move `toLoggableProps` to an `${}` substitution
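
   For example, either of:

   ```scala
   // Option 1: drop the `s` prefix, since this line substitutes nothing itself.
   info("Updating cluster configuration : " +
     toLoggableProps(resource, props).mkString(","))

   // Option 2: keep the interpolator and move the call into a substitution.
   info(s"Updating cluster configuration : ${toLoggableProps(resource, props).mkString(",")}")
   ```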


