divijvaidya commented on code in PR #15133: URL: https://github.com/apache/kafka/pull/15133#discussion_r1461999960
########## core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala: ########## @@ -245,403 +251,438 @@ class KafkaRequestHandlerTest { @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testSingularCopyLagBytesMetric(systemRemoteStorageEnabled: Boolean): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics(systemRemoteStorageEnabled) + val props = kafka.utils.TestUtils.createDummyBrokerConfig() + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString) + val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props))) + val brokerTopicMetrics = brokerTopicStats.topicStats(topic) if (systemRemoteStorageEnabled) { - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 100); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 150); - brokerTopicMetrics.recordRemoteCopyLagBytes(2, 250); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 100) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 150) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 250) assertEquals(500, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(500, brokerTopicStats.allTopicsStats.remoteCopyLagBytes) + brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 100) + assertEquals(600, brokerTopicStats.allTopicsStats.remoteCopyLagBytes) } else { assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) + assertEquals(None, brokerTopicStats.allTopicsStats.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_COPY_LAG_BYTES_METRIC.getName)) } } @Test def testMultipleCopyLagBytesMetrics(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2); - brokerTopicMetrics.recordRemoteCopyLagBytes(2, 3); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2) + 
brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 3) - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 4); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 5); - brokerTopicMetrics.recordRemoteCopyLagBytes(2, 6); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 4) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 5) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 6) assertEquals(15, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(15, allTopicMetrics.remoteCopyLagBytes) + brokerTopicStats.recordRemoteCopyLagBytes(topic2, 2, 5) + assertEquals(20, allTopicMetrics.remoteCopyLagBytes) } @Test def testCopyLagBytesMetricWithPartitionExpansion(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(3, allTopicMetrics.remoteCopyLagBytes) - brokerTopicMetrics.recordRemoteCopyLagBytes(2, 3); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 2, 3) assertEquals(6, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(6, allTopicMetrics.remoteCopyLagBytes) + brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 1) + assertEquals(7, allTopicMetrics.remoteCopyLagBytes) } @Test def testCopyLagBytesMetricWithPartitionShrinking(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(3, allTopicMetrics.remoteCopyLagBytes) - brokerTopicMetrics.removeRemoteCopyLagBytes(1); + brokerTopicStats.removeRemoteCopyLagBytes(topic, 1) assertEquals(1, 
brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(1, allTopicMetrics.remoteCopyLagBytes) + + brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 1) + assertEquals(2, allTopicMetrics.remoteCopyLagBytes) } @Test def testCopyLagBytesMetricWithRemovingNonexistentPartitions(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(3, allTopicMetrics.remoteCopyLagBytes) - brokerTopicMetrics.removeRemoteCopyLagBytes(3); + + brokerTopicStats.removeRemoteCopyLagBytes(topic, 3) assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(3, allTopicMetrics.remoteCopyLagBytes) } @Test def testCopyLagBytesMetricClear(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagBytes(0, 1); - brokerTopicMetrics.recordRemoteCopyLagBytes(1, 2); + brokerTopicStats.recordRemoteCopyLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(3, allTopicMetrics.remoteCopyLagBytes) - brokerTopicMetrics.close() + brokerTopicStats.close() assertEquals(0, brokerTopicMetrics.remoteCopyLagBytes) + assertEquals(0, allTopicMetrics.remoteCopyLagBytes) + + brokerTopicStats.recordRemoteCopyLagBytes(topic2, 0, 1) + assertEquals(1, allTopicMetrics.remoteCopyLagBytes) } @Test def testMultipleCopyLagSegmentsMetrics(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1); - brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2); - brokerTopicMetrics.recordRemoteCopyLagSegments(2, 3); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 
2) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 2, 3) - brokerTopicMetrics.recordRemoteCopyLagSegments(0, 4); - brokerTopicMetrics.recordRemoteCopyLagSegments(1, 5); - brokerTopicMetrics.recordRemoteCopyLagSegments(2, 6); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 4) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 5) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 2, 6) assertEquals(15, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(15, allTopicMetrics.remoteCopyLagSegments) + + brokerTopicStats.recordRemoteCopyLagSegments(topic2, 0, 1) + assertEquals(16, allTopicMetrics.remoteCopyLagSegments) } @Test def testCopyLagSegmentsMetricWithPartitionExpansion(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1); - brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(3, allTopicMetrics.remoteCopyLagSegments) - brokerTopicMetrics.recordRemoteCopyLagSegments(2, 3); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 2, 3) assertEquals(6, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(6, allTopicMetrics.remoteCopyLagSegments) } @Test def testCopyLagSegmentsMetricWithPartitionShrinking(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1); - brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(3, allTopicMetrics.remoteCopyLagSegments) - brokerTopicMetrics.removeRemoteCopyLagSegments(1); + brokerTopicStats.removeRemoteCopyLagSegments(topic, 1) assertEquals(1, brokerTopicMetrics.remoteCopyLagSegments) + 
assertEquals(1, allTopicMetrics.remoteCopyLagSegments) } @Test def testCopyLagSegmentsMetricWithRemovingNonexistentPartitions(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1); - brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(3, allTopicMetrics.remoteCopyLagSegments) - brokerTopicMetrics.removeRemoteCopyLagSegments(3); + brokerTopicStats.removeRemoteCopyLagSegments(topic, 3) assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(3, allTopicMetrics.remoteCopyLagSegments) } @Test def testCopyLagSegmentsMetricClear(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteCopyLagSegments(0, 1); - brokerTopicMetrics.recordRemoteCopyLagSegments(1, 2); + brokerTopicStats.recordRemoteCopyLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteCopyLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(3, allTopicMetrics.remoteCopyLagSegments) - brokerTopicMetrics.close() + brokerTopicStats.close() assertEquals(0, brokerTopicMetrics.remoteCopyLagSegments) + assertEquals(0, allTopicMetrics.remoteCopyLagSegments) + } @Test def testMultipleDeleteLagBytesMetrics(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 2, 3) - brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2); - brokerTopicMetrics.recordRemoteDeleteLagBytes(2, 3); - - brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 4); - brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 5); - 
brokerTopicMetrics.recordRemoteDeleteLagBytes(2, 6); + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 4) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 5) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 2, 6) assertEquals(15, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(15, allTopicMetrics.remoteDeleteLagBytes) + + brokerTopicStats.recordRemoteDeleteLagBytes(topic2, 0, 1) + assertEquals(16, allTopicMetrics.remoteDeleteLagBytes) } @Test def testDeleteLagBytesMetricWithPartitionExpansion(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2); + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(3, allTopicMetrics.remoteDeleteLagBytes) - brokerTopicMetrics.recordRemoteDeleteLagBytes(2, 3); + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 2, 3) assertEquals(6, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(6, allTopicMetrics.remoteDeleteLagBytes) } @Test def testDeleteLagBytesMetricWithPartitionShrinking(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2); + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(3, allTopicMetrics.remoteDeleteLagBytes) - brokerTopicMetrics.removeRemoteDeleteLagBytes(1); + brokerTopicStats.removeRemoteDeleteLagBytes(topic, 1) assertEquals(1, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(1, allTopicMetrics.remoteDeleteLagBytes) } @Test def testDeleteLagBytesMetricWithRemovingNonexistentPartitions(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - 
brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2); + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(3, allTopicMetrics.remoteDeleteLagBytes) - brokerTopicMetrics.removeRemoteDeleteLagBytes(3); + brokerTopicStats.removeRemoteDeleteLagBytes(topic, 3) assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(3, allTopicMetrics.remoteDeleteLagBytes) } @Test def testDeleteLagBytesMetricClear(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagBytes(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagBytes(1, 2); + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(3, allTopicMetrics.remoteDeleteLagBytes) - brokerTopicMetrics.close() + brokerTopicStats.close() assertEquals(0, brokerTopicMetrics.remoteDeleteLagBytes) + assertEquals(0, allTopicMetrics.remoteDeleteLagBytes) } @Test def testMultipleDeleteLagSegmentsMetrics(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2); - brokerTopicMetrics.recordRemoteDeleteLagSegments(2, 3); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 2, 3) - brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 4); - brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 5); - brokerTopicMetrics.recordRemoteDeleteLagSegments(2, 6); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 4) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 5) + 
brokerTopicStats.recordRemoteDeleteLagSegments(topic, 2, 6) assertEquals(15, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(15, allTopicMetrics.remoteDeleteLagSegments) + + brokerTopicStats.recordRemoteDeleteLagSegments(topic2, 1, 5) + assertEquals(20, allTopicMetrics.remoteDeleteLagSegments) } @Test def testDeleteLagSegmentsMetricWithPartitionExpansion(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(3, allTopicMetrics.remoteDeleteLagSegments) - brokerTopicMetrics.recordRemoteDeleteLagSegments(2, 3); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 2, 3) assertEquals(6, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(6, allTopicMetrics.remoteDeleteLagSegments) } @Test def testDeleteLagSegmentsMetricWithPartitionShrinking(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(3, allTopicMetrics.remoteDeleteLagSegments) - brokerTopicMetrics.removeRemoteDeleteLagSegments(1); + brokerTopicStats.removeRemoteDeleteLagSegments(topic, 1) assertEquals(1, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(1, allTopicMetrics.remoteDeleteLagSegments) } @Test def testDeleteLagSegmentsMetricWithRemovingNonexistentPartitions(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1); - 
brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(3, allTopicMetrics.remoteDeleteLagSegments) - brokerTopicMetrics.removeRemoteDeleteLagSegments(3); + brokerTopicStats.removeRemoteDeleteLagSegments(topic, 3) assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(3, allTopicMetrics.remoteDeleteLagSegments) } @Test def testDeleteLagSegmentsMetricClear(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteDeleteLagSegments(0, 1); - brokerTopicMetrics.recordRemoteDeleteLagSegments(1, 2); + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 0, 1) + brokerTopicStats.recordRemoteDeleteLagSegments(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(3, allTopicMetrics.remoteDeleteLagSegments) - brokerTopicMetrics.close() + brokerTopicStats.close() assertEquals(0, brokerTopicMetrics.remoteDeleteLagSegments) + assertEquals(0, allTopicMetrics.remoteDeleteLagSegments) } @Test def testRemoteLogMetadataCount(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount) - brokerTopicMetrics.recordRemoteLogMetadataCount(0, 1) + assertEquals(0, allTopicMetrics.remoteLogMetadataCount) + brokerTopicStats.recordRemoteLogMetadataCount(topic, 0, 1) assertEquals(1, brokerTopicMetrics.remoteLogMetadataCount) + assertEquals(1, allTopicMetrics.remoteLogMetadataCount) - brokerTopicMetrics.recordRemoteLogMetadataCount(1, 2) - brokerTopicMetrics.recordRemoteLogMetadataCount(2, 3) + brokerTopicStats.recordRemoteLogMetadataCount(topic, 1, 2) + brokerTopicStats.recordRemoteLogMetadataCount(topic, 2, 3) assertEquals(6, brokerTopicMetrics.remoteLogMetadataCount) + assertEquals(6, allTopicMetrics.remoteLogMetadataCount) - 
brokerTopicMetrics.close() + brokerTopicStats.close() assertEquals(0, brokerTopicMetrics.remoteLogMetadataCount) + assertEquals(0, allTopicMetrics.remoteLogMetadataCount) } @ParameterizedTest @ValueSource(booleans = Array(true, false)) def testSingularLogSizeBytesMetric(systemRemoteStorageEnabled: Boolean): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics(systemRemoteStorageEnabled) - + val props = kafka.utils.TestUtils.createDummyBrokerConfig() + props.setProperty(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, systemRemoteStorageEnabled.toString) + val brokerTopicStats = new BrokerTopicStats(java.util.Optional.of(KafkaConfig.fromProps(props))) + val brokerTopicMetrics = brokerTopicStats.topicStats(topic) if (systemRemoteStorageEnabled) { - brokerTopicMetrics.recordRemoteLogSizeBytes(0, 100); - brokerTopicMetrics.recordRemoteLogSizeBytes(1, 150); - brokerTopicMetrics.recordRemoteLogSizeBytes(2, 250); + brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 100) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 150) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 250) assertEquals(500, brokerTopicMetrics.remoteLogSizeBytes) + assertEquals(500, brokerTopicStats.allTopicsStats.remoteLogSizeBytes) + + brokerTopicStats.recordRemoteLogSizeBytes(topic2, 0, 100) + assertEquals(600, brokerTopicStats.allTopicsStats.remoteLogSizeBytes) } else { assertEquals(None, brokerTopicMetrics.metricGaugeMap.get(RemoteStorageMetrics.REMOTE_LOG_SIZE_BYTES_METRIC.getName)) } } @Test def testMultipleLogSizeBytesMetrics(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() + brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 1) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 2) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 3) - brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); - brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); - brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3); - - brokerTopicMetrics.recordRemoteLogSizeBytes(0, 
4); - brokerTopicMetrics.recordRemoteLogSizeBytes(1, 5); - brokerTopicMetrics.recordRemoteLogSizeBytes(2, 6); + brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 4) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 5) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 6) assertEquals(15, brokerTopicMetrics.remoteLogSizeBytes) + assertEquals(15, allTopicMetrics.remoteLogSizeBytes) } @Test def testLogSizeBytesMetricWithPartitionExpansion(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); - brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 1) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + assertEquals(3, allTopicMetrics.remoteLogSizeBytes) - brokerTopicMetrics.recordRemoteLogSizeBytes(2, 3); + brokerTopicStats.recordRemoteLogSizeBytes(topic, 2, 3) assertEquals(6, brokerTopicMetrics.remoteLogSizeBytes) + assertEquals(6, allTopicMetrics.remoteLogSizeBytes) } @Test def testLogSizeBytesMetricWithPartitionShrinking(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); - brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); + brokerTopicStats.recordRemoteLogSizeBytes(topic, 0, 1) + brokerTopicStats.recordRemoteLogSizeBytes(topic, 1, 2) assertEquals(3, brokerTopicMetrics.remoteLogSizeBytes) + assertEquals(3, allTopicMetrics.remoteLogSizeBytes) - brokerTopicMetrics.removeRemoteLogSizeBytes(1); + brokerTopicStats.removeRemoteLogSizeBytes(topic, 1) assertEquals(1, brokerTopicMetrics.remoteLogSizeBytes) + assertEquals(1, allTopicMetrics.remoteLogSizeBytes) } @Test def testLogSizeBytesMetricWithRemovingNonexistentPartitions(): Unit = { - val brokerTopicMetrics = setupBrokerTopicMetrics() - - brokerTopicMetrics.recordRemoteLogSizeBytes(0, 1); - brokerTopicMetrics.recordRemoteLogSizeBytes(1, 2); +// val props = 
kafka.utils.TestUtils.createDummyBrokerConfig() Review Comment: nit: delete this commented-out line? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org