rkhachatryan commented on code in PR #21733: URL: https://github.com/apache/flink/pull/21733#discussion_r1085994402
########## flink-tests/src/test/java/org/apache/flink/test/state/TaskManagerWideRocksDbMemorySharingITCase.java: ########## @@ -125,36 +129,45 @@ public void testBlockCache() throws Exception { 0)); // do some work and check the actual usage of memory - for (int i = 0; i < 10; i++) { + double[] deviations = new double[NUM_MEASUREMENTS]; + for (int i = 0; i < NUM_MEASUREMENTS; i++) { Thread.sleep(50L); - DoubleSummaryStatistics stats = + double[] blockCacheUsages = collectGaugeValues(jobIDs, "rocksdb.block-cache-usage") - .collect(Collectors.summarizingDouble((Double::doubleValue))); - assertEquals( - String.format( - "Block cache usage reported by different tasks varies too much: %s\n" - + "That likely mean that they use different cache objects", - stats), - stats.getMax(), - stats.getMin(), - // some deviation is possible because: - // 1. records are being processed in parallel with requesting metrics - // 2. reporting metrics is not synchronized - 500_000d); + .mapToDouble(value -> value) + .toArray(); assertTrue( String.format( "total block cache usage is too high: %s (limit: %s, effective limit: %s)", - stats, EXPECTED_BLOCK_CACHE_SIZE, EFFECTIVE_LIMIT), - stats.getMax() <= EFFECTIVE_LIMIT); + Arrays.toString(blockCacheUsages), + EXPECTED_BLOCK_CACHE_SIZE, + EFFECTIVE_LIMIT), + Arrays.stream(blockCacheUsages).max().getAsDouble() <= EFFECTIVE_LIMIT); + deviations[i] = new StandardDeviation().evaluate(blockCacheUsages); } - + validateDeviations(deviations); } finally { for (JobID jobID : jobIDs) { cluster.getRestClusterClient().cancel(jobID).get(); } } } + private static void validateDeviations(double[] deviations) { + DescriptiveStatisticsHistogramStatistics percentile = + new DescriptiveStatisticsHistogramStatistics(deviations); + assertTrue( + String.format( + "Block cache usage reported by different tasks varies too much: %s\n" + + "That likely mean that they use different cache objects", + Arrays.toString(deviations)), + // some deviation is possible 
because: + // 1. records are being processed in parallel with requesting metrics + // 2. reporting metrics is not synchronized + percentile.getQuantile(.50d) <= 10_000d Review Comment: Good point. When running locally with memory sharing inside slot instead of the whole TM, I get the numbers around `256631.79848130155`. Without sharing at all, `0` is reported, but that causes the `block-cache-capacity` check to fail. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org