[ 
https://issues.apache.org/jira/browse/BEAM-13015?focusedWorklogId=708335&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-708335
 ]

ASF GitHub Bot logged work on BEAM-13015:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 13/Jan/22 14:19
            Start Date: 13/Jan/22 14:19
    Worklog Time Spent: 10m 
      Work Description: scwhittle commented on a change in pull request #16495:
URL: https://github.com/apache/beam/pull/16495#discussion_r783999225



##########
File path: 
sdks/java/harness/src/test/java/org/apache/beam/fn/harness/CachesTest.java
##########
@@ -160,4 +160,20 @@ private void testCache(Cache<String, String> cache) {
     assertEquals("value2", cache.computeIfAbsent("key2", (unused) -> 
"value2"));
     assertEquals("value2", cache.peek("key2"));
   }
+
+  @Test
+  public void testDescribeStats() throws Exception {
+    Cache<Integer, WeightedValue<String>> cache = Caches.forMaximumBytes(1000 
* 1048576L);
+    for (int i = 0; i < 100; ++i) {
+      cache.computeIfAbsent(i, (key) -> WeightedValue.of("value", 1048576L));
+      cache.peek(i);
+      cache.put(100 + i, WeightedValue.of("value", 1048576L));
+    }
+
+    assertThat(cache.describeStats(), containsString("used/max 200/1000 MB"));

Review comment:
       It would be good to have a test that checks the used bytes stay correct 
when an item's weight is updated or when evictions occur.

##########
File path: 
sdks/java/core/src/main/java/org/apache/beam/sdk/util/WeightedValue.java
##########
@@ -47,4 +49,26 @@ public long getWeight() {
   public T getValue() {
     return value;
   }
+
+  @Override
+  public boolean equals(@Nullable Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof WeightedValue)) {

Review comment:
       check for null?

##########
File path: 
sdks/java/harness/src/main/java/org/apache/beam/fn/harness/Caches.java
##########
@@ -57,92 +65,58 @@ public static long weigh(Object o) {
 
   /** An eviction listener that reduces the size of entries that are {@link 
Shrinkable}. */
   @VisibleForTesting
-  static class ShrinkOnEviction implements RemovalListener<CompositeKey, 
Object> {
+  static class ShrinkOnEviction implements RemovalListener<CompositeKey, 
WeightedValue<Object>> {
 
     private final 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
-            CompositeKey, Object>
+            CompositeKey, WeightedValue<Object>>
         cache;
+    private final LongAdder weightInBytes;
 
-    ShrinkOnEviction(CacheBuilder<Object, Object> cacheBuilder) {
+    ShrinkOnEviction(
+        CacheBuilder<Object, WeightedValue<Object>> cacheBuilder, LongAdder 
weightInBytes) {
       this.cache = cacheBuilder.removalListener(this).build();
+      this.weightInBytes = weightInBytes;
     }
 
     public 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache<
-            CompositeKey, Object>
+            CompositeKey, WeightedValue<Object>>
         getCache() {
       return cache;
     }
 
     @Override
-    public void onRemoval(RemovalNotification<CompositeKey, Object> 
removalNotification) {
+    public void onRemoval(
+        RemovalNotification<CompositeKey, WeightedValue<Object>> 
removalNotification) {
+      weightInBytes.add(-removalNotification.getValue().getWeight());
       if (removalNotification.wasEvicted()) {
-        if (!(removalNotification.getValue() instanceof Cache.Shrinkable)) {
+        if (!(removalNotification.getValue().getValue() instanceof 
Cache.Shrinkable)) {
           return;
         }
-        Object updatedEntry = ((Shrinkable<?>) 
removalNotification.getValue()).shrink();
+        Object updatedEntry = ((Shrinkable<?>) 
removalNotification.getValue().getValue()).shrink();
         if (updatedEntry != null) {
-          cache.put(removalNotification.getKey(), updatedEntry);
+          cache.put(removalNotification.getKey(), 
addWeightedValue(updatedEntry, weightInBytes));
         }
       }
     }
   }
 
   /** A cache that never stores any values. */
   public static <K, V> Cache<K, V> noop() {
-    // We specifically use Guava cache since it allows for recursive 
computeIfAbsent calls
-    // preventing deadlock from occurring when a loading function mutates the 
underlying cache
-    return (Cache<K, V>)
-        forCache(new 
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(0)).getCache());
+    return forMaximumBytes(0L);
   }
 
   /** A cache that never evicts any values. */
   public static <K, V> Cache<K, V> eternal() {
-    // We specifically use Guava cache since it allows for recursive 
computeIfAbsent calls
-    // preventing deadlock from occurring when a loading function mutates the 
underlying cache
-    return (Cache<K, V>)
-        forCache(
-            new 
ShrinkOnEviction(CacheBuilder.newBuilder().maximumSize(Long.MAX_VALUE)).getCache());
+    return forMaximumBytes(Long.MAX_VALUE);
   }
 
   /**
    * Uses the specified {@link PipelineOptions} to configure and return a 
cache instance based upon
    * parameters within {@link SdkHarnessOptions}.
    */
   public static <K, V> Cache<K, V> fromOptions(PipelineOptions options) {
-    // We specifically use Guava cache since it allows for recursive 
computeIfAbsent calls
-    // preventing deadlock from occurring when a loading function mutates the 
underlying cache
-    return (Cache<K, V>)
-        forCache(
-            new ShrinkOnEviction(
-                    CacheBuilder.newBuilder()
-                        .maximumWeight(
-                            
options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb()
-                                * 1024L
-                                * 1024L
-                                / WEIGHT_RATIO)
-                        .weigher(
-                            new Weigher<Object, Object>() {
-
-                              @Override
-                              public int weigh(Object key, Object value) {
-                                long size;
-                                if (value instanceof Weighted) {
-                                  size = Caches.weigh(key) + ((Weighted) 
value).getWeight();
-                                } else {
-                                  size = Caches.weigh(key) + 
Caches.weigh(value);
-                                }
-                                size = size / WEIGHT_RATIO + 1;
-                                if (size >= Integer.MAX_VALUE) {
-                                  LOG.warn(
-                                      "Entry with size {} MiBs inserted into 
the cache. This is larger than the maximum individual entry size of {} MiBs. 
The cache will under report its memory usage by the difference. This may lead 
to OutOfMemoryErrors.",
-                                      (size / 1048576L) + 1,
-                                      2 * WEIGHT_RATIO * 1024);
-                                  return Integer.MAX_VALUE;
-                                }
-                                return (int) size;
-                              }
-                            }))
-                .getCache());
+    return forMaximumBytes(
+        options.as(SdkHarnessOptions.class).getMaxCacheMemoryUsageMb() * 
1048576L);

Review comment:
       nit: I think 1024 * 1024 is easier to eyeball for correctness




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 708335)
    Time Spent: 44h  (was: 43h 50m)

> Optimize Java SDK harness
> -------------------------
>
>                 Key: BEAM-13015
>                 URL: https://issues.apache.org/jira/browse/BEAM-13015
>             Project: Beam
>          Issue Type: Improvement
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: P2
>          Time Spent: 44h
>  Remaining Estimate: 0h
>
> Use profiling tools to remove bundle processing overhead in the SDK harness.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to