zentol commented on a change in pull request #16654:
URL: https://github.com/apache/flink/pull/16654#discussion_r681645843



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -117,71 +126,88 @@ public void testGetThreadInfoStats() throws Exception {
     /** Tests that cached result is reused within refresh interval. */
     @Test
     public void testCachedStatsNotUpdatedWithinRefreshInterval() throws 
Exception {
-        final int requestId2 = 1;
-
-        final JobVertexThreadInfoStats threadInfoStats2 =
-                createThreadInfoStats(requestId2, TIME_GAP, null);
+        final JobVertexThreadInfoStats unusedThreadInfoStats =
+                createThreadInfoStats(1, TIME_GAP, null);
 
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
-                        CLEAN_UP_INTERVAL,
                         STATS_REFRESH_INTERVAL,
                         threadInfoStatsDefaultSample,
-                        threadInfoStats2);
+                        unusedThreadInfoStats);
         // stores threadInfoStatsDefaultSample in cache
         doInitialRequestAndVerifyResult(tracker);
         Optional<JobVertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
-        // cached result is returned instead of threadInfoStats2
+        // cached result is returned instead of unusedThreadInfoStats
         assertEquals(threadInfoStatsDefaultSample, result.get());
     }
 
     /** Tests that cached result is NOT reused after refresh interval. */
     @Test
     public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
-        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
-        final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() + 
10;
+        final Duration shortRefreshInterval = Duration.ofMillis(1);
 
-        final int requestId2 = 1;
-        final JobVertexThreadInfoStats threadInfoStats2 =
+        // first entry is in the past, so refresh is triggered immediately 
upon fetching it
+        final JobVertexThreadInfoStats initialThreadInfoStats =
                 createThreadInfoStats(
-                        requestId2, TIME_GAP, 
Collections.singletonList(threadInfoSample));
-
+                        Instant.now().minus(10, ChronoUnit.SECONDS),
+                        REQUEST_ID,
+                        Duration.ofMillis(5),
+                        Collections.singletonList(threadInfoSample));
+        final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+                createThreadInfoStats(1, TIME_GAP, 
Collections.singletonList(threadInfoSample));
+
+        // register a CountDownLatch with the cache so we can await refresh of 
the entry
+        CountDownLatch cacheRefreshed = new CountDownLatch(1);
+        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache =
+                createCache(CLEAN_UP_INTERVAL, new 
LatchRemovalListener<>(cacheRefreshed));
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         CLEAN_UP_INTERVAL,
-                        threadInfoStatsRefreshInterval2,
-                        threadInfoStatsDefaultSample,
-                        threadInfoStats2);
-        doInitialRequestAndVerifyResult(tracker);
+                        shortRefreshInterval,
+                        vertexStatsCache,
+                        initialThreadInfoStats,
+                        threadInfoStatsAfterRefresh);
 
-        // ensure that the previous request "expires"
-        Thread.sleep(waitingTime);
+        // no stats yet, but the request triggers async collection of stats
+        assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
+        // block until the async call completes and the first result is 
available
+        tracker.getResultAvailableFuture().get();
 
-        Optional<JobVertexThreadInfoStats> result =
-                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+        // retrieve the entry, triggering the refresh as side effect
+        assertExpectedEqualsReceived(
+                initialThreadInfoStats, tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX));
 
-        assertExpectedEqualsReceived(threadInfoStats2, result);
+        // wait until the entry is refreshed, with generous buffer
+        assertTrue(cacheRefreshed.await(500, TimeUnit.MILLISECONDS));

Review comment:
       ```suggestion
           assertTrue(cacheRefreshed.await());
   ```
   Based on recent discussions we no longer use timeouts in tests.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/threadinfo/JobVertexThreadInfoTrackerTest.java
##########
@@ -117,71 +126,88 @@ public void testGetThreadInfoStats() throws Exception {
     /** Tests that cached result is reused within refresh interval. */
     @Test
     public void testCachedStatsNotUpdatedWithinRefreshInterval() throws 
Exception {
-        final int requestId2 = 1;
-
-        final JobVertexThreadInfoStats threadInfoStats2 =
-                createThreadInfoStats(requestId2, TIME_GAP, null);
+        final JobVertexThreadInfoStats unusedThreadInfoStats =
+                createThreadInfoStats(1, TIME_GAP, null);
 
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
-                        CLEAN_UP_INTERVAL,
                         STATS_REFRESH_INTERVAL,
                         threadInfoStatsDefaultSample,
-                        threadInfoStats2);
+                        unusedThreadInfoStats);
         // stores threadInfoStatsDefaultSample in cache
         doInitialRequestAndVerifyResult(tracker);
         Optional<JobVertexThreadInfoStats> result =
                 tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
-        // cached result is returned instead of threadInfoStats2
+        // cached result is returned instead of unusedThreadInfoStats
         assertEquals(threadInfoStatsDefaultSample, result.get());
     }
 
     /** Tests that cached result is NOT reused after refresh interval. */
     @Test
     public void testCachedStatsUpdatedAfterRefreshInterval() throws Exception {
-        final Duration threadInfoStatsRefreshInterval2 = Duration.ofMillis(10);
-        final long waitingTime = threadInfoStatsRefreshInterval2.toMillis() + 
10;
+        final Duration shortRefreshInterval = Duration.ofMillis(1);
 
-        final int requestId2 = 1;
-        final JobVertexThreadInfoStats threadInfoStats2 =
+        // first entry is in the past, so refresh is triggered immediately 
upon fetching it
+        final JobVertexThreadInfoStats initialThreadInfoStats =
                 createThreadInfoStats(
-                        requestId2, TIME_GAP, 
Collections.singletonList(threadInfoSample));
-
+                        Instant.now().minus(10, ChronoUnit.SECONDS),
+                        REQUEST_ID,
+                        Duration.ofMillis(5),
+                        Collections.singletonList(threadInfoSample));
+        final JobVertexThreadInfoStats threadInfoStatsAfterRefresh =
+                createThreadInfoStats(1, TIME_GAP, 
Collections.singletonList(threadInfoSample));
+
+        // register a CountDownLatch with the cache so we can await refresh of 
the entry
+        CountDownLatch cacheRefreshed = new CountDownLatch(1);
+        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache =
+                createCache(CLEAN_UP_INTERVAL, new 
LatchRemovalListener<>(cacheRefreshed));
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
                         CLEAN_UP_INTERVAL,
-                        threadInfoStatsRefreshInterval2,
-                        threadInfoStatsDefaultSample,
-                        threadInfoStats2);
-        doInitialRequestAndVerifyResult(tracker);
+                        shortRefreshInterval,
+                        vertexStatsCache,
+                        initialThreadInfoStats,
+                        threadInfoStatsAfterRefresh);
 
-        // ensure that the previous request "expires"
-        Thread.sleep(waitingTime);
+        // no stats yet, but the request triggers async collection of stats
+        assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
+        // block until the async call completes and the first result is 
available
+        tracker.getResultAvailableFuture().get();
 
-        Optional<JobVertexThreadInfoStats> result =
-                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+        // retrieve the entry, triggering the refresh as side effect
+        assertExpectedEqualsReceived(
+                initialThreadInfoStats, tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX));
 
-        assertExpectedEqualsReceived(threadInfoStats2, result);
+        // wait until the entry is refreshed, with generous buffer
+        assertTrue(cacheRefreshed.await(500, TimeUnit.MILLISECONDS));
 
-        assertNotSame(result.get(), threadInfoStatsDefaultSample);
+        // verify that we get the second result on the next request
+        Optional<JobVertexThreadInfoStats> result =
+                tracker.getVertexStats(JOB_ID, EXECUTION_JOB_VERTEX);
+        assertExpectedEqualsReceived(threadInfoStatsAfterRefresh, result);
     }
 
     /** Tests that cached results are removed within the cleanup interval. */
     @Test
     public void testCachedStatsCleanedAfterCleanupInterval() throws Exception {
-        final Duration cleanUpInterval2 = Duration.ofMillis(10);
-        final long waitingTime = cleanUpInterval2.toMillis() + 10;
+        final Duration shortCleanUpInterval = Duration.ofMillis(1);
 
+        // register a CountDownLatch with the cache so we can await expiry of 
the entry
+        CountDownLatch cacheExpired = new CountDownLatch(1);
+        Cache<JobVertexThreadInfoTracker.Key, JobVertexThreadInfoStats> 
vertexStatsCache =
+                createCache(shortCleanUpInterval, new 
LatchRemovalListener<>(cacheExpired));
         final JobVertexThreadInfoTracker<JobVertexThreadInfoStats> tracker =
                 createThreadInfoTracker(
-                        cleanUpInterval2, STATS_REFRESH_INTERVAL, 
threadInfoStatsDefaultSample);
-        doInitialRequestAndVerifyResult(tracker);
+                        shortCleanUpInterval,
+                        STATS_REFRESH_INTERVAL,
+                        vertexStatsCache,
+                        threadInfoStatsDefaultSample);
 
-        // wait until we are ready to cleanup
-        Thread.sleep(waitingTime);
+        // no stats yet, but the request triggers async collection of stats
+        assertFalse(tracker.getVertexStats(JOB_ID, 
EXECUTION_JOB_VERTEX).isPresent());
+        // wait until one eviction was registered, with generous buffer
+        assertTrue(cacheExpired.await(1000, TimeUnit.MILLISECONDS));

Review comment:
       ```suggestion
           assertTrue(cacheExpired.await());
   ```
   see above




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to