[ https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=708335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-708335 ]
ASF GitHub Bot logged work on BEAM-13015: ----------------------------------------- Author: ASF GitHub Bot Created on: 13/Jan/22 14:19 Start Date: 13/Jan/22 14:19 Worklog Time Spent: 10m Work Description: scwhittle commented on a change in pull request #16495: URL: https://github.com/apache/beam/pull/16495#discussion_r783999225 ########## File path: sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java ########## @@ -160,4 +160,20 @@ private void testCache(Cache<String, String> cache) { assertEquals("value2", cache.computeIfAbsent("key2", (unused) -> "value2")); assertEquals("value2", cache.peek("key2")); } + + @Test + public void testDescribeStats() throws Exception { + Cache<Integer, WeightedValue<String>> cache = Caches.forMaximumBytes(1000 * 1048576L); + for (int i = 0; i < 100; ++i) { + cache.computeIfAbsent(i, (key) -> WeightedValue.of("value", 1048576L)); + cache.peek(i); + cache.put(100 + i, WeightedValue.of("value", 1048576L)); + } + + assertThat(cache.describeStats(), containsString("used/max 200/1000 MB")); Review comment: It would be good to have a test that checks the reported used bytes stay correct as item weights are updated or evictions occur. ########## File path: sdks/java/core/src/main/java/org/apache/beam/sdk/util/WeightedValue.java ########## @@ -47,4 +49,26 @@ public long getWeight() { public T getValue() { return value; } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof WeightedValue)) { Review comment: check for null? ########## File path: sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java ########## @@ -57,92 +65,58 @@ public static long weigh(Object o) { /** An eviction listener that reduces the size of entries that are {@link Shrinkable}.
*/ @VisibleForTesting - static class ShrinkOnEviction implements RemovalListener<CompositeKey, Object> { + static class ShrinkOnEviction implements RemovalListener<CompositeKey, WeightedValue<Object>> { private final org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache< - CompositeKey, Object> + CompositeKey, WeightedValue<Object>> cache; + private final LongAdder weightInBytes; - ShrinkOnEviction(CacheBuilder<Object, Object> cacheBuilder) { + ShrinkOnEviction( + CacheBuilder<Object, WeightedValue<Object>> cacheBuilder, LongAdder weightInBytes) { this.cache = cacheBuilder.removalListener(this).build(); + this.weightInBytes = weightInBytes; } public org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache< - CompositeKey, Object> + CompositeKey, WeightedValue<Object>> getCache() { return cache; } @Override - public void onRemoval(RemovalNotification<CompositeKey, Object> removalNotification) { + public void onRemoval( + RemovalNotification<CompositeKey, WeightedValue<Object>> removalNotification) { + weightInBytes.add(-removalNotification.getValue().getWeight()); if (removalNotification.wasEvicted()) { - if (!(removalNotification.getValue() instanceof Cache.Shrinkable)) { + if (!(removalNotification.getValue().getValue() instanceof Cache.Shrinkable)) { return; } - Object updatedEntry = ((Shrinkable<?>) removalNotification.getValue()).shrink(); + Object updatedEntry = ((Shrinkable<?>) removalNotification.getValue().getValue()).shrink(); if (updatedEntry != null) { - cache.put(removalNotification.getKey(), updatedEntry); + cache.put(removalNotification.getKey(), addWeightedValue(updatedEntry, weightInBytes)); } } } } /** A cache that never stores any values. 
*/ public static <K, V> Cache<K, V> noop() { - // We specifically use Guava cache since it allows for recursive computeIfAbsent calls - // preventing deadlock from occurring when a loading function mutates the underlying cache - return (Cache<K, V>) - forCache(new ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(0)).getCache()); + return forMaximumBytes(0L); } /** A cache that never evicts any values. */ public static <K, V> Cache<K, V> eternal() { - // We specifically use Guava cache since it allows for recursive computeIfAbsent calls - // preventing deadlock from occurring when a loading function mutates the underlying cache - return (Cache<K, V>) - forCache( - new ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(Long.MAX_VALUE)).getCache()); + return forMaximumBytes(Long.MAX_VALUE); } /** * Uses the specified {@link PipelineOptions} to configure and return a cache instance based upon * parameters within {@link SdkHarnessOptions}. */ public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) { - // We specifically use Guava cache since it allows for recursive computeIfAbsent calls - // preventing deadlock from occurring when a loading function mutates the underlying cache - return (Cache<K, V>) - forCache( - new ShrinkOnEviction( - CacheBuilder.newBuilder() - .maximumWeight( - options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() - * 1024L - * 1024L - / WEIGHT_RATIO) - .weigher( - new Weigher<Object, Object>() { - - @Override - public int weigh(Object key, Object value) { - long size; - if (value instanceof Weighted) { - size = Caches.weigh(key) + ((Weighted) value).getWeight(); - } else { - size = Caches.weigh(key) + Caches.weigh(value); - } - size = size / WEIGHT_RATIO + 1; - if (size >= Integer.MAX_VALUE) { - LOG.warn( - "Entry with size {} MiBs inserted into the cache. This is larger than the maximum individual entry size of {} MiBs. The cache will under report its memory usage by the difference. 
This may lead to OutOfMemoryErrors.", - (size / 1048576L) + 1, - 2 * WEIGHT_RATIO * 1024); - return Integer.MAX_VALUE; - } - return (int) size; - } - })) - .getCache()); + return forMaximumBytes( + options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 1048576L); Review comment: nit: I think 1024 * 1024 is easier to eyeball for correctness. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org Issue Time Tracking ------------------- Worklog Id: (was: 708335) Time Spent: 44h (was: 43h 50m) > Optimize Java SDK harness > ------------------------- > > Key: BEAM-13015 > URL: https://issues.apache.org/jira/browse/BEAM-13015 > Project: Beam > Issue Type: Improvement > Components: sdk-java-harness > Reporter: Luke Cwik > Assignee: Luke Cwik > Priority: P2 > Time Spent: 44h > Remaining Estimate: 0h > > Use profiling tools to remove bundle processing overhead in the SDK harness. -- This message was sent by Atlassian Jira (v8.20.1#820001)