vvcephei commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463128398



##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
             throw new ProcessorStateException(fatal);
         }
 
-        // Setup metrics before the database is opened, otherwise the metrics are not updated
+        // Setup statistics before the database is opened, otherwise the statistics are not updated
         // with the measurements from Rocks DB
-        maybeSetUpMetricsRecorder(configs);
+        maybeSetUpStatistics(configs);
 
         openRocksDB(dbOptions, columnFamilyOptions);
         open = true;
+
+        addValueProvidersToMetricsRecorder(configs);
     }
 
-    private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
-        if (userSpecifiedOptions.statistics() == null &&
+    private void maybeSetUpStatistics(final Map<String, Object> configs) {
+        if (userSpecifiedOptions.statistics() != null) {
+            userSpecifiedStatistics = true;
+        }
+        if (!userSpecifiedStatistics &&
             RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
 
-            isStatisticsRegistered = true;
             // metrics recorder will clean up statistics object
             final Statistics statistics = new Statistics();
             userSpecifiedOptions.setStatistics(statistics);
-            metricsRecorder.addStatistics(name, statistics);
+        }
+    }
+
+    private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+        final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+        final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+        if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+            final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+            metricsRecorder.addValueProviders(name, db, cache, statistics);
+        } else {
+            metricsRecorder.addValueProviders(name, db, null, statistics);
+            log.warn("A table format configuration is used that does not expose the block cache. This means " +
+                "that metrics that relate to the block cache may be wrong if the block cache is shared.");
         }

Review comment:
       Ah, after reading your test, I now see the issue. I'd overlooked the
fact that users construct the table config object and the cache independently.
That makes it impossible to reliably capture the cache: it only works if users
choose to pass our special table config to the Options and then pass the Cache
to that table config.
   
   This doesn't seem ideal. What do you think about just using reflection 
instead?
   
   ```suggestion
           if (tableFormatConfig instanceof BlockBasedTableConfig) {
               final BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
               try {
                   final Field blockCacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_");
                   blockCacheField.setAccessible(true);
                   final Cache nullableBlockCache = (Cache) blockCacheField.get(blockBasedTableConfig);
                   metricsRecorder.addValueProviders(name, db, nullableBlockCache, statistics);
               } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) {
                   log.warn("Expected to find and access field 'blockCache_' in BlockBasedTableConfig. " +
                                "Probably, an incompatible version of RocksDB is being used. " +
                                "Cache will be missing from memory metrics.", e);
                   metricsRecorder.addValueProviders(name, db, null, statistics);
               }
           } else {
               metricsRecorder.addValueProviders(name, db, null, statistics);
           }
   ```
   
   We would obviously test all the branches here to de-risk the reflection. We 
can also add a test that searches the classpath for implementations of 
TableFormatConfig to ensure we don't miss the memo if RocksDB adds a new 
TableFormatConfig implementation.
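
   To make "test all the branches" a bit more concrete, here is a rough,
   self-contained sketch of the reflective read with its fallback. It does not
   use RocksDB: `StandInTableConfig` and `PrivateFieldReader` are hypothetical
   names made up for illustration, and only the field name `blockCache_` is
   taken from the suggestion above. The cases worth covering would be: field
   present, field present but null, field missing, and field of an unexpected
   type.

   ```java
   import java.lang.reflect.Field;
   import java.util.Optional;

   // Hypothetical stand-in for org.rocksdb.BlockBasedTableConfig. Only the
   // private field name mirrors the suggestion; the rest is illustrative.
   class StandInTableConfig {
       private final Object blockCache_;

       StandInTableConfig(final Object blockCache) {
           this.blockCache_ = blockCache;
       }
   }

   // Hypothetical helper, not part of Kafka or RocksDB.
   final class PrivateFieldReader {
       private PrivateFieldReader() { }

       // Reads a private field declared on declaringClass from target.
       // Returns Optional.empty() if the field is missing, inaccessible,
       // null, or not an instance of the expected type.
       static <T> Optional<T> read(final Class<?> declaringClass,
                                   final Object target,
                                   final String fieldName,
                                   final Class<T> type) {
           try {
               final Field field = declaringClass.getDeclaredField(fieldName);
               field.setAccessible(true);
               return Optional.ofNullable(type.cast(field.get(target)));
           } catch (final NoSuchFieldException | IllegalAccessException | ClassCastException e) {
               // Same fallback idea as in the suggestion: report "no cache".
               return Optional.empty();
           }
       }
   }
   ```

   Passing the declaring class explicitly (rather than `target.getClass()`)
   mirrors the suggestion's use of `BlockBasedTableConfig.class`, so that a
   subclass instance would still resolve the field on the parent class.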
   
   Alternative thought, if you don't like the reflection: we could _also_
subclass Options and override `org.rocksdb.Options#setTableFormatConfig` to
check whether the passed `TableFormatConfig` is a `BlockBasedTableConfig`, and
if so, _we_ wrap it with `BlockBasedTableConfigWithAccessibleCache`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

