ableegoldman commented on code in PR #12758: URL: https://github.com/apache/kafka/pull/12758#discussion_r998896253
##########
streams/src/main/java/org/apache/kafka/streams/internals/StreamsConfigUtils.java:
##########
@@ -66,4 +73,28 @@ public static boolean eosEnabled(final ProcessingMode processingMode) {
         return processingMode == ProcessingMode.EXACTLY_ONCE_ALPHA || processingMode == ProcessingMode.EXACTLY_ONCE_V2;
     }
+
+    @SuppressWarnings("deprecation")
+    public static long getTotalCacheSize(final StreamsConfig config) {

Review Comment:
   Idk if this would actually be cleaner or not, but what if we consolidated the logic here & in `TopologyConfig` so we don't have to perform the same logic twice -- maybe by changing the input parameters of this to `stateStoreCacheMaxBytesValue` and `cacheMaxBytesBufferingValue`, then you can just pass in the values from either the `globalAppConfigs` or the `topologyConfigs` over in the `TopologyConfig` class -- I think that'd be somewhat cleaner, WDYT?

##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -925,7 +925,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
         streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
         delegatingStateRestoreListener = new DelegatingStateRestoreListener();
-        totalCacheSize = applicationConfigs.getLong(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG);
+        totalCacheSize = applicationConfigs.getLong(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG);

Review Comment:
   We should use the `getTotalCacheSize` utility method here as well.

##########
streams/src/main/java/org/apache/kafka/streams/state/internals/NamedCache.java:
##########
@@ -182,6 +189,7 @@ synchronized void put(final Bytes key, final LRUCacheEntry value) {
             dirtyKeys.add(key);
         }
         currentSizeBytes += node.size();
+        totalCacheSizeSensor.record(currentSizeBytes);

Review Comment:
   Maybe this is a bit paranoid, but I'm a bit worried about a potential performance impact of recording this metric on every cache put/eviction, since `#record` makes a call to the current system time on every invocation (and we've seen issues related to this in the past).

   WDYT about splitting out the new metric related changes into a separate PR? I'm fine with still moving forward on that and just watching the benchmarks, I just don't want the poor cache config changes to get reverted again should we happen to find some performance drop. Better to keep things as isolated as possible, even if it's annoying to do/ask for (sorry 😅)

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -721,6 +721,10 @@ private void handleTaskMigrated(final TaskMigratedException e) {
         subscribeConsumer();
     }
+    public long getCacheSize() {
+        return cacheResizeSize.get();

Review Comment:
   Unfortunately I don't think we can reliably use this variable to read out the current cache size, since we erase it immediately after reading it out to resize the cache. We might need to introduce a new variable if we need the thread's cache size.

##########
streams/src/main/java/org/apache/kafka/streams/TopologyConfig.java:
##########
@@ -64,11 +67,11 @@ public class TopologyConfig extends AbstractConfig {
                 null,
                 Importance.LOW,
                 BUFFERED_RECORDS_PER_PARTITION_DOC)
-            .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
+            .define(STATESTORE_CACHE_MAX_BYTES_CONFIG,

Review Comment:
   I'm _pretty_ sure we'll need to add the new config in addition to the old deprecated one, rather than replacing it here. Otherwise it'll react jankily if we try to pass in/read out the old config (I think..)
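
To make the first suggestion concrete, here is a rough sketch of what a consolidated `getTotalCacheSize` might look like if it took the two values directly instead of a `StreamsConfig`. It is not the PR's code: the class name `CacheSizeSketch`, the use of `null` to mean "not set by the user", the `defaultBytes` parameter, and the precedence (new config wins over the deprecated one) are all assumptions made for illustration. Plain maps stand in for `globalAppConfigs` and `topologyConfigs`.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative only: a standalone stand-in for the utility discussed in the review.
final class CacheSizeSketch {

    // Config names as plain strings so the sketch has no Kafka dependency.
    static final String STATESTORE_CACHE_MAX_BYTES_CONFIG = "statestore.cache.max.bytes";
    static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";

    // Hypothetical signature: a null argument means the config was not set explicitly.
    public static long getTotalCacheSize(final Long stateStoreCacheMaxBytesValue,
                                         final Long cacheMaxBytesBufferingValue,
                                         final long defaultBytes) {
        if (stateStoreCacheMaxBytesValue != null) {
            // Assumed precedence: the new config wins, even if the deprecated one is also set.
            return stateStoreCacheMaxBytesValue;
        }
        if (cacheMaxBytesBufferingValue != null) {
            // Fall back to the deprecated config when only it was provided.
            return cacheMaxBytesBufferingValue;
        }
        return defaultBytes;
    }

    public static void main(final String[] args) {
        final long defaultBytes = 10L * 1024 * 1024; // illustrative default

        // Stand-ins for the application-level and per-topology configs.
        final Map<String, Long> globalAppConfigs = new HashMap<>();
        globalAppConfigs.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, 2048L);

        final Map<String, Long> topologyConfigs = new HashMap<>();
        topologyConfigs.put(STATESTORE_CACHE_MAX_BYTES_CONFIG, 4096L);
        topologyConfigs.put(CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024L);

        // The same helper serves both call sites; only the source of the values differs.
        System.out.println(getTotalCacheSize(
            globalAppConfigs.get(STATESTORE_CACHE_MAX_BYTES_CONFIG),
            globalAppConfigs.get(CACHE_MAX_BYTES_BUFFERING_CONFIG),
            defaultBytes));
        System.out.println(getTotalCacheSize(
            topologyConfigs.get(STATESTORE_CACHE_MAX_BYTES_CONFIG),
            topologyConfigs.get(CACHE_MAX_BYTES_BUFFERING_CONFIG),
            defaultBytes));
    }
}
```

With this shape, `KafkaStreams` and `TopologyConfig` would each look up their own two values and hand them to the one helper, so the precedence rule lives in a single place.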