junrao commented on code in PR #15234:
URL: https://github.com/apache/kafka/pull/15234#discussion_r1460103407


##########
server-common/src/test/java/org/apache/kafka/server/util/timer/SystemTimerReaperTest.java:
##########
@@ -14,10 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.coordinator.group.util;
+package org.apache.kafka.server.util.timer;
 
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.server.util.timer.SystemTimer;
+import org.apache.kafka.server.util.timer.SystemTimerReaper;

Review Comment:
   This is unused. It seems that SystemTimer, Timer and TimerTask are all 
unused.



##########
server/src/main/java/org/apache/kafka/server/metrics/ClientMetricsInstance.java:
##########
@@ -38,6 +39,7 @@ public class ClientMetricsInstance {
     private long lastPushRequestTimestamp;
     private volatile boolean terminating;
     private volatile Errors lastKnownError;
+    private TimerTask expirationTimerTask;

Review Comment:
   Does this need to be volatile since it's written and read by different 
threads?



##########
server/src/test/java/org/apache/kafka/server/ClientMetricsManagerTest.java:
##########
@@ -919,4 +922,101 @@ public void 
testPushTelemetryConcurrentRequestAfterSubscriptionUpdate() throws U
         // 1 request should fail with throttling error.
         assertEquals(1, throttlingErrorCount);
     }
+
+    @Test
+    public void testCacheEviction() throws UnknownHostException, 
InterruptedException {
+        Properties properties = new Properties();
+        properties.put("metrics", 
ClientMetricsConfigs.ALL_SUBSCRIBED_METRICS_CONFIG);
+        properties.put(ClientMetricsConfigs.PUSH_INTERVAL_MS, "100");
+        clientMetricsManager.updateSubscription("sub-1", properties);
+
+        GetTelemetrySubscriptionsRequest request = new 
GetTelemetrySubscriptionsRequest.Builder(
+            new GetTelemetrySubscriptionsRequestData(), true).build();
+
+        GetTelemetrySubscriptionsResponse response = 
clientMetricsManager.processGetTelemetrySubscriptionRequest(
+            request, ClientMetricsTestUtils.requestContext());
+        assertEquals(Errors.NONE, response.error());
+
+        
assertNotNull(clientMetricsManager.clientInstance(response.data().clientInstanceId()));
+        assertEquals(1, clientMetricsManager.expirationTimer().size());
+        // Advance the clock to trigger cache eviction, cache expiry should be 
100 * 3 = 300 ms.
+        clientMetricsManager.expirationTimer().advanceClock(300);
+        assertTimeoutPreemptively(Duration.ofMillis(300), () -> {

Review Comment:
   We advanced the clock by 300 previously, which should have triggered the 
timeout logic. Why do we need to wait for 300ms here?



##########
server/src/main/java/org/apache/kafka/server/ClientMetricsManager.java:
##########
@@ -253,10 +266,18 @@ private ClientMetricsInstance clientInstance(Uuid 
clientInstanceId, RequestConte
                 if (clientInstance.subscriptionVersion() >= 
subscriptionUpdateVersion.get()) {
                     return clientInstance;
                 }
+                // Cancel the existing expiration timer task for the old 
client instance.
+                clientInstance.cancelExpirationTimerTask();

Review Comment:
   Is this call necessary? `clientInstance.updateExpirationTimerTask()` does 
this part and is always called later in the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to