cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463617899



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder(configs);
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {
+            userSpecifiedStatistics = true;
+        }
+        if (!userSpecifiedStatistics &&
             RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-            isStatisticsRegistered = true;
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics);
+        }
+    }
+
+    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+        final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+            metricsRecorder.addValueProviders(name, db, cache, statistics);
+        } else {
+            metricsRecorder.addValueProviders(name, db, null, statistics);
+            log.warn("A table format configuration is used that does not expose the block cache. This means " +
+                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
         }

Review comment:
   I agree with you that it is not ideal, and thank you for this lesson on
reflection.
   
   Indeed, I do not like reflection in this case, because it makes the code too
dependent on RocksDB internals. We should use reflection to check whether the
public API to configure RocksDB changed in a newer version, but that is another
story.
   
   I do not understand how the alternative of wrapping `BlockBasedTableConfig`
in `BlockBasedTableConfigWithAccessibleCache` would work. Since the cache is
not accessible in `BlockBasedTableConfig`, it will also not be accessible when
the config is wrapped in `BlockBasedTableConfigWithAccessibleCache` (despite the
name). We need to get the reference to the cache at the moment the cache is set
in `BlockBasedTableConfig`. If the cache is already set, we can only use
reflection.
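
   To illustrate the point, here is a minimal sketch that does not use the real
RocksDB classes. `FakeCache`, `FakeTableConfig`,
`FakeTableConfigWithAccessibleCache` and `CacheLookup` are hypothetical
stand-ins that I made up for this example; the only assumption carried over
from RocksDB is that the base config lets you set the cache but not read it
back. The subclass captures the reference when the setter is called, which is
the only point at which the reference is available without reflection.

```java
// Hypothetical stand-in for a block cache; not the RocksDB class.
final class FakeCache {
    final long capacityBytes;

    FakeCache(final long capacityBytes) {
        this.capacityBytes = capacityBytes;
    }
}

// Hypothetical stand-in for a table config whose cache can be set
// but not read back (there is deliberately no getter).
class FakeTableConfig {
    private FakeCache cache;

    FakeTableConfig setBlockCache(final FakeCache cache) {
        this.cache = cache;
        return this;
    }
}

// Subclass that remembers the cache at the moment it is set.
class FakeTableConfigWithAccessibleCache extends FakeTableConfig {
    private FakeCache blockCache;

    @Override
    FakeTableConfigWithAccessibleCache setBlockCache(final FakeCache cache) {
        blockCache = cache;          // capture the reference here
        super.setBlockCache(cache);  // keep the base class behaviour
        return this;
    }

    FakeCache blockCache() {
        return blockCache;
    }
}

// Mirrors the instanceof check in the diff above.
final class CacheLookup {
    static FakeCache cacheOrNull(final FakeTableConfig config) {
        if (config instanceof FakeTableConfigWithAccessibleCache) {
            return ((FakeTableConfigWithAccessibleCache) config).blockCache();
        }
        // A cache set on a plain config cannot be reached without reflection.
        return null;
    }
}
```

   With this shape, a config created as `new FakeTableConfig()` yields `null`
from `CacheLookup.cacheOrNull` even after a cache has been set on it, whereas
a `FakeTableConfigWithAccessibleCache` returns the same cache instance that
was passed to `setBlockCache`. Wrapping an already configured plain config
afterwards would not help, because the wrapper never sees the setter call.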
   
   Since the block-based table format is the only format in RocksDB that uses
the cache, I do not see why a user absolutely needs to pass a new
`BlockBasedTableConfig` object. I think for now it is OK to log a warning and
clearly document that the provided `BlockBasedTableConfig` object should be
used.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

