divijvaidya commented on code in PR #15133:
URL: https://github.com/apache/kafka/pull/15133#discussion_r1458819718


##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -529,20 +564,33 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   }
 }
 
-class BrokerTopicAggregatedMetric() {
+class BrokerTopicAggregatedMetric(allTopics: Boolean = false) {

Review Comment:
   This class is specific to aggregated metrics on a topic; we are hacking it
to also provide broker-level metrics. Maybe a separate BrokerAggregatedMetric
is required?
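   A minimal sketch of what such a broker-level class could look like (the
name `BrokerAggregatedMetric` and its methods are hypothetical here, not
taken from the PR):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical: a broker-level aggregate keyed by topic, kept separate
// from BrokerTopicAggregatedMetric, which is keyed by partition.
class BrokerAggregatedMetric {
  private val topicMetricValues = new ConcurrentHashMap[String, Long]()

  def setTopicMetricValue(topic: String, value: Long): Unit =
    topicMetricValues.put(topic, value)

  def removeTopicMetricValue(topic: String): Unit =
    topicMetricValues.remove(topic)

  // Aggregate value across all topics hosted on this broker.
  def value(): Long =
    topicMetricValues.values().asScala.foldLeft(0L)(_ + _)
}
```

   This would avoid overloading the per-topic class with an `allTopics`
flag and two parallel maps.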



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -787,9 +787,14 @@ private void copyLogSegment(UnifiedLog log, LogSegment segment, long nextSegment
             long bytesLag = log.onlyLocalLogSegmentsSize() - log.activeSegment().size();
             String topic = topicIdPartition.topic();
             int partition = topicIdPartition.partition();
-            brokerTopicStats.topicStats(topic).recordRemoteCopyLagBytes(partition, bytesLag);
+
+            BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topic);
+            brokerTopicMetrics.recordRemoteCopyLagBytes(partition, bytesLag);
+            brokerTopicStats.allTopicsStats().recordRemoteCopyLagBytes(topic, brokerTopicMetrics.remoteCopyLagBytes());

Review Comment:
   nit
   
   perhaps worth adding a comment on why we are using
`brokerTopicMetrics.remoteCopyLagBytes()` instead of simply updating with
`bytesLag` here. I think the answer is that we want to update the topic-level
metric with an aggregate value across all partitions, whereas `bytesLag`
represents only this partition.
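   To illustrate the distinction with a standalone sketch (stand-in data
structure, not the PR's classes):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Stand-in for the per-topic aggregated metric (not the PR's classes).
val partitionLag = new ConcurrentHashMap[Int, Long]()
partitionLag.put(0, 100L) // lag of partition 0
partitionLag.put(1, 250L) // lag of partition 1

// `bytesLag` in copyLogSegment covers only the partition being copied:
val bytesLag = partitionLag.get(1) // 250

// The analogue of remoteCopyLagBytes(): the topic-level value is the
// sum across all partitions, which is what the all-topics metric needs.
val topicLag = partitionLag.values().asScala.foldLeft(0L)(_ + _) // 350
```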



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1651,14 +1665,25 @@ private static void shutdownAndAwaitTermination(ExecutorService pool, String poo
 
     private void removeRemoteTopicPartitionMetrics(TopicIdPartition topicIdPartition) {
         BrokerTopicMetrics topicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic());
+        String topic = topicIdPartition.topic();
         int partition = topicIdPartition.partition();
+        // remove the partition metric values
         topicMetrics.removeRemoteCopyLagBytes(partition);
         topicMetrics.removeRemoteCopyLagSegments(partition);
         topicMetrics.removeRemoteDeleteLagBytes(partition);
         topicMetrics.removeRemoteDeleteLagSegments(partition);
         topicMetrics.removeRemoteLogMetadataCount(partition);
         topicMetrics.removeRemoteLogSizeComputationTime(partition);
         topicMetrics.removeRemoteLogSizeBytes(partition);
+
+        // update the all topic metric values

Review Comment:
   suggestion 
   
   change comment to:
   Update the all-topics metric values so that the all-topics metric has a
correct sample immediately after the partition is removed. In the absence of
this update, the all-topics metric would not change until its next sample
point is generated.



##########
core/src/main/java/kafka/log/remote/RemoteLogManager.java:
##########
@@ -1175,7 +1186,10 @@ private Optional<RetentionSizeData> buildRetentionSizeData(long retentionSize,
                         }
                     }
                 }
-                brokerTopicStats.topicStats(topicIdPartition.topic()).recordRemoteLogSizeComputationTime(topicIdPartition.partition(), time.milliseconds() - startTimeMs);
+
+                BrokerTopicMetrics brokerTopicMetrics = brokerTopicStats.topicStats(topicIdPartition.topic());
+                brokerTopicMetrics.recordRemoteLogSizeComputationTime(topicIdPartition.partition(), time.milliseconds() - startTimeMs);
+                brokerTopicStats.allTopicsStats().recordRemoteLogSizeComputationTime(topicIdPartition.topic(), brokerTopicMetrics.remoteLogSizeComputationTime());

Review Comment:
   could go in a single function in KafkaRequestHandler, named
updateRemoteLogSizeComputationTime(), similar to the existing
updateBytesOut(). This will ensure that in the future folks don't forget to
update the allTopics metric when they update the brokerTopic metric.
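   A hedged sketch of such a helper, using minimal stand-in types
(`TopicMetrics` and `Stats` are hypothetical; the real code would go through
`BrokerTopicStats`/`BrokerTopicMetrics`):

```scala
import java.util.concurrent.ConcurrentHashMap
import scala.jdk.CollectionConverters._

// Hypothetical stand-ins for the real Kafka classes.
class TopicMetrics {
  private val perPartition = new ConcurrentHashMap[Int, Long]()
  def record(partition: Int, value: Long): Unit = perPartition.put(partition, value)
  def aggregate(): Long = perPartition.values().asScala.foldLeft(0L)(_ + _)
}

class Stats {
  private val perTopic = new ConcurrentHashMap[String, TopicMetrics]()
  private val allTopicsValues = new ConcurrentHashMap[String, Long]()

  def topicStats(topic: String): TopicMetrics =
    perTopic.computeIfAbsent(topic, _ => new TopicMetrics)

  def allTopicsAggregate(): Long =
    allTopicsValues.values().asScala.foldLeft(0L)(_ + _)

  // Suggested single entry point: updates the topic-level metric and
  // the all-topics metric together, so callers cannot forget one.
  def updateRemoteLogSizeComputationTime(topic: String, partition: Int, timeMs: Long): Unit = {
    val metrics = topicStats(topic)
    metrics.record(partition, timeMs)
    allTopicsValues.put(topic, metrics.aggregate())
  }
}
```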



##########
core/src/main/scala/kafka/server/KafkaRequestHandler.scala:
##########
@@ -529,20 +564,33 @@ class BrokerTopicMetrics(name: Option[String], configOpt: java.util.Optional[Kaf
   }
 }
 
-class BrokerTopicAggregatedMetric() {
+class BrokerTopicAggregatedMetric(allTopics: Boolean = false) {
   private val partitionMetricValues = new ConcurrentHashMap[Int, Long]()
+  private val topicMetricValues = new ConcurrentHashMap[String, Long]()
 
   def setPartitionMetricValue(partition: Int, partitionValue: Long): Unit = {
     partitionMetricValues.put(partition, partitionValue)
   }
 
+  def setTopicMetricValue(topic: String, topicValue: Long): Unit = {

Review Comment:
   do we want to remove the topic metric after the topic is deleted?
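   If so, a hypothetical counterpart to `setTopicMetricValue`, called from
the topic-deletion path, might look like (sketch, names assumed):

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical removal hook: invoked when a topic is deleted so the
// all-topics aggregate stops counting the deleted topic's last value.
val topicMetricValues = new ConcurrentHashMap[String, Long]()

def removeTopicMetricValue(topic: String): Unit =
  topicMetricValues.remove(topic)

topicMetricValues.put("deleted-topic", 42L)
removeTopicMetricValue("deleted-topic")
```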



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
