cadonna commented on a change in pull request #9098: URL: https://github.com/apache/kafka/pull/9098#discussion_r463617899
########## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java ########## @@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) { throw new ProcessorStateException(fatal); } - // Setup metrics before the database is opened, otherwise the metrics are not updated + // Setup statistics before the database is opened, otherwise the statistics are not updated // with the measurements from Rocks DB - maybeSetUpMetricsRecorder(configs); + maybeSetUpStatistics(configs); openRocksDB(dbOptions, columnFamilyOptions); open = true; + + addValueProvidersToMetricsRecorder(configs); } - private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) { - if (userSpecifiedOptions.statistics() == null && + private void maybeSetUpStatistics(final Map<String, Object> configs) { + if (userSpecifiedOptions.statistics() != null) { + userSpecifiedStatistics = true; + } + if (!userSpecifiedStatistics && RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) { - isStatisticsRegistered = true; // metrics recorder will clean up statistics object final Statistics statistics = new Statistics(); userSpecifiedOptions.setStatistics(statistics); - metricsRecorder.addStatistics(name, statistics); + } + } + + private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) { + final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig(); + final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics(); + if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) { + final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache(); + metricsRecorder.addValueProviders(name, db, cache, statistics); + } else { + metricsRecorder.addValueProviders(name, db, null, statistics); + log.warn("A table format configuration is used that does not expose the block cache. This means " + + "that metrics that relate to the block cache may be wrong if the block cache is shared."); } Review comment: I agree with you that it is not ideal and thank you for this lesson on reflection. Indeed, I do not like reflection in this case, because it makes the code too much dependent on RocksDB internals. We should use reflection to check if the public API to configure RocksDB changed in a newer version, but that is another story. I do not understand how the alternative of wrapping `BlockBasedTableConfig` into `BlockBasedTableConfigWithAccessibleCache` should work. Since the cache is not accessible in `BlockBasedTableConfig` it will also not be accessible when it is wrapped in `BlockBasedTableConfigWithAccessibleCache` (despite the name). We need to get the reference to the cache when the cache is set in `BlockBasedTableConfig`. If the cache is already set we can only use reflection. Since the block based table format is the only format in RocksDB that uses the cache, I do not see why a user absolutely needs to pass a new `BlockBasedTableConfig` object. I think for now it is OK to log a warning, and clearly document that the provided `BlockBasedTableConfig` object should be used. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org