chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r705913852



##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -977,6 +977,11 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
         return streamThread;
     }
 
+    // Ensure Mockito stub construct with capture argument for 
KafkaStreamsTest.
+    public static Metrics createThisMetrics(final MetricConfig metricConfig, 
final List<MetricsReporter> reporters, final Time time, final MetricsContext 
metricsContext) {

Review comment:
       Why do we need the word "this"?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return isStateAlive();
         }
     }
 
+    // Ensure Mockito can stub method for KafkaStreamTest.
+    public boolean isStateAlive() {

Review comment:
       Personally, I consider this an anti-pattern. There are two public 
methods returning the same "value", but only one of them holds the lock. It is 
hard to use them correctly. At any rate, this is unrelated to this PR.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -903,20 +897,15 @@ public void 
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
             streams.start();
         }
 
-        PowerMock.verify(Executors.class);

Review comment:
       Please add a `verify` to make sure `rocksDBMetricsRecordingTriggerThread` 
is used to create the thread in this test.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread 
thread,
                 "",
                 Collections.emptySet(),
                 Collections.emptySet()
-            )
-        ).anyTimes();
-        
EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class),
 anyLong())).andStubReturn(true);
-        EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCache(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
-        thread.requestLeaveGroupDuringShutdown();
-        EasyMock.expectLastCall().anyTimes();
-        
EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + 
threadId);
-        thread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ));
+        when(thread.waitOnThreadState(isA(StreamThread.State.class), 
anyLong())).thenReturn(true);
+        when(thread.isStateAlive()).thenReturn(true);
+        verify(thread, atMostOnce()).isStateAlive();

Review comment:
       It seems to me this check is a no-op since the thread is not yet ready 
to be used. Hence, the number of invocations is always 0.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -335,41 +332,37 @@ private void prepareStreamThread(final StreamThread 
thread,
                 "",
                 Collections.emptySet(),
                 Collections.emptySet()
-            )
-        ).anyTimes();
-        
EasyMock.expect(thread.waitOnThreadState(EasyMock.isA(StreamThread.State.class),
 anyLong())).andStubReturn(true);
-        EasyMock.expect(thread.isAlive()).andReturn(true).times(0, 1);
-        thread.resizeCache(EasyMock.anyLong());
-        EasyMock.expectLastCall().anyTimes();
-        thread.requestLeaveGroupDuringShutdown();
-        EasyMock.expectLastCall().anyTimes();
-        
EasyMock.expect(thread.getName()).andStubReturn("processId-StreamThread-" + 
threadId);
-        thread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        ));
+        when(thread.waitOnThreadState(isA(StreamThread.State.class), 
anyLong())).thenReturn(true);
+        when(thread.isStateAlive()).thenReturn(true);
+        verify(thread, atMostOnce()).isStateAlive();
+        when(thread.getName()).thenReturn("processId-StreamThread-" + 
threadId);
+
+        doAnswer(invocation -> {
             supplier.consumer.close();
             supplier.restoreConsumer.close();
             for (final MockProducer<byte[], byte[]> producer : 
supplier.producers) {
                 producer.close();
             }
             state.set(StreamThread.State.DEAD);
 
-            threadStatelistenerCapture.getValue().onChange(thread, 
StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(thread, 
StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
+            threadStateListenerCapture.getValue().onChange(thread, 
StreamThread.State.PENDING_SHUTDOWN, StreamThread.State.RUNNING);
+            threadStateListenerCapture.getValue().onChange(thread, 
StreamThread.State.DEAD, StreamThread.State.PENDING_SHUTDOWN);
             return null;
-        }).anyTimes();
-        EasyMock.expect(thread.isRunning()).andReturn(state.get() == 
StreamThread.State.RUNNING).anyTimes();
-        thread.join();
+        }).when(thread).shutdown();
+
+        when(thread.isRunning()).thenReturn(state.get() == 
StreamThread.State.RUNNING);
+
         if (terminable) {

Review comment:
       If you don't want to check the `join` method, please remove this empty 
body.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -874,26 +868,26 @@ public void shouldNotBlockInCloseForZeroDuration() {
 
     @Test
     public void 
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
-        PowerMock.mockStatic(Executors.class);
-        final ScheduledExecutorService cleanupSchedule = 
EasyMock.niceMock(ScheduledExecutorService.class);
+        final MockedStatic<Executors> executorsMockedStatic = 
mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = 
mock(ScheduledExecutorService.class, withSettings().lenient());
         final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread =
-            EasyMock.mock(ScheduledExecutorService.class);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(cleanupSchedule);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(rocksDBMetricsRecordingTriggerThread);
-        
EasyMock.expect(rocksDBMetricsRecordingTriggerThread.scheduleAtFixedRate(
-            EasyMock.anyObject(RocksDBMetricsRecordingTrigger.class),
-            EasyMock.eq(0L),
-            EasyMock.eq(1L),
-            EasyMock.eq(TimeUnit.MINUTES)
-        )).andReturn(null);
-        
EasyMock.expect(rocksDBMetricsRecordingTriggerThread.shutdownNow()).andReturn(null);
-        PowerMock.replay(Executors.class);
-        PowerMock.replay(rocksDBMetricsRecordingTriggerThread);
-        PowerMock.replay(cleanupSchedule);
+            mock(ScheduledExecutorService.class);
+
+        executorsMockedStatic.when(() -> 
Executors.newSingleThreadScheduledExecutor(
+            any(ThreadFactory.class)
+        )).thenReturn(cleanupSchedule);
+        executorsMockedStatic.when(() -> 
Executors.newSingleThreadScheduledExecutor(

Review comment:
       Different `ThreadFactory` instances should get different 
`ScheduledExecutorService` instances (see the production code in 
`KafkaStreams`). That was a bug in the previous test. However, it was fine 
since all this test wants to do is to make sure 
`rocksDBMetricsRecordingTriggerThread` is executed.
   
   Could you follow the production code and return different 
`ScheduledExecutorService` instances, or add a comment saying "this is fine 
because ..."?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -903,20 +897,15 @@ public void 
shouldTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsDebug() {
             streams.start();
         }
 
-        PowerMock.verify(Executors.class);
-        PowerMock.verify(rocksDBMetricsRecordingTriggerThread);
     }
 
     @Test
     public void 
shouldNotTriggerRecordingOfRocksDBMetricsIfRecordingLevelIsInfo() {
-        PowerMock.mockStatic(Executors.class);
-        final ScheduledExecutorService cleanupSchedule = 
EasyMock.niceMock(ScheduledExecutorService.class);
-        final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread =
-            EasyMock.mock(ScheduledExecutorService.class);
-        EasyMock.expect(Executors.newSingleThreadScheduledExecutor(
-            anyObject(ThreadFactory.class)
-        )).andReturn(cleanupSchedule);
-        PowerMock.replay(Executors.class, 
rocksDBMetricsRecordingTriggerThread, cleanupSchedule);
+        final MockedStatic<Executors> executorsMockedStatic = 
mockStatic(Executors.class);
+        final ScheduledExecutorService cleanupSchedule = 
mock(ScheduledExecutorService.class, withSettings().lenient());
+        final ScheduledExecutorService rocksDBMetricsRecordingTriggerThread = 
mock(ScheduledExecutorService.class);

Review comment:
       Please make sure `rocksDBMetricsRecordingTriggerThread` is not used to 
create a thread in this test.

##########
File path: build.gradle
##########
@@ -1751,8 +1749,6 @@ project(':streams') {
     testImplementation libs.junitJupiterApi
     testImplementation libs.junitVintageEngine
     testImplementation libs.easymock
-    testImplementation libs.powermockJunit4

Review comment:
       `TableSourceNodeTest` is still using powermock




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to