chia7712 commented on a change in pull request #10976: URL: https://github.com/apache/kafka/pull/10976#discussion_r706140746
########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java ########## @@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() { reset(statisticsToAdd1); reset(statisticsToAdd2); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L); bytesWrittenToDatabaseSensor.record(1 + 2, 0L); - replay(bytesWrittenToDatabaseSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L); bytesReadFromDatabaseSensor.record(2 + 3, 0L); - replay(bytesReadFromDatabaseSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L); memtableBytesFlushedSensor.record(3 + 4, 0L); - replay(memtableBytesFlushedSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L); memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L); - replay(memtableHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L); writeStallDurationSensor.record(4 + 5, 0L); - replay(writeStallDurationSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L); blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L); - replay(blockCacheDataHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L); blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L); - replay(blockCacheIndexHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L); blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L); - replay(blockCacheFilterHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L); bytesWrittenDuringCompactionSensor.record(2 + 4, 0L); - replay(bytesWrittenDuringCompactionSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L); bytesReadDuringCompactionSensor.record(5 + 6, 0L); - replay(bytesReadDuringCompactionSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L); - replay(numberOfOpenFilesSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); numberOfFileErrorsSensor.record(11 + 34, 0L); - replay(numberOfFileErrorsSensor); - - replay(statisticsToAdd1); - replay(statisticsToAdd2); recorder.record(0L); - - verify(statisticsToAdd1); - verify(statisticsToAdd2); - verify( - bytesWrittenToDatabaseSensor, - bytesReadFromDatabaseSensor, - memtableBytesFlushedSensor, - memtableHitRatioSensor, - writeStallDurationSensor, - blockCacheDataHitRatioSensor, - blockCacheIndexHitRatioSensor, - blockCacheFilterHitRatioSensor, - bytesWrittenDuringCompactionSensor, - bytesReadDuringCompactionSensor, - numberOfOpenFilesSensor, - numberOfFileErrorsSensor - ); } @Test public void shouldNotRecordStatisticsBasedMetricsIfStatisticsIsNull() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); - replay( - bytesWrittenToDatabaseSensor, - bytesReadFromDatabaseSensor, - memtableBytesFlushedSensor, - memtableHitRatioSensor, - writeStallDurationSensor, - blockCacheDataHitRatioSensor, - blockCacheIndexHitRatioSensor, - blockCacheFilterHitRatioSensor, - bytesWrittenDuringCompactionSensor, - bytesReadDuringCompactionSensor, - numberOfOpenFilesSensor, - numberOfFileErrorsSensor - ); recorder.record(0L); Review comment: Please verify that no `sensor#record` is executed after `recorder.record(0L);` ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java ########## @@ -300,65 +287,47 @@ public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfte recorder.removeValueProviders(SEGMENT_STORE_NAME_1); reset(recordingTrigger); recordingTrigger.addMetricsRecorder(recorder); - replay(recordingTrigger); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - - verify(recordingTrigger); } @Test public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); reset(recordingTrigger); - replay(recordingTrigger); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - - verify(recordingTrigger); } @Test public void shouldCloseStatisticsWhenValueProvidersAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); reset(statisticsToAdd1); statisticsToAdd1.close(); - replay(statisticsToAdd1); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); } @Test public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); reset(statisticsToAdd1); - replay(statisticsToAdd1); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); Review comment: Please using Mockito.verify to make sure recorder.removeValueProviders(SEGMENT_STORE_NAME_2); is not executed when executing recorder.removeValueProviders(SEGMENT_STORE_NAME_2); ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java ########## @@ -300,65 +287,47 @@ public void shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfte recorder.removeValueProviders(SEGMENT_STORE_NAME_1); reset(recordingTrigger); recordingTrigger.addMetricsRecorder(recorder); - replay(recordingTrigger); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - - verify(recordingTrigger); } @Test public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); reset(recordingTrigger); - replay(recordingTrigger); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); - - verify(recordingTrigger); } @Test public void shouldCloseStatisticsWhenValueProvidersAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); reset(statisticsToAdd1); statisticsToAdd1.close(); - replay(statisticsToAdd1); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); } @Test public void shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); reset(statisticsToAdd1); - replay(statisticsToAdd1); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - - verify(statisticsToAdd1); } @Test public void shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, cacheToAdd2, statisticsToAdd2); reset(recordingTrigger); - replay(recordingTrigger); recorder.removeValueProviders(SEGMENT_STORE_NAME_1); - verify(recordingTrigger); - reset(recordingTrigger); recordingTrigger.removeMetricsRecorder(recorder); - replay(recordingTrigger); recorder.removeValueProviders(SEGMENT_STORE_NAME_2); Review comment: Please using `Mockito.verify` to make sure `recorder.removeValueProviders(SEGMENT_STORE_NAME_2);` happens once when calling `recorder.removeValueProviders(SEGMENT_STORE_NAME_2);` ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java ########## @@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() { reset(statisticsToAdd1); reset(statisticsToAdd2); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L); bytesWrittenToDatabaseSensor.record(1 + 2, 0L); - replay(bytesWrittenToDatabaseSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L); bytesReadFromDatabaseSensor.record(2 + 3, 0L); - replay(bytesReadFromDatabaseSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L); memtableBytesFlushedSensor.record(3 + 4, 0L); - replay(memtableBytesFlushedSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L); memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L); - replay(memtableHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L); writeStallDurationSensor.record(4 + 5, 0L); - replay(writeStallDurationSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L); blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L); - replay(blockCacheDataHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L); blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L); - replay(blockCacheIndexHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L); blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L); - replay(blockCacheFilterHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L); bytesWrittenDuringCompactionSensor.record(2 + 4, 0L); - replay(bytesWrittenDuringCompactionSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L); bytesReadDuringCompactionSensor.record(5 + 6, 0L); - replay(bytesReadDuringCompactionSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L); - replay(numberOfOpenFilesSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); numberOfFileErrorsSensor.record(11 + 34, 0L); - replay(numberOfFileErrorsSensor); - - replay(statisticsToAdd1); - replay(statisticsToAdd2); recorder.record(0L); Review comment: please make sure the value of sensor is equal to mocked value. ########## File path: streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java ########## @@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() { reset(statisticsToAdd1); reset(statisticsToAdd2); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L); bytesWrittenToDatabaseSensor.record(1 + 2, 0L); - replay(bytesWrittenToDatabaseSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L); bytesReadFromDatabaseSensor.record(2 + 3, 0L); - replay(bytesReadFromDatabaseSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L); memtableBytesFlushedSensor.record(3 + 4, 0L); - replay(memtableBytesFlushedSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L); memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L); - replay(memtableHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L); writeStallDurationSensor.record(4 + 5, 0L); - replay(writeStallDurationSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L); blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L); - replay(blockCacheDataHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L); blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L); - replay(blockCacheIndexHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L); blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L); - replay(blockCacheFilterHitRatioSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L); bytesWrittenDuringCompactionSensor.record(2 + 4, 0L); - replay(bytesWrittenDuringCompactionSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L); bytesReadDuringCompactionSensor.record(5 + 6, 0L); - replay(bytesReadDuringCompactionSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L); numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L); - replay(numberOfOpenFilesSensor); - expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L); - expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L); + when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L); + when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L); numberOfFileErrorsSensor.record(11 + 34, 0L); - replay(numberOfFileErrorsSensor); - - replay(statisticsToAdd1); - replay(statisticsToAdd2); recorder.record(0L); - - verify(statisticsToAdd1); - verify(statisticsToAdd2); - verify( - bytesWrittenToDatabaseSensor, - bytesReadFromDatabaseSensor, - memtableBytesFlushedSensor, - memtableHitRatioSensor, - writeStallDurationSensor, - blockCacheDataHitRatioSensor, - blockCacheIndexHitRatioSensor, - blockCacheFilterHitRatioSensor, - bytesWrittenDuringCompactionSensor, - bytesReadDuringCompactionSensor, - numberOfOpenFilesSensor, - numberOfFileErrorsSensor - ); } @Test public void shouldNotRecordStatisticsBasedMetricsIfStatisticsIsNull() { recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, null); - replay( - bytesWrittenToDatabaseSensor, - bytesReadFromDatabaseSensor, - memtableBytesFlushedSensor, - memtableHitRatioSensor, - writeStallDurationSensor, - blockCacheDataHitRatioSensor, - blockCacheIndexHitRatioSensor, - blockCacheFilterHitRatioSensor, - bytesWrittenDuringCompactionSensor, - bytesReadDuringCompactionSensor, - numberOfOpenFilesSensor, - numberOfFileErrorsSensor - ); recorder.record(0L); - - verify( - bytesWrittenToDatabaseSensor, - bytesReadFromDatabaseSensor, - memtableBytesFlushedSensor, - memtableHitRatioSensor, - writeStallDurationSensor, - blockCacheDataHitRatioSensor, - blockCacheIndexHitRatioSensor, - blockCacheFilterHitRatioSensor, - bytesWrittenDuringCompactionSensor, - bytesReadDuringCompactionSensor, - numberOfOpenFilesSensor, - numberOfFileErrorsSensor - ); } @Test public void shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() { - resetToNice(statisticsToAdd1); + reset(statisticsToAdd1); recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, cacheToAdd1, statisticsToAdd1); - expect(statisticsToAdd1.getTickerCount(anyObject())).andStubReturn(0L); - replay(statisticsToAdd1); + when(statisticsToAdd1.getTickerCount(anyObject())).thenReturn(0L); memtableHitRatioSensor.record(0, 0L); blockCacheDataHitRatioSensor.record(0, 0L); blockCacheIndexHitRatioSensor.record(0, 0L); blockCacheFilterHitRatioSensor.record(0, 0L); - replay(memtableHitRatioSensor); - replay(blockCacheDataHitRatioSensor); - replay(blockCacheIndexHitRatioSensor); - replay(blockCacheFilterHitRatioSensor); recorder.record(0L); - - verify(memtableHitRatioSensor); - verify(blockCacheDataHitRatioSensor); - verify(blockCacheIndexHitRatioSensor); - verify(blockCacheFilterHitRatioSensor); } private void setUpMetricsMock() { - mockStatic(RocksDBMetrics.class); + rocksDBMetricsMockedStatic.close(); + rocksDBMetricsMockedStatic = Mockito.mockStatic(RocksDBMetrics.class); final RocksDBMetricContext metricsContext = new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, STORE_NAME); - expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(bytesWrittenToDatabaseSensor); - expect(RocksDBMetrics.bytesReadFromDatabaseSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(bytesReadFromDatabaseSensor); - expect(RocksDBMetrics.memtableBytesFlushedSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(memtableBytesFlushedSensor); - expect(RocksDBMetrics.memtableHitRatioSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(memtableHitRatioSensor); - expect(RocksDBMetrics.writeStallDurationSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(writeStallDurationSensor); - expect(RocksDBMetrics.blockCacheDataHitRatioSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(blockCacheDataHitRatioSensor); - expect(RocksDBMetrics.blockCacheIndexHitRatioSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(blockCacheIndexHitRatioSensor); - expect(RocksDBMetrics.blockCacheFilterHitRatioSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(blockCacheFilterHitRatioSensor); - expect(RocksDBMetrics.bytesWrittenDuringCompactionSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(bytesWrittenDuringCompactionSensor); - expect(RocksDBMetrics.bytesReadDuringCompactionSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(bytesReadDuringCompactionSensor); - expect(RocksDBMetrics.numberOfOpenFilesSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(numberOfOpenFilesSensor); - expect(RocksDBMetrics.numberOfFileErrorsSensor(eq(streamsMetrics), eq(metricsContext))) - .andReturn(numberOfFileErrorsSensor); - RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), anyObject()); - replay(RocksDBMetrics.class); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext)) + .thenReturn(bytesWrittenToDatabaseSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext)) + .thenReturn(bytesReadFromDatabaseSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricsContext)) + .thenReturn(memtableBytesFlushedSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricsContext)) + .thenReturn(memtableHitRatioSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricsContext)) + .thenReturn(writeStallDurationSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricsContext)) + .thenReturn(blockCacheDataHitRatioSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricsContext)) + .thenReturn(blockCacheIndexHitRatioSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricsContext)) + .thenReturn(blockCacheFilterHitRatioSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, metricsContext)) + .thenReturn(bytesWrittenDuringCompactionSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricsContext)) + .thenReturn(bytesReadDuringCompactionSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricsContext)) + .thenReturn(numberOfOpenFilesSensor); + rocksDBMetricsMockedStatic.when(() -> RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext)) + .thenReturn(numberOfFileErrorsSensor); + RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), eq(metricsContext), any()); + RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), eq(metricsContext), any()); } private void setUpMetricsStubMock() { Review comment: What is the difference between `setUpMetricsStubMock` and `setUpMetricsMock`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org