chia7712 commented on a change in pull request #10976:
URL: https://github.com/apache/kafka/pull/10976#discussion_r706140746



##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##########
@@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() {
         reset(statisticsToAdd1);
         reset(statisticsToAdd2);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
         bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-        replay(bytesWrittenToDatabaseSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
         bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-        replay(bytesReadFromDatabaseSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
         memtableBytesFlushedSensor.record(3 + 4, 0L);
-        replay(memtableBytesFlushedSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
         memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-        replay(memtableHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
         writeStallDurationSensor.record(4 + 5, 0L);
-        replay(writeStallDurationSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
         blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-        replay(blockCacheDataHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L);
         blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L);
-        replay(blockCacheIndexHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L);
         blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L);
-        replay(blockCacheFilterHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L);
         bytesWrittenDuringCompactionSensor.record(2 + 4, 0L);
-        replay(bytesWrittenDuringCompactionSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L);
         bytesReadDuringCompactionSensor.record(5 + 6, 0L);
-        replay(bytesReadDuringCompactionSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
         numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L);
-        replay(numberOfOpenFilesSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
         numberOfFileErrorsSensor.record(11 + 34, 0L);
-        replay(numberOfFileErrorsSensor);
-
-        replay(statisticsToAdd1);
-        replay(statisticsToAdd2);
 
         recorder.record(0L);
-
-        verify(statisticsToAdd1);
-        verify(statisticsToAdd2);
-        verify(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
     }
 
     @Test
     public void shouldNotRecordStatisticsBasedMetricsIfStatisticsIsNull() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, null);
-        replay(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
 
         recorder.record(0L);

Review comment:
       Please verify that no `sensor#record` is executed after 
`recorder.record(0L);`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##########
@@ -300,65 +287,47 @@ public void 
shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfte
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
         reset(recordingTrigger);
         recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
-
-        verify(recordingTrigger);
     }
 
     @Test
     public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
         reset(recordingTrigger);
-        replay(recordingTrigger);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
-
-        verify(recordingTrigger);
     }
 
     @Test
     public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
         reset(statisticsToAdd1);
         statisticsToAdd1.close();
-        replay(statisticsToAdd1);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
     }
 
     @Test
     public void 
shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, null);
         reset(statisticsToAdd1);
-        replay(statisticsToAdd1);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);

Review comment:
       Please use `Mockito.verify` to make sure 
`statisticsToAdd1.close()` is not executed when 
executing `recorder.removeValueProviders(SEGMENT_STORE_NAME_1);`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##########
@@ -300,65 +287,47 @@ public void 
shouldAddItselfToRecordingTriggerWhenFirstValueProvidersAreAddedAfte
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
         reset(recordingTrigger);
         recordingTrigger.addMetricsRecorder(recorder);
-        replay(recordingTrigger);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
-
-        verify(recordingTrigger);
     }
 
     @Test
     public void shouldNotAddItselfToRecordingTriggerWhenNotEmpty2() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
         reset(recordingTrigger);
-        replay(recordingTrigger);
 
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
-
-        verify(recordingTrigger);
     }
 
     @Test
     public void shouldCloseStatisticsWhenValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
         reset(statisticsToAdd1);
         statisticsToAdd1.close();
-        replay(statisticsToAdd1);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
     }
 
     @Test
     public void 
shouldNotCloseStatisticsWhenValueProvidersWithoutStatisticsAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, null);
         reset(statisticsToAdd1);
-        replay(statisticsToAdd1);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
-
-        verify(statisticsToAdd1);
     }
 
     @Test
     public void 
shouldRemoveItselfFromRecordingTriggerWhenLastValueProvidersAreRemoved() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_2, dbToAdd2, 
cacheToAdd2, statisticsToAdd2);
         reset(recordingTrigger);
-        replay(recordingTrigger);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_1);
 
-        verify(recordingTrigger);
-
         reset(recordingTrigger);
         recordingTrigger.removeMetricsRecorder(recorder);
-        replay(recordingTrigger);
 
         recorder.removeValueProviders(SEGMENT_STORE_NAME_2);

Review comment:
       Please use `Mockito.verify` to make sure 
`recordingTrigger.removeMetricsRecorder(recorder);` happens once when 
calling `recorder.removeValueProviders(SEGMENT_STORE_NAME_2);`

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##########
@@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() {
         reset(statisticsToAdd1);
         reset(statisticsToAdd2);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
         bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-        replay(bytesWrittenToDatabaseSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
         bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-        replay(bytesReadFromDatabaseSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
         memtableBytesFlushedSensor.record(3 + 4, 0L);
-        replay(memtableBytesFlushedSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
         memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-        replay(memtableHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
         writeStallDurationSensor.record(4 + 5, 0L);
-        replay(writeStallDurationSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
         blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-        replay(blockCacheDataHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L);
         blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L);
-        replay(blockCacheIndexHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L);
         blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L);
-        replay(blockCacheFilterHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L);
         bytesWrittenDuringCompactionSensor.record(2 + 4, 0L);
-        replay(bytesWrittenDuringCompactionSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L);
         bytesReadDuringCompactionSensor.record(5 + 6, 0L);
-        replay(bytesReadDuringCompactionSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
         numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L);
-        replay(numberOfOpenFilesSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
         numberOfFileErrorsSensor.record(11 + 34, 0L);
-        replay(numberOfFileErrorsSensor);
-
-        replay(statisticsToAdd1);
-        replay(statisticsToAdd2);
 
         recorder.record(0L);

Review comment:
       Please make sure the value recorded by each sensor is equal to the value computed from the mocked statistics.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/state/internals/metrics/RocksDBMetricsRecorderTest.java
##########
@@ -378,261 +347,189 @@ public void shouldRecordStatisticsBasedMetrics() {
         reset(statisticsToAdd1);
         reset(statisticsToAdd2);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(1L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).andReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(1L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_WRITTEN)).thenReturn(2L);
         bytesWrittenToDatabaseSensor.record(1 + 2, 0L);
-        replay(bytesWrittenToDatabaseSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).andReturn(3L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BYTES_READ)).thenReturn(3L);
         bytesReadFromDatabaseSensor.record(2 + 3, 0L);
-        replay(bytesReadFromDatabaseSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.FLUSH_WRITE_BYTES)).thenReturn(4L);
         memtableBytesFlushedSensor.record(3 + 4, 0L);
-        replay(memtableBytesFlushedSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(1L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(1L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.MEMTABLE_MISS)).thenReturn(4L);
         memtableHitRatioSensor.record((double) 4 / (4 + 6), 0L);
-        replay(memtableHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).andReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.STALL_MICROS)).thenReturn(5L);
         writeStallDurationSensor.record(4 + 5, 0L);
-        replay(writeStallDurationSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(5L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).andReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_DATA_MISS)).thenReturn(2L);
         blockCacheDataHitRatioSensor.record((double) 8 / (8 + 6), 0L);
-        replay(blockCacheDataHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(4L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_HIT)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_INDEX_MISS)).thenReturn(4L);
         blockCacheIndexHitRatioSensor.record((double) 6 / (6 + 6), 0L);
-        replay(blockCacheIndexHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(2L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(4L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).andReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(2L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(4L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_HIT)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.BLOCK_CACHE_FILTER_MISS)).thenReturn(5L);
         blockCacheFilterHitRatioSensor.record((double) 5 / (5 + 9), 0L);
-        replay(blockCacheFilterHitRatioSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(2L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(2L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_WRITE_BYTES)).thenReturn(4L);
         bytesWrittenDuringCompactionSensor.record(2 + 4, 0L);
-        replay(bytesWrittenDuringCompactionSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(5L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).andReturn(6L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(5L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.COMPACT_READ_BYTES)).thenReturn(6L);
         bytesReadDuringCompactionSensor.record(5 + 6, 0L);
-        replay(bytesReadDuringCompactionSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(5L);
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(3L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).andReturn(7L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).andReturn(4L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(5L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(3L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_OPENS)).thenReturn(7L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_CLOSES)).thenReturn(4L);
         numberOfOpenFilesSensor.record((5 + 7) - (3 + 4), 0L);
-        replay(numberOfOpenFilesSensor);
 
-        
expect(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(34L);
-        
expect(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).andReturn(11L);
+        
when(statisticsToAdd1.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(34L);
+        
when(statisticsToAdd2.getAndResetTickerCount(TickerType.NO_FILE_ERRORS)).thenReturn(11L);
         numberOfFileErrorsSensor.record(11 + 34, 0L);
-        replay(numberOfFileErrorsSensor);
-
-        replay(statisticsToAdd1);
-        replay(statisticsToAdd2);
 
         recorder.record(0L);
-
-        verify(statisticsToAdd1);
-        verify(statisticsToAdd2);
-        verify(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
     }
 
     @Test
     public void shouldNotRecordStatisticsBasedMetricsIfStatisticsIsNull() {
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, null);
-        replay(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
 
         recorder.record(0L);
-
-        verify(
-            bytesWrittenToDatabaseSensor,
-            bytesReadFromDatabaseSensor,
-            memtableBytesFlushedSensor,
-            memtableHitRatioSensor,
-            writeStallDurationSensor,
-            blockCacheDataHitRatioSensor,
-            blockCacheIndexHitRatioSensor,
-            blockCacheFilterHitRatioSensor,
-            bytesWrittenDuringCompactionSensor,
-            bytesReadDuringCompactionSensor,
-            numberOfOpenFilesSensor,
-            numberOfFileErrorsSensor
-        );
     }
 
     @Test
     public void shouldCorrectlyHandleHitRatioRecordingsWithZeroHitsAndMisses() 
{
-        resetToNice(statisticsToAdd1);
+        reset(statisticsToAdd1);
         recorder.addValueProviders(SEGMENT_STORE_NAME_1, dbToAdd1, 
cacheToAdd1, statisticsToAdd1);
-        expect(statisticsToAdd1.getTickerCount(anyObject())).andStubReturn(0L);
-        replay(statisticsToAdd1);
+        when(statisticsToAdd1.getTickerCount(anyObject())).thenReturn(0L);
         memtableHitRatioSensor.record(0, 0L);
         blockCacheDataHitRatioSensor.record(0, 0L);
         blockCacheIndexHitRatioSensor.record(0, 0L);
         blockCacheFilterHitRatioSensor.record(0, 0L);
-        replay(memtableHitRatioSensor);
-        replay(blockCacheDataHitRatioSensor);
-        replay(blockCacheIndexHitRatioSensor);
-        replay(blockCacheFilterHitRatioSensor);
 
         recorder.record(0L);
-
-        verify(memtableHitRatioSensor);
-        verify(blockCacheDataHitRatioSensor);
-        verify(blockCacheIndexHitRatioSensor);
-        verify(blockCacheFilterHitRatioSensor);
     }
 
     private void setUpMetricsMock() {
-        mockStatic(RocksDBMetrics.class);
+        rocksDBMetricsMockedStatic.close();
+        rocksDBMetricsMockedStatic = Mockito.mockStatic(RocksDBMetrics.class);
         final RocksDBMetricContext metricsContext =
             new RocksDBMetricContext(TASK_ID1.toString(), METRICS_SCOPE, 
STORE_NAME);
-        expect(RocksDBMetrics.bytesWrittenToDatabaseSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(bytesWrittenToDatabaseSensor);
-        expect(RocksDBMetrics.bytesReadFromDatabaseSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(bytesReadFromDatabaseSensor);
-        expect(RocksDBMetrics.memtableBytesFlushedSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(memtableBytesFlushedSensor);
-        expect(RocksDBMetrics.memtableHitRatioSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(memtableHitRatioSensor);
-        expect(RocksDBMetrics.writeStallDurationSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(writeStallDurationSensor);
-        expect(RocksDBMetrics.blockCacheDataHitRatioSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(blockCacheDataHitRatioSensor);
-        
expect(RocksDBMetrics.blockCacheIndexHitRatioSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(blockCacheIndexHitRatioSensor);
-        
expect(RocksDBMetrics.blockCacheFilterHitRatioSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(blockCacheFilterHitRatioSensor);
-        
expect(RocksDBMetrics.bytesWrittenDuringCompactionSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(bytesWrittenDuringCompactionSensor);
-        
expect(RocksDBMetrics.bytesReadDuringCompactionSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(bytesReadDuringCompactionSensor);
-        expect(RocksDBMetrics.numberOfOpenFilesSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(numberOfOpenFilesSensor);
-        expect(RocksDBMetrics.numberOfFileErrorsSensor(eq(streamsMetrics), 
eq(metricsContext)))
-            .andReturn(numberOfFileErrorsSensor);
-        RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        
RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), 
eq(metricsContext), anyObject());
-        replay(RocksDBMetrics.class);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.bytesWrittenToDatabaseSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesWrittenToDatabaseSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.bytesReadFromDatabaseSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesReadFromDatabaseSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.memtableBytesFlushedSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableBytesFlushedSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.memtableHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(memtableHitRatioSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.writeStallDurationSensor(streamsMetrics, metricsContext))
+            .thenReturn(writeStallDurationSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.blockCacheDataHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(blockCacheDataHitRatioSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.blockCacheIndexHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(blockCacheIndexHitRatioSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.blockCacheFilterHitRatioSensor(streamsMetrics, metricsContext))
+            .thenReturn(blockCacheFilterHitRatioSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.bytesWrittenDuringCompactionSensor(streamsMetrics, 
metricsContext))
+            .thenReturn(bytesWrittenDuringCompactionSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.bytesReadDuringCompactionSensor(streamsMetrics, metricsContext))
+            .thenReturn(bytesReadDuringCompactionSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.numberOfOpenFilesSensor(streamsMetrics, metricsContext))
+            .thenReturn(numberOfOpenFilesSensor);
+        rocksDBMetricsMockedStatic.when(() -> 
RocksDBMetrics.numberOfFileErrorsSensor(streamsMetrics, metricsContext))
+            .thenReturn(numberOfFileErrorsSensor);
+        RocksDBMetrics.addNumImmutableMemTableMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addCurSizeActiveMemTable(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addCurSizeAllMemTables(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addSizeAllMemTables(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumEntriesActiveMemTableMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumEntriesImmMemTablesMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumDeletesActiveMemTableMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumDeletesImmMemTablesMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addMemTableFlushPending(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumRunningFlushesMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addCompactionPendingMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumRunningCompactionsMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        
RocksDBMetrics.addEstimatePendingCompactionBytesMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addTotalSstFilesSizeMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addLiveSstFilesSizeMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addNumLiveVersionMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addBlockCacheCapacityMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addBlockCacheUsageMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addBlockCachePinnedUsageMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addEstimateNumKeysMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addEstimateTableReadersMemMetric(eq(streamsMetrics), 
eq(metricsContext), any());
+        RocksDBMetrics.addBackgroundErrorsMetric(eq(streamsMetrics), 
eq(metricsContext), any());
     }
 
     private void setUpMetricsStubMock() {

Review comment:
       What is the difference between `setUpMetricsStubMock` and `setUpMetricsMock`?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to