divijvaidya commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1458819718
########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -529,20 +564,33 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf } } -class BrokerTopicAggregatedMetric() { +class BrokerTopicAggregatedMetric(allTopics: Boolean = false) { Review Comment: This class is specific to aggregated metrics on a topic. We are hacking it to provide broker level metrics. Maybe a BrokerAggregatedMetric is required? ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -787,9 +787,14 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size(); String topic = topicIdPartition.topic(); int partition = topicIdPartition.partition(); - brokerTopicStats.topicStats(topic).recordRemoteCopyLagBytes(partition, bytesLag); + + BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topic); + brokerTopicMetrics.recordRemoteCopyLagBytes(partition, bytesLag); + brokerTopicStats.allTopicsStats().recordRemoteCopyLagBytes(topic, brokerTopicMetrics.remoteCopyLagBytes()); Review Comment: nit perhaps worth adding a comment on why we are using `brokerTopicMetrics.remoteCopyLagBytes()` instead simply updating with `bytesLag` here. I think the answer is that we want to update the "topic" level metric with an aggregate value across partitions and `bytesLag` just represents this partition. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1651,14 +1665,25 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) { BrokerTopicMetrics topicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic()); + String topic = topicIdPartition.topic(); int partition = topicIdPartition.partition(); + // remove the partition metric values topicMetrics.removeRemoteCopyLagBytes(partition); topicMetrics.removeRemoteCopyLagSegments(partition); topicMetrics.removeRemoteDeleteLagBytes(partition); topicMetrics.removeRemoteDeleteLagSegments(partition); topicMetrics.removeRemoteLogMetadataCount(partition); topicMetrics.removeRemoteLogSizeComputationTime(partition); topicMetrics.removeRemoteLogSizeBytes(partition); + + // update the all topic metric values Review Comment: suggestion change comment to: Update the all topic metric values so that we have a sample right for all topics metric after removal of partition. In absence of this update, the all topic metric will not update until the next sample point is generated for it. ########## core/src/main/java/kafka/log/remote/RemoteLogManager.java: ########## @@ -1175,7 +1186,10 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize, } } } - brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogSizeComputationTime(topicIdPartition.partition(), time.milliseconds() - startTimeMs); + + BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic()); + brokerTopicMetrics.recordRemoteLogSizeComputationTime(topicIdPartition.partition(), time.milliseconds() - startTimeMs); + brokerTopicStats.allTopicsStats().recordRemoteLogSizeComputationTime(topicIdPartition.topic(), brokerTopicMetrics.remoteLogSizeComputationTime()); Review Comment: could go in a single function RequestHandler with name updateRemoteLogSizeComputationTime(), similar to existing updateBytesOut(). This will ensure that in future folks don't forget to update allTopics if they update the broekrTopic metric. ########## core/src/main/scala/kafka/server/KafkaRequestHandler.scala: ########## @@ -529,20 +564,33 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf } } -class BrokerTopicAggregatedMetric() { +class BrokerTopicAggregatedMetric(allTopics: Boolean = false) { private val partitionMetricValues = new ConcurrentHashMap[Int, Long]() + private val topicMetricValues = new ConcurrentHashMap[String, Long]() def setPartitionMetricValue(partition: Int, partitionValue: Long): Unit = { partitionMetricValues.put(partition, partitionValue) } + def setTopicMetricValue(topic: String, topicValue: Long): Unit = { Review Comment: do we want to remove the topic metric after the topic is deleted? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org