divijvaidya commented on code in PR #13959:
URL: https://github.com/apache/kafka/pull/13959#discussion_r1254353779


##########
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala:
##########
@@ -297,15 +297,20 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
       // Shutdown controller. Request timeout is 30s, verify that shutdown 
completed well before that
       val shutdownFuture = executor.submit(new Runnable {
-        override def run(): Unit = controllerChannelManager.shutdown()
+        override def run(): Unit = {
+          controllerChannelManager.shutdown()
+          controllerChannelManager.removeMetrics()
+        }
       })
       shutdownFuture.get(10, TimeUnit.SECONDS)
 
     } finally {
       if (serverSocket != null)
         serverSocket.close()
-      if (controllerChannelManager != null)
+      if (controllerChannelManager != null) {
         controllerChannelManager.shutdown()

Review Comment:
   Is there a reason that `controllerChannelManager.shutdown()` does not remove 
its own metrics (instead of requiring a separate call to `removeMetrics`)?



##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -57,13 +63,19 @@ class ControllerChannelManager(controllerEpoch: () => Int,
                                threadNamePrefix: Option[String] = None) 
extends Logging {
   import ControllerChannelManager._
 
+  type MetricNameWithTagData = java.util.HashMap[String, 
java.util.Set[java.util.Map[String, String]]]
+
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
+  // Visible for testing
+  private[controller] val gaugeMetricNameWithTag = new MetricNameWithTagData

Review Comment:
   We want to ensure that this map does not grow without bound and that entries 
are correctly removed from it when applicable. The same applies to the other map.



##########
core/src/main/scala/kafka/controller/KafkaController.scala:
##########
@@ -537,6 +537,7 @@ class KafkaController(val config: KafkaConfig,
 
   private def removeMetrics(): Unit = {
     KafkaController.MetricNames.foreach(metricsGroup.removeMetric)
+    controllerChannelManager.removeMetrics()

Review Comment:
   `KafkaController#shutdown()` owns the responsibility of closing the controller 
channel manager. `ControllerChannelManager` should own the responsibility of 
removing its own metrics as part of its `shutdown()`.



##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -197,14 +222,29 @@ class ControllerChannelManager(controllerEpoch: () => Int,
       brokerState.requestSendThread.shutdown()
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
-      metricsGroup.removeMetric(QueueSizeMetricName, 
brokerMetricTags(brokerState.brokerNode.id))
-      metricsGroup.removeMetric(RequestRateAndQueueTimeMetricName, 
brokerMetricTags(brokerState.brokerNode.id))
+      removeMetricsForBroker(brokerState)
       brokerStateInfo.remove(brokerState.brokerNode.id)
     } catch {
       case e: Throwable => error("Error while removing broker by the 
controller", e)
     }
   }
 
+  private def removeMetricsForBroker(brokerState: ControllerBrokerStateInfo): 
Unit = {
+    val removeTagSet = new java.util.HashSet[java.util.Map[String, String]]()
+    val metricNameWithTag = new MetricNameWithTagData
+    metricNameWithTag.putAll(gaugeMetricNameWithTag)
+    metricNameWithTag.putAll(timerMetricNameWithTag)
+    metricNameWithTag.asScala.foreach(metricNameAndTags => {
+      metricNameAndTags._2.asScala.filter(tag => 
tag.get(BrokerMetricTagKeyName).equals(brokerState.brokerNode.id.toString))
+        .foreach(metricTag => {
+          metricsGroup.removeMetric(metricNameAndTags._1, metricTag)
+          removeTagSet.add(metricTag)
+        })
+    })
+    gaugeMetricNameWithTag.values().asScala.foreach(tagSet => 
tagSet.removeAll(removeTagSet))
+    timerMetricNameWithTag.values().asScala.foreach(tagSet => 
tagSet.removeAll(removeTagSet))
+  }

Review Comment:
   I think this could be simplified without creating a new map and copying data 
into it.
   
   For example, you can pass `gaugeMetricNameWithTag` as a parameter to a function 
that iterates through its values, filters the matching tags, and removes the 
metrics for those tags. You can use the same function for `timerMetricNameWithTag`.



##########
core/src/main/scala/kafka/controller/ControllerChannelManager.scala:
##########
@@ -171,21 +189,28 @@ class ControllerChannelManager(controllerEpoch: () => Int,
       case Some(name) => 
s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
     }
 
+    val brokerMetricTag = brokerMetricTags(broker.id)
     val requestRateAndQueueTimeMetrics = metricsGroup.newTimer(
-      RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, 
TimeUnit.SECONDS, brokerMetricTags(broker.id)
+      RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, 
TimeUnit.SECONDS, brokerMetricTag
     )
+    timerMetricNameWithTag.computeIfAbsent(RequestRateAndQueueTimeMetricName, 
k => new java.util.HashSet[java.util.Map[String, String]]())
+      .add(brokerMetricTag)
 
     val requestThread = new RequestSendThread(config.brokerId, 
controllerEpoch, messageQueue, networkClient,
       brokerNode, config, time, requestRateAndQueueTimeMetrics, 
stateChangeLogger, threadName)
     requestThread.setDaemon(false)
 
-    val queueSizeGauge = metricsGroup.newGauge(QueueSizeMetricName, () => 
messageQueue.size, brokerMetricTags(broker.id))
+    val queueSizeGauge = metricsGroup.newGauge(QueueSizeMetricName, () => 
messageQueue.size, brokerMetricTag)
+    gaugeMetricNameWithTag.computeIfAbsent(QueueSizeMetricName, k => new 
java.util.HashSet[java.util.Map[String, String]]())

Review Comment:
   These two lines are tightly coupled, so we might want to wrap them in a single 
method. That way, future implementors won't accidentally forget to call the 
second line when writing a line similar to the first.



##########
core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala:
##########
@@ -48,6 +55,76 @@ class ControllerChannelManagerTest {
   private val logger = new StateChangeLogger(controllerId, true, None)
 
   type ControlRequest = AbstractControlRequest.Builder[_ <: 
AbstractControlRequest]
+  type MetricNameWithTagData = java.util.HashMap[String, 
java.util.Set[java.util.Map[String, String]]]
+
+  @Test
+  def testRemoveMetricsOnShutdown(): Unit = {
+    val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup])
+    val metrics = new Metrics
+    val serverSocket = new ServerSocket(0)
+    try {
+      // Start a ControllerChannelManager
+      val securityProtocol = SecurityProtocol.PLAINTEXT
+      val listenerName = ListenerName.forSecurityProtocol(securityProtocol)
+      val brokerAndEpochs = Map(
+        (new Broker(2, "localhost2", serverSocket.getLocalPort, listenerName, 
securityProtocol), 0L),
+        (new Broker(3, "localhost3", serverSocket.getLocalPort, listenerName, 
securityProtocol), 0L)
+      )
+      val controllerConfig = 
KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, ""))
+      val controllerContext = new ControllerContext
+      controllerContext.setLiveBrokers(brokerAndEpochs)
+      val controllerChannelManager = new ControllerChannelManager(
+        () => controllerContext.epoch,
+        controllerConfig,
+        new MockTime,
+        metrics,
+        new StateChangeLogger(controllerId, inControllerContext = true, None))
+      
controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers)
+
+      // The first mockMetricsGroup is initialized in `ControllerContext`
+      val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1)
+      ControllerChannelManager.GaugeMetricNameNoTag.foreach(metricName => 
verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any()))
+      
controllerChannelManager.gaugeMetricNameWithTag.asScala.foreach(metricNameTags 
=> {
+        metricNameTags._2.asScala.foreach(tag => 
verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameTags._1), 
any(), ArgumentMatchers.eq(tag)))
+      })
+      
controllerChannelManager.timerMetricNameWithTag.asScala.foreach(metricNameTags 
=> {
+        metricNameTags._2.asScala.foreach(tag => 
verify(mockMetricsGroup).newTimer(ArgumentMatchers.eq(metricNameTags._1), 
any(), any(), ArgumentMatchers.eq(tag)))
+      })
+
+      // Since `gaugeMetricNameWithTag` and `timerMetricNameWithTag` will 
clean up the related tags when `controllerChannelManager`
+      // is shut down, so we need to save it here and use it to verify the 
tags later.
+      val gaugeMetricNameWithTagToVerify = new MetricNameWithTagData
+      val timerMetricNameWithTagToVerify = new MetricNameWithTagData
+      
controllerChannelManager.gaugeMetricNameWithTag.asScala.foreach(metricNameTags 
=> {
+        val tagsToVerify = new java.util.HashSet[java.util.Map[String, 
String]]()
+        metricNameTags._2.asScala.foreach(tagsToVerify.add)
+        gaugeMetricNameWithTagToVerify.put(metricNameTags._1, tagsToVerify)
+      })
+      
controllerChannelManager.timerMetricNameWithTag.asScala.foreach(metricNameTags 
=> {
+        val tagsToVerify = new java.util.HashSet[java.util.Map[String, 
String]]()
+        metricNameTags._2.asScala.foreach(tagsToVerify.add)
+        timerMetricNameWithTagToVerify.put(metricNameTags._1, tagsToVerify)
+      })
+
+      controllerChannelManager.shutdown()
+      controllerChannelManager.removeMetrics()
+
+      // We can not use `gaugeMetricNameWithTag` to verify, because it is 
cleared.
+      gaugeMetricNameWithTagToVerify.asScala.foreach(metricNameTags => {
+        metricNameTags._2.asScala.foreach(tag => 
verify(mockMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameTags._1), 
ArgumentMatchers.eq(tag)))
+      })
+      timerMetricNameWithTagToVerify.asScala.foreach(metricNameTags => {
+        metricNameTags._2.asScala.foreach(tag => 
verify(mockMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameTags._1), 
ArgumentMatchers.eq(tag)))
+      })
+      
ControllerChannelManager.GaugeMetricNameNoTag.foreach(verify(mockMetricsGroup).removeMetric(_))
+
+      verifyNoMoreInteractions(mockMetricsGroup)
+    } finally {
+      mockMetricsGroupCtor.close()
+      metrics.close()
+      serverSocket.close()

Review Comment:
   This could be written as try-with-resources.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to