ableegoldman commented on code in PR #12903:
URL: https://github.com/apache/kafka/pull/12903#discussion_r1040224575


##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -81,9 +85,11 @@ public synchronized void resize(final long newCacheSizeBytes) {
                 return;
             }
             final CircularIterator<NamedCache> circularIterator = new CircularIterator<>(caches.values());
-            while (sizeBytes() > maxCacheSizeBytes) {
+            while (sizeInBytes.get() > maxCacheSizeBytes) {
                 final NamedCache cache = circularIterator.next();
+                final long oldSize = cache.sizeInBytes();

Review Comment:
   Probably a relatively unimportant optimization, but we could save two `synchronized` calls by having `#evict` return the number of bytes it evicted from that `NamedCache`. Just a thought.
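   A minimal sketch of that suggestion, using hypothetical, heavily simplified stand-ins (`EvictSketchNamedCache`, `EvictSketchThreadCache`) rather than the real `NamedCache`/`ThreadCache` classes. Here `evict()` returns the bytes it freed from inside its own `synchronized` method, so the resize loop makes one synchronized call per eviction and subtracts the result from the `AtomicLong` total. The round-robin index stands in for `CircularIterator`, and the guard that stops after a full pass freeing nothing is an added assumption.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for NamedCache: each entry is reduced to its size in bytes.
class EvictSketchNamedCache {
    private final Deque<Long> lruEntrySizes = new ArrayDeque<>();
    private long sizeInBytes = 0L;

    synchronized void put(final long entrySize) {
        lruEntrySizes.addLast(entrySize);
        sizeInBytes += entrySize;
    }

    // Evicts the least-recently-used entry and returns the bytes it freed (0 if empty),
    // so callers need no extra sizeInBytes() calls before and after.
    synchronized long evict() {
        final Long evicted = lruEntrySizes.pollFirst();
        if (evicted == null) {
            return 0L;
        }
        sizeInBytes -= evicted;
        return evicted;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

// Hypothetical stand-in for ThreadCache that keeps a running total in an AtomicLong.
class EvictSketchThreadCache {
    final List<EvictSketchNamedCache> caches = new ArrayList<>();
    final AtomicLong sizeInBytes = new AtomicLong();
    long maxCacheSizeBytes = Long.MAX_VALUE;

    void put(final EvictSketchNamedCache cache, final long entrySize) {
        cache.put(entrySize);
        sizeInBytes.addAndGet(entrySize);
    }

    void resize(final long newCacheSizeBytes) {
        maxCacheSizeBytes = newCacheSizeBytes;
        int next = 0;
        int emptyInARow = 0; // stop after a full pass over the caches that frees nothing
        while (sizeInBytes.get() > maxCacheSizeBytes && emptyInARow < caches.size()) {
            final EvictSketchNamedCache cache = caches.get(next);
            next = (next + 1) % caches.size();
            // One synchronized call per eviction instead of sizeInBytes() / evict() / sizeInBytes()
            final long freed = cache.evict();
            sizeInBytes.addAndGet(-freed);
            emptyInARow = freed == 0L ? emptyInARow + 1 : 0;
        }
    }
}
```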



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -186,7 +206,13 @@ public LRUCacheEntry delete(final String namespace, final Bytes key) {
             return null;
         }
 
-        return cache.delete(key);
+        final LRUCacheEntry entry;
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            entry = cache.delete(key);

Review Comment:
   In this case we can't return the change in bytes, since `delete` already returns the removed entry. But because `delete` has no risk of side effects on the cache size (unlike `put`, which may trigger an eviction, so its delta isn't just the bytes passed in to `put`), we can compute the change in size directly by subtracting the size of the removed entry, which should be known, right?
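   A sketch of computing the `delete` delta from the returned entry, again with hypothetical simplified types (`DeleteSketchEntry`, `DeleteSketchNamedCache`, `DeleteSketchThreadCache`). It assumes the bytes the cache charged for an entry are exactly that entry's `size()`; a real implementation would need to subtract precisely whatever was added when the entry was inserted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical cache entry that only knows its own size in bytes.
final class DeleteSketchEntry {
    private final long size;

    DeleteSketchEntry(final long size) {
        this.size = size;
    }

    long size() {
        return size;
    }
}

// Hypothetical stand-in for NamedCache.
class DeleteSketchNamedCache {
    private final Map<String, DeleteSketchEntry> entries = new HashMap<>();
    private long sizeInBytes = 0L;

    // Returns the net change in bytes (an overwritten entry's bytes are released).
    synchronized long put(final String key, final DeleteSketchEntry entry) {
        final DeleteSketchEntry previous = entries.put(key, entry);
        final long delta = entry.size() - (previous == null ? 0L : previous.size());
        sizeInBytes += delta;
        return delta;
    }

    // delete already returns the removed entry, so it cannot also return a byte delta.
    synchronized DeleteSketchEntry delete(final String key) {
        final DeleteSketchEntry removed = entries.remove(key);
        if (removed != null) {
            sizeInBytes -= removed.size();
        }
        return removed;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

// Hypothetical stand-in for ThreadCache.
class DeleteSketchThreadCache {
    final AtomicLong sizeInBytes = new AtomicLong();

    void put(final DeleteSketchNamedCache cache, final String key, final DeleteSketchEntry entry) {
        sizeInBytes.addAndGet(cache.put(key, entry));
    }

    DeleteSketchEntry delete(final DeleteSketchNamedCache cache, final String key) {
        final DeleteSketchEntry removed = cache.delete(key);
        if (removed != null) {
            // delete triggers no evictions, so the delta is exactly the removed entry's size;
            // no synchronized(cache) block or before/after sizeInBytes() reads are needed.
            sizeInBytes.addAndGet(-removed.size());
        }
        return removed;
    }
}
```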



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -158,15 +169,24 @@ public void put(final String namespace, final Bytes key, final LRUCacheEntry val
         numPuts++;
 
         final NamedCache cache = getOrCreateCache(namespace);
-        cache.put(key, value);
-        maybeEvict(namespace);
+
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            cache.put(key, value);

Review Comment:
   Ditto here as well: just return the change in bytes from the `put`?

   Same for all applicable cases. I'll stop commenting on every one, but you get the gist.
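   A sketch of `put` returning its own size delta, with hypothetical simplified types in which each value is reduced to its size in bytes. Overwriting an existing key is used here as the simplest side effect that makes the delta differ from the size of the new entry; the same idea should carry over if `put` can trigger an eviction, as long as the returned value is the net change.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for NamedCache; each value is reduced to its size in bytes.
class PutSketchNamedCache {
    private final Map<String, Long> entrySizes = new HashMap<>();
    private long sizeInBytes = 0L;

    // Returns the net change in this cache's size. Overwriting a key releases the
    // old entry's bytes, so the delta is not simply the size of the new entry.
    synchronized long put(final String key, final long entrySize) {
        final Long previous = entrySizes.put(key, entrySize);
        final long delta = entrySize - (previous == null ? 0L : previous);
        sizeInBytes += delta;
        return delta;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

// Hypothetical stand-in for ThreadCache.
class PutSketchThreadCache {
    final AtomicLong sizeInBytes = new AtomicLong();

    void put(final PutSketchNamedCache cache, final String key, final long entrySize) {
        // The delta is computed inside the cache's own synchronized put, so there is
        // no need for an outer synchronized(cache) block around before/after reads.
        sizeInBytes.addAndGet(cache.put(key, entrySize));
        // A real implementation would follow this with its eviction check.
    }
}
```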



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -133,7 +139,12 @@ public void flush(final String namespace) {
         if (cache == null) {
             return;
         }
-        cache.flush();
+
+        synchronized (cache) {
+            final long oldSize = cache.sizeInBytes();
+            cache.flush();

Review Comment:
   Ditto here: `#flush` could return the number of bytes flushed/evicted, although I guess the overhead saved is smaller in this case because we already hold the lock on the `cache` itself.

   Still, always returning the number of bytes that changed, for any call whose return type was `void` anyway, might encourage more sensible bookkeeping of the cache sizes.
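   A sketch of `flush` returning the bytes it released instead of `void`. The types are hypothetical, and the flush semantics are an assumption made purely for illustration: dirty entries are forwarded to a listener and marked clean, and flushed tombstones (null values) are then dropped, which is what shrinks the cache.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

// Hypothetical stand-in for NamedCache with an assumed, simplified flush behavior.
class FlushSketchNamedCache {
    private static final class CachedValue {
        final String value; // null means tombstone
        final long size;
        boolean dirty = true;

        CachedValue(final String value, final long size) {
            this.value = value;
            this.size = size;
        }
    }

    private final Map<String, CachedValue> entries = new LinkedHashMap<>();
    private final BiConsumer<String, String> flushListener;
    private long sizeInBytes = 0L;

    FlushSketchNamedCache(final BiConsumer<String, String> flushListener) {
        this.flushListener = flushListener;
    }

    // Returns the net change in bytes, as suggested for put.
    synchronized long put(final String key, final String value, final long size) {
        final CachedValue previous = entries.put(key, new CachedValue(value, size));
        final long delta = size - (previous == null ? 0L : previous.size);
        sizeInBytes += delta;
        return delta;
    }

    // Forwards dirty entries, drops flushed tombstones, and returns the bytes released.
    synchronized long flush() {
        long released = 0L;
        final Iterator<Map.Entry<String, CachedValue>> it = entries.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, CachedValue> mapping = it.next();
            final CachedValue entry = mapping.getValue();
            if (!entry.dirty) {
                continue;
            }
            flushListener.accept(mapping.getKey(), entry.value);
            entry.dirty = false;
            if (entry.value == null) {
                it.remove();
                released += entry.size;
            }
        }
        sizeInBytes -= released;
        return released;
    }

    synchronized long sizeInBytes() {
        return sizeInBytes;
    }
}

// Hypothetical stand-in for ThreadCache.
class FlushSketchThreadCache {
    final AtomicLong sizeInBytes = new AtomicLong();

    void put(final FlushSketchNamedCache cache, final String key, final String value, final long size) {
        sizeInBytes.addAndGet(cache.put(key, value, size));
    }

    void flush(final FlushSketchNamedCache cache) {
        sizeInBytes.addAndGet(-cache.flush());
    }
}
```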



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/ThreadCache.java:
##########
@@ -241,35 +267,30 @@ private boolean isOverflowing(final long size) {
     }
 
     long sizeBytes() {
-        long sizeInBytes = 0;
-        for (final NamedCache namedCache : caches.values()) {
-            sizeInBytes += namedCache.sizeInBytes();
-            if (isOverflowing(sizeInBytes)) {
-                return Long.MAX_VALUE;
-            }
-        }
-        return sizeInBytes;
+        return sizeInBytes.get();
     }
 
     synchronized void close(final String namespace) {
         final NamedCache removed = caches.remove(namespace);
         if (removed != null) {
+            sizeInBytes.getAndAdd(-removed.sizeInBytes());
             removed.close();
         }
     }
 
-    private void maybeEvict(final String namespace) {
+    private void maybeEvict(final String namespace, final NamedCache cache) {

Review Comment:
   Is `namespace` unused now?
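   If nothing else in the method needs it, the signature could shrink to take just the cache. A hypothetical sketch with simplified stand-in types, combined with the earlier idea of `evict()` returning the bytes freed:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for NamedCache; entries are reduced to their sizes in bytes.
class MaybeEvictSketchNamedCache {
    private final Deque<Long> lruEntrySizes = new ArrayDeque<>();

    synchronized void put(final long entrySize) {
        lruEntrySizes.addLast(entrySize);
    }

    synchronized boolean isEmpty() {
        return lruEntrySizes.isEmpty();
    }

    // Evicts the least-recently-used entry and returns the bytes freed (0 if empty).
    synchronized long evict() {
        final Long evicted = lruEntrySizes.pollFirst();
        return evicted == null ? 0L : evicted;
    }
}

// Hypothetical stand-in for ThreadCache.
class MaybeEvictSketchThreadCache {
    final AtomicLong sizeInBytes = new AtomicLong();
    final long maxCacheSizeBytes;
    long numEvicts = 0L;

    MaybeEvictSketchThreadCache(final long maxCacheSizeBytes) {
        this.maxCacheSizeBytes = maxCacheSizeBytes;
    }

    void put(final MaybeEvictSketchNamedCache cache, final long entrySize) {
        cache.put(entrySize);
        sizeInBytes.addAndGet(entrySize);
        maybeEvict(cache);
    }

    // The caller already holds the cache reference, so no namespace lookup is needed.
    private void maybeEvict(final MaybeEvictSketchNamedCache cache) {
        while (sizeInBytes.get() > maxCacheSizeBytes && !cache.isEmpty()) {
            sizeInBytes.addAndGet(-cache.evict());
            numEvicts++;
        }
    }
}
```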



-- 
This is an automated message from the Apache Git Service.