divijvaidya commented on code in PR #13959: URL: https://github.com/apache/kafka/pull/13959#discussion_r1254353779
########## core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala: ########## @@ -297,15 +297,20 @@ class ServerShutdownTest extends KafkaServerTestHarness { // Shutdown controller. Request timeout is 30s, verify that shutdown completed well before that val shutdownFuture = executor.submit(new Runnable { - override def run(): Unit = controllerChannelManager.shutdown() + override def run(): Unit = { + controllerChannelManager.shutdown() + controllerChannelManager.removeMetrics() + } }) shutdownFuture.get(10, TimeUnit.SECONDS) } finally { if (serverSocket != null) serverSocket.close() - if (controllerChannelManager != null) + if (controllerChannelManager != null) { controllerChannelManager.shutdown() Review Comment: Is there a reason that controllerChannelManager.shutdown() does not remove its own metrics, instead of requiring removeMetrics to be called separately? ########## core/src/main/scala/kafka/controller/ControllerChannelManager.scala: ########## @@ -57,13 +63,19 @@ class ControllerChannelManager(controllerEpoch: () => Int, threadNamePrefix: Option[String] = None) extends Logging { import ControllerChannelManager._ + type MetricNameWithTagData = java.util.HashMap[String, java.util.Set[java.util.Map[String, String]]] + private val metricsGroup = new KafkaMetricsGroup(this.getClass) + // Visible for testing + private[controller] val gaugeMetricNameWithTag = new MetricNameWithTagData Review Comment: We want to ensure that this is not an ever-growing map and that we correctly remove entries from it when applicable. The same applies to the other map. ########## core/src/main/scala/kafka/controller/KafkaController.scala: ########## @@ -537,6 +537,7 @@ class KafkaController(val config: KafkaConfig, private def removeMetrics(): Unit = { KafkaController.MetricNames.foreach(metricsGroup.removeMetric) + controllerChannelManager.removeMetrics() Review Comment: KafkaController#shutdown() owns the responsibility of closing the controller channel manager. 
ControllerChannelManager should own the responsibility of closing its own metrics as part of its shutdown(). ########## core/src/main/scala/kafka/controller/ControllerChannelManager.scala: ########## @@ -197,14 +222,29 @@ class ControllerChannelManager(controllerEpoch: () => Int, brokerState.requestSendThread.shutdown() brokerState.networkClient.close() brokerState.messageQueue.clear() - metricsGroup.removeMetric(QueueSizeMetricName, brokerMetricTags(brokerState.brokerNode.id)) - metricsGroup.removeMetric(RequestRateAndQueueTimeMetricName, brokerMetricTags(brokerState.brokerNode.id)) + removeMetricsForBroker(brokerState) brokerStateInfo.remove(brokerState.brokerNode.id) } catch { case e: Throwable => error("Error while removing broker by the controller", e) } } + private def removeMetricsForBroker(brokerState: ControllerBrokerStateInfo): Unit = { + val removeTagSet = new java.util.HashSet[java.util.Map[String, String]]() + val metricNameWithTag = new MetricNameWithTagData + metricNameWithTag.putAll(gaugeMetricNameWithTag) + metricNameWithTag.putAll(timerMetricNameWithTag) + metricNameWithTag.asScala.foreach(metricNameAndTags => { + metricNameAndTags._2.asScala.filter(tag => tag.get(BrokerMetricTagKeyName).equals(brokerState.brokerNode.id.toString)) + .foreach(metricTag => { + metricsGroup.removeMetric(metricNameAndTags._1, metricTag) + removeTagSet.add(metricTag) + }) + }) + gaugeMetricNameWithTag.values().asScala.foreach(tagSet => tagSet.removeAll(removeTagSet)) + timerMetricNameWithTag.values().asScala.foreach(tagSet => tagSet.removeAll(removeTagSet)) + } Review Comment: I think this could be simplified without having to create a new map and copy data. For example, you can pass `gaugeMetricNameWithTag` as a parameter to a function which iterates through the values, filters the matching tags and removes the metrics for those tags. 
You can use the same function for timerMetricNameWithTag. ########## core/src/main/scala/kafka/controller/ControllerChannelManager.scala: ########## @@ -171,21 +189,28 @@ class ControllerChannelManager(controllerEpoch: () => Int, case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread" } + val brokerMetricTag = brokerMetricTags(broker.id) val requestRateAndQueueTimeMetrics = metricsGroup.newTimer( - RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id) + RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTag ) + timerMetricNameWithTag.computeIfAbsent(RequestRateAndQueueTimeMetricName, k => new java.util.HashSet[java.util.Map[String, String]]()) + .add(brokerMetricTag) val requestThread = new RequestSendThread(config.brokerId, controllerEpoch, messageQueue, networkClient, brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName) requestThread.setDaemon(false) - val queueSizeGauge = metricsGroup.newGauge(QueueSizeMetricName, () => messageQueue.size, brokerMetricTags(broker.id)) + val queueSizeGauge = metricsGroup.newGauge(QueueSizeMetricName, () => messageQueue.size, brokerMetricTag) + gaugeMetricNameWithTag.computeIfAbsent(QueueSizeMetricName, k => new java.util.HashSet[java.util.Map[String, String]]()) Review Comment: These two lines are tightly coupled, so we might want to create a method so that future implementors don't accidentally forget to call the second line when writing a line similar to the first. 
########## core/src/test/scala/unit/kafka/controller/ControllerChannelManagerTest.scala: ########## @@ -48,6 +55,76 @@ class ControllerChannelManagerTest { private val logger = new StateChangeLogger(controllerId, true, None) type ControlRequest = AbstractControlRequest.Builder[_ <: AbstractControlRequest] + type MetricNameWithTagData = java.util.HashMap[String, java.util.Set[java.util.Map[String, String]]] + + @Test + def testRemoveMetricsOnShutdown(): Unit = { + val mockMetricsGroupCtor = mockConstruction(classOf[KafkaMetricsGroup]) + val metrics = new Metrics + val serverSocket = new ServerSocket(0) + try { + // Start a ControllerChannelManager + val securityProtocol = SecurityProtocol.PLAINTEXT + val listenerName = ListenerName.forSecurityProtocol(securityProtocol) + val brokerAndEpochs = Map( + (new Broker(2, "localhost2", serverSocket.getLocalPort, listenerName, securityProtocol), 0L), + (new Broker(3, "localhost3", serverSocket.getLocalPort, listenerName, securityProtocol), 0L) + ) + val controllerConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(controllerId, "")) + val controllerContext = new ControllerContext + controllerContext.setLiveBrokers(brokerAndEpochs) + val controllerChannelManager = new ControllerChannelManager( + () => controllerContext.epoch, + controllerConfig, + new MockTime, + metrics, + new StateChangeLogger(controllerId, inControllerContext = true, None)) + controllerChannelManager.startup(controllerContext.liveOrShuttingDownBrokers) + + // The first mockMetricsGroup is initialized in `ControllerContext` + val mockMetricsGroup = mockMetricsGroupCtor.constructed.get(1) + ControllerChannelManager.GaugeMetricNameNoTag.foreach(metricName => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricName), any())) + controllerChannelManager.gaugeMetricNameWithTag.asScala.foreach(metricNameTags => { + metricNameTags._2.asScala.foreach(tag => verify(mockMetricsGroup).newGauge(ArgumentMatchers.eq(metricNameTags._1), any(), 
ArgumentMatchers.eq(tag))) + }) + controllerChannelManager.timerMetricNameWithTag.asScala.foreach(metricNameTags => { + metricNameTags._2.asScala.foreach(tag => verify(mockMetricsGroup).newTimer(ArgumentMatchers.eq(metricNameTags._1), any(), any(), ArgumentMatchers.eq(tag))) + }) + + // Since `gaugeMetricNameWithTag` and `timerMetricNameWithTag` will clean up the related tags when `controllerChannelManager` + // is shut down, so we need to save it here and use it to verify the tags later. + val gaugeMetricNameWithTagToVerify = new MetricNameWithTagData + val timerMetricNameWithTagToVerify = new MetricNameWithTagData + controllerChannelManager.gaugeMetricNameWithTag.asScala.foreach(metricNameTags => { + val tagsToVerify = new java.util.HashSet[java.util.Map[String, String]]() + metricNameTags._2.asScala.foreach(tagsToVerify.add) + gaugeMetricNameWithTagToVerify.put(metricNameTags._1, tagsToVerify) + }) + controllerChannelManager.timerMetricNameWithTag.asScala.foreach(metricNameTags => { + val tagsToVerify = new java.util.HashSet[java.util.Map[String, String]]() + metricNameTags._2.asScala.foreach(tagsToVerify.add) + timerMetricNameWithTagToVerify.put(metricNameTags._1, tagsToVerify) + }) + + controllerChannelManager.shutdown() + controllerChannelManager.removeMetrics() + + // We can not use `gaugeMetricNameWithTag` to verify, because it is cleared. 
+ gaugeMetricNameWithTagToVerify.asScala.foreach(metricNameTags => { + metricNameTags._2.asScala.foreach(tag => verify(mockMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameTags._1), ArgumentMatchers.eq(tag))) + }) + timerMetricNameWithTagToVerify.asScala.foreach(metricNameTags => { + metricNameTags._2.asScala.foreach(tag => verify(mockMetricsGroup).removeMetric(ArgumentMatchers.eq(metricNameTags._1), ArgumentMatchers.eq(tag))) + }) + ControllerChannelManager.GaugeMetricNameNoTag.foreach(verify(mockMetricsGroup).removeMetric(_)) + + verifyNoMoreInteractions(mockMetricsGroup) + } finally { + mockMetricsGroupCtor.close() + metrics.close() + serverSocket.close() Review Comment: This could be written as try-with-resources. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org