kevin-wu24 commented on code in PR #20356: URL: https://github.com/apache/kafka/pull/20356#discussion_r2279422006
########## core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala: ########## @@ -51,6 +51,60 @@ class KafkaRequestHandlerTest { val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic) val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats + @Test + def testCombinedModeIdlePercent(): Unit = { + val time = Time.SYSTEM + val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys]) + val metricsController = new RequestChannelMetrics(java.util.Set.of[ApiKeys]) + val requestChannelBroker = new RequestChannel(10, time, metricsBroker) + val requestChannelController = new RequestChannel(10, time, metricsController) + val apiHandler = mock(classOf[ApiRequestHandler]) + + // Create both pools with 0 threads so we can wire a shared Meter before handlers are created + val brokerPool = new KafkaRequestHandlerPool( + 0, + requestChannelBroker, + apiHandler, + time, + 0, + "RequestHandlerAvgIdlePercent", + isCombinedMode = true + ) + + val controllerPool = new KafkaRequestHandlerPool( + 0, + requestChannelController, + apiHandler, + time, + 0, + "RequestHandlerAvgIdlePercent", + isCombinedMode = true + ) + + try { + val field = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter") + field.setAccessible(true) + val sharedMeter = field.get(brokerPool).asInstanceOf[Meter] + field.set(controllerPool, sharedMeter) + + brokerPool.resizeThreadPool(4) + controllerPool.resizeThreadPool(4) + + val deadline = System.currentTimeMillis() + 8000 + var value = 0.0 + while (System.currentTimeMillis() < deadline && value == 0.0) { + Thread.sleep(200) + value = sharedMeter.oneMinuteRate() + } + assertTrue(value >= 0.0 && value <= 1.05, s"idle percent should be within [0,1], got $value") Review Comment: Why would there be a measurement error? Any measurement of >1 indicates a bug in our logic for determining the denominator. 
########## core/src/test/scala/kafka/server/KafkaRequestHandlerTest.scala: ########## @@ -51,6 +51,60 @@ class KafkaRequestHandlerTest { val brokerTopicMetrics: BrokerTopicMetrics = brokerTopicStats.topicStats(topic) val allTopicMetrics: BrokerTopicMetrics = brokerTopicStats.allTopicsStats + @Test + def testCombinedModeIdlePercent(): Unit = { + val time = Time.SYSTEM + val metricsBroker = new RequestChannelMetrics(java.util.Set.of[ApiKeys]) + val metricsController = new RequestChannelMetrics(java.util.Set.of[ApiKeys]) + val requestChannelBroker = new RequestChannel(10, time, metricsBroker) + val requestChannelController = new RequestChannel(10, time, metricsController) + val apiHandler = mock(classOf[ApiRequestHandler]) + + // Create both pools with 0 threads so we can wire a shared Meter before handlers are created + val brokerPool = new KafkaRequestHandlerPool( + 0, + requestChannelBroker, + apiHandler, + time, + 0, + "RequestHandlerAvgIdlePercent", + isCombinedMode = true + ) + + val controllerPool = new KafkaRequestHandlerPool( + 0, + requestChannelController, + apiHandler, + time, + 0, + "RequestHandlerAvgIdlePercent", + isCombinedMode = true + ) + + try { + val field = classOf[KafkaRequestHandlerPool].getDeclaredField("aggregateIdleMeter") + field.setAccessible(true) + val sharedMeter = field.get(brokerPool).asInstanceOf[Meter] + field.set(controllerPool, sharedMeter) + + brokerPool.resizeThreadPool(4) + controllerPool.resizeThreadPool(4) + + val deadline = System.currentTimeMillis() + 8000 + var value = 0.0 + while (System.currentTimeMillis() < deadline && value == 0.0) { + Thread.sleep(200) + value = sharedMeter.oneMinuteRate() + } + assertTrue(value >= 0.0 && value <= 1.05, s"idle percent should be within [0,1], got $value") Review Comment: Why would there be a measurement error? Ever having a measurement of >1 indicates a bug in our logic determining the denominator. -- This is an automated message from the Apache Git Service. 
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org