showuon commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1461839024


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -529,20 +489,16 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   }
 }
 
-class BrokerTopicAggregatedMetric() {
-  private val partitionMetricValues = new ConcurrentHashMap[Int, Long]()
-
-  def setPartitionMetricValue(partition: Int, partitionValue: Long): Unit = {
-    partitionMetricValues.put(partition, partitionValue)
-  }
-
-  def removePartition(partition: Int): Option[Long] = {
-    Option.apply(partitionMetricValues.remove(partition))
-  }
-
-  def value(): Long = partitionMetricValues.values().stream().mapToLong(v => v).sum()
-
-  def close(): Unit = partitionMetricValues.clear()
+class AggregatedMetric {
+  // The map to store:
+  //   - per-partition value for topic-level metrics. The key will be the partition number
+  //   - per-topic value for broker-level metrics. The key will be the topic name
+  private val metricValues = new ConcurrentHashMap[String, Long]()
+  def setValue(key: String, value: Long): Unit = metricValues.put(key, value)
+  def removeKey(key: String): Option[Long] = Option.apply(metricValues.remove(key))
+  // Sum all values in the metricValues map
+  def value(): Long = metricValues.values().stream().mapToLong(v => v).sum()
+  def close(): Unit = metricValues.clear()
 }

Review Comment:
   After the latest refactor, we use `AggregatedMetric` for both per-topic and per-broker metrics, which makes the code simpler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to