ableegoldman commented on code in PR #12758:
URL: https://github.com/apache/kafka/pull/12758#discussion_r998896253


##########
streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java:
##########
@@ -66,4 +73,28 @@ public static boolean eosEnabled(final ProcessingMode processingMode) {
         return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA ||
             processingMode == ProcessingMode.EXACTLY_ONCE_V2;
     }
+
+    @SuppressWarnings("deprecation")
+    public static long getTotalCacheSize(final StreamsConfig config) {

Review Comment:
   I'm not sure if this would actually be cleaner, but what if we consolidated
the logic here and in `TopologyConfig` so we don't have to perform the same
logic twice? For example, we could change the input parameters of this method
to `stateStoreCacheMaxBytesValue` and `cacheMaxBytesBufferingValue`, then just
pass in the values from either the `globalAppConfigs` or the `topologyConfigs`
over in the `TopologyConfig` class. I think that'd be somewhat cleaner, WDYT?
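
A rough sketch of what the consolidated helper could look like, using the two parameter names suggested above. Everything else is an assumption for illustration: the class name `CacheSizeSketch`, the use of `null` to mean "not set by the user", the rule that the new config wins when both are set, and the 10 MiB fallback. It is not the PR's actual implementation.

```java
// Sketch only: CacheSizeSketch is a hypothetical class, not code from the PR.
final class CacheSizeSketch {
    // Assumed fallback (10 MiB) when neither value was set.
    static final long DEFAULT_CACHE_BYTES = 10L * 1024 * 1024;

    /**
     * Resolves the effective cache size from the two raw config values.
     * A null argument means "the user did not set this config" (an assumption
     * of this sketch).
     */
    static long getTotalCacheSize(final Long stateStoreCacheMaxBytesValue,
                                  final Long cacheMaxBytesBufferingValue) {
        if (stateStoreCacheMaxBytesValue != null) {
            // Assumed precedence: the new config wins whenever it is set.
            return stateStoreCacheMaxBytesValue;
        }
        if (cacheMaxBytesBufferingValue != null) {
            // Fall back to the deprecated config if only that one is set.
            return cacheMaxBytesBufferingValue;
        }
        return DEFAULT_CACHE_BYTES;
    }

    public static void main(final String[] args) {
        System.out.println(getTotalCacheSize(null, 2048L));
        System.out.println(getTotalCacheSize(4096L, 2048L));
        System.out.println(getTotalCacheSize(null, null));
    }
}
```

With this shape, both `StreamsConfigUtils` and `TopologyConfig` would only need to look up the two values from whichever config object they hold and pass them in.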



##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -925,7 +925,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();
 
-        totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        totalCacheSize = applicationConfigs.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review Comment:
   We should use the `getTotalCacheSize` utility method here as well.



##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java:
##########
@@ -182,6 +189,7 @@ synchronized void put(final Bytes key, final LRUCacheEntry value) {
             dirtyKeys.add(key);
         }
         currentSizeBytes += node.size();
+        totalCacheSizeSensor.record(currentSizeBytes);

Review Comment:
   Maybe this is a bit paranoid, but I'm worried about the potential
performance impact of recording this metric on every cache put/eviction, since
`#record` makes a call to the current system time on every invocation (and
we've seen issues related to this in the past).
   
   WDYT about splitting the new metric-related changes out into a separate PR?
I'm fine with still moving forward on that and just watching the benchmarks; I
just don't want the poor cache config changes to get reverted again should we
happen to find a performance drop. Better to keep things as isolated as
possible, even if it's annoying to do/ask for (sorry 😅).
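
To make the concern concrete, here is a small self-contained sketch that counts clock reads. `RecordCostSketch`, `CountingClock`, and `SketchSensor` are hypothetical stand-ins, not Kafka classes. It compares recording on every put, where each call reads the clock, against passing in one timestamp obtained up front. Whether the cache could reasonably reuse a timestamp like this is an assumption and not something settled in this review.

```java
import java.util.function.LongSupplier;

// Sketch only: all types here are hypothetical stand-ins for illustration.
final class RecordCostSketch {

    /** Clock wrapper that counts how often the system time is read. */
    static final class CountingClock implements LongSupplier {
        long reads = 0;

        @Override
        public long getAsLong() {
            reads++;
            return System.currentTimeMillis();
        }
    }

    /** Minimal sensor-like object with two ways of recording a value. */
    static final class SketchSensor {
        private final LongSupplier clock;
        double lastValue;
        long lastTimeMs;

        SketchSensor(final LongSupplier clock) {
            this.clock = clock;
        }

        // Reads the clock on every invocation.
        void record(final double value) {
            record(value, clock.getAsLong());
        }

        // Caller supplies the timestamp, so no clock read happens here.
        void record(final double value, final long timeMs) {
            lastValue = value;
            lastTimeMs = timeMs;
        }
    }

    /** Clock reads incurred when every put records with its own timestamp. */
    static long clockReadsRecordingPerPut(final int puts) {
        final CountingClock clock = new CountingClock();
        final SketchSensor sensor = new SketchSensor(clock);
        for (int i = 0; i < puts; i++) {
            sensor.record(i);
        }
        return clock.reads;
    }

    /** Clock reads incurred when one timestamp is shared across the puts. */
    static long clockReadsWithSharedTimestamp(final int puts) {
        final CountingClock clock = new CountingClock();
        final SketchSensor sensor = new SketchSensor(clock);
        final long nowMs = clock.getAsLong(); // single read for the whole batch
        for (int i = 0; i < puts; i++) {
            sensor.record(i, nowMs);
        }
        return clock.reads;
    }

    public static void main(final String[] args) {
        System.out.println(clockReadsRecordingPerPut(1000));
        System.out.println(clockReadsWithSharedTimestamp(1000));
    }
}
```

The sketch only shows how the number of clock reads scales with puts; the real cost would have to come from the benchmarks.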



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -721,6 +721,10 @@ private void handleTaskMigrated(final TaskMigratedException e) {
         subscribeConsumer();
     }
 
+    public long getCacheSize() {
+        return cacheResizeSize.get();

Review Comment:
   Unfortunately, I don't think we can reliably use this variable to read out
the current cache size, since we erase it immediately after reading it out to
resize the cache. We might need to introduce a new variable if we need the
thread's cache size.
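
A minimal sketch of the problem and of the "new variable" idea. It assumes the pending resize value is reset to a sentinel (here `-1`) once it has been consumed. The class `ThreadCacheSizeSketch`, the field `currentCacheSize`, and the method names other than `getCacheSize` are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Sketch only: a hypothetical stand-in for the relevant part of a stream thread.
final class ThreadCacheSizeSketch {
    // Assumed sentinel meaning "no resize is pending".
    private static final long NO_PENDING_RESIZE = -1L;

    // One-shot signal: set by a resize request, cleared when the resize is applied.
    private final AtomicLong cacheResizeSize = new AtomicLong(NO_PENDING_RESIZE);

    // Hypothetical new variable that keeps the last applied size readable.
    private final AtomicLong currentCacheSize;

    ThreadCacheSizeSketch(final long initialCacheSize) {
        this.currentCacheSize = new AtomicLong(initialCacheSize);
    }

    /** Requests a resize; the thread applies it later in its own loop. */
    void resizeCache(final long size) {
        cacheResizeSize.set(size);
    }

    /** Called from the thread loop: consumes the pending value and erases it. */
    void maybeApplyResize() {
        final long size = cacheResizeSize.getAndSet(NO_PENDING_RESIZE);
        if (size != NO_PENDING_RESIZE) {
            currentCacheSize.set(size);
        }
    }

    /** What reading the signal variable directly would return. */
    long pendingResize() {
        return cacheResizeSize.get();
    }

    /** Stable view of the thread's cache size. */
    long getCacheSize() {
        return currentCacheSize.get();
    }
}
```

After `maybeApplyResize()` runs, `pendingResize()` is back at the sentinel, while `getCacheSize()` still returns the applied size.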



##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -64,11 +67,11 @@ public class TopologyConfig extends AbstractConfig {
                 null,
                 Importance.LOW,
                 BUFFERED_RECORDS_PER_PARTITION_DOC)
-            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
+            .define(STATESTORE_CACHE_MAX_BYTES_CONFIG,

Review Comment:
   I'm _pretty_ sure we'll need to add the new config in addition to the old
deprecated one, rather than replacing it here. Otherwise it'll behave jankily
if we try to pass in or read out the old config (I think).
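
A toy illustration of why replacing the definition could bite. It assumes a config object that rejects lookups of keys it was never told about. `DualConfigSketch` is a hypothetical miniature and not Kafka's `ConfigDef`; the two key strings are the config names as I understand them.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: a hypothetical miniature config definition, not Kafka's ConfigDef.
final class DualConfigSketch {
    static final String OLD_KEY = "cache.max.bytes.buffering";   // deprecated config
    static final String NEW_KEY = "statestore.cache.max.bytes";  // new config

    private final Map<String, Long> defaults = new HashMap<>();

    /** Registers a key with its default value; returns this for chaining. */
    DualConfigSketch define(final String name, final long defaultValue) {
        defaults.put(name, defaultValue);
        return this;
    }

    /** Returns the override if present, else the default; rejects undefined keys. */
    long getLong(final String name, final Map<String, Long> overrides) {
        if (!defaults.containsKey(name)) {
            throw new IllegalArgumentException("Unknown configuration '" + name + "'");
        }
        return overrides.getOrDefault(name, defaults.get(name));
    }
}
```

If only `NEW_KEY` is defined, a lookup of `OLD_KEY` fails even when the user supplied a value for it. Defining both keeps the old key readable, so a helper like `getTotalCacheSize` can still decide between the two values.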



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
