cadonna commented on a change in pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#discussion_r463617899
##########
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java
##########
@@ -181,23 +181,39 @@ void openDB(final ProcessorContext context) {
throw new ProcessorStateException(fatal);
}
- // Setup metrics before the database is opened, otherwise the metrics are not updated
+ // Setup statistics before the database is opened, otherwise the statistics are not updated
// with the measurements from Rocks DB
- maybeSetUpMetricsRecorder(configs);
+ maybeSetUpStatistics(configs);
openRocksDB(dbOptions, columnFamilyOptions);
open = true;
+
+ addValueProvidersToMetricsRecorder(configs);
}
- private void maybeSetUpMetricsRecorder(final Map<String, Object> configs) {
- if (userSpecifiedOptions.statistics() == null &&
+ private void maybeSetUpStatistics(final Map<String, Object> configs) {
+ if (userSpecifiedOptions.statistics() != null) {
+ userSpecifiedStatistics = true;
+ }
+ if (!userSpecifiedStatistics &&
RecordingLevel.forName((String) configs.get(METRICS_RECORDING_LEVEL_CONFIG)) == RecordingLevel.DEBUG) {
- isStatisticsRegistered = true;
// metrics recorder will clean up statistics object
final Statistics statistics = new Statistics();
userSpecifiedOptions.setStatistics(statistics);
- metricsRecorder.addStatistics(name, statistics);
+ }
+ }
+
+ private void addValueProvidersToMetricsRecorder(final Map<String, Object> configs) {
+ final TableFormatConfig tableFormatConfig = userSpecifiedOptions.tableFormatConfig();
+ final Statistics statistics = userSpecifiedStatistics ? null : userSpecifiedOptions.statistics();
+ if (tableFormatConfig instanceof BlockBasedTableConfigWithAccessibleCache) {
+ final Cache cache = ((BlockBasedTableConfigWithAccessibleCache) tableFormatConfig).blockCache();
+ metricsRecorder.addValueProviders(name, db, cache, statistics);
+ } else {
+ metricsRecorder.addValueProviders(name, db, null, statistics);
+ log.warn("A table format configuration is used that does not expose the block cache. This means " +
+ "that metrics that relate to the block cache may be wrong if the block cache is shared.");
}
Review comment:
I agree that it is not ideal, and thank you for the lesson on reflection.
Indeed, I do not like reflection in this case because it makes the code depend
too heavily on RocksDB internals. We should use reflection to check whether the
public API for configuring RocksDB changed in a newer version, but that is
another story.
I do not understand how the alternative of wrapping `BlockBasedTableConfig`
in `BlockBasedTableConfigWithAccessibleCache` would work. Since the cache is
not accessible in `BlockBasedTableConfig`, it will also not be accessible when
it is wrapped in `BlockBasedTableConfigWithAccessibleCache` (despite the name).
We need to get the reference to the cache at the moment the cache is set in
`BlockBasedTableConfig`. If the cache is already set, we can only use reflection.
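To illustrate the point about capturing the reference when the cache is set,
here is a minimal sketch. `FakeCache` and `FakeTableConfig` are hypothetical
stand-ins for the RocksDB classes; they only mimic a setter without a public
getter. The subclass and the `CacheLookup` helper are likewise illustrative and
not necessarily what this PR implements.

```java
// Hypothetical stand-in for the RocksDB cache type (assumption, not the real class).
class FakeCache {
}

// Hypothetical stand-in for a table config that accepts a cache but
// offers no public way to read it back.
class FakeTableConfig {
    private FakeCache blockCache;

    public FakeTableConfig setBlockCache(final FakeCache cache) {
        this.blockCache = cache;
        return this;
    }
}

// Remembers the cache at the moment it is set, so it can be handed out later.
class FakeTableConfigWithAccessibleCache extends FakeTableConfig {
    private FakeCache capturedCache;

    @Override
    public FakeTableConfig setBlockCache(final FakeCache cache) {
        capturedCache = cache;
        return super.setBlockCache(cache);
    }

    public FakeCache blockCache() {
        return capturedCache;
    }
}

class CacheLookup {
    // Same shape as the instanceof check in the diff: a config that does not
    // expose its cache yields null, which is the case that triggers the warning.
    static FakeCache cacheOrNull(final FakeTableConfig config) {
        if (config instanceof FakeTableConfigWithAccessibleCache) {
            return ((FakeTableConfigWithAccessibleCache) config).blockCache();
        }
        return null;
    }
}
```

A cache that was set on a plain `FakeTableConfig` before any wrapping cannot be
recovered by this pattern, which is why only the subclass instance that saw the
`setBlockCache` call can report it.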
Since the block-based table format is the only format in RocksDB that uses
the cache, I do not see why a user absolutely needs to pass a new
`BlockBasedTableConfig` object. I think for now it is OK to log a warning and
to clearly document that the provided `BlockBasedTableConfig` object should be
used.