chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667361897



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return getIsAlive();
         }
     }
 
+    //For KafkaStreamTest
+    public boolean getIsAlive() {

Review comment:
       This naming seems a bit odd to me. How about `isStateAlive`?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -96,13 +94,7 @@
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
 import static 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
 
-import static org.easymock.EasyMock.anyBoolean;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.easymock.EasyMock.anyInt;

Review comment:
       Is `easymock` still used by the streams module? If not, could you remove it 
from `build.gradle`?

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
 
     public boolean isRunning() {
         synchronized (stateLock) {
-            return state.isAlive();
+            return getIsAlive();
         }
     }
 
+    //For KafkaStreamTest

Review comment:
       Ditto. Could you add more description of why this is needed for the test?

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -182,152 +192,138 @@ public void before() throws Exception {
         prepareStreams();
     }
 
+    @After
+    public void tearDown() {
+        kafkaStreamsMockedStatic.close();
+        clientMetricsMockedStatic.close();
+        streamThreadMockedStatic.close();
+        globalStreamThreadMockedConstruction.close();
+    }
+
     private void prepareStreams() throws Exception {
         // setup metrics
-        PowerMock.expectNew(Metrics.class,
-            anyObject(MetricConfig.class),
-            capture(metricsReportersCapture),
-            anyObject(Time.class),
-            anyObject(MetricsContext.class)
-        ).andAnswer(() -> {
+        kafkaStreamsMockedStatic = mockStatic(KafkaStreams.class, 
withSettings()
+                .defaultAnswer(InvocationOnMock::callRealMethod));
+        kafkaStreamsMockedStatic.when(() -> KafkaStreams.createThisMetrics(
+                any(MetricConfig.class),
+                metricsReportersCapture.capture(),
+                any(Time.class),
+                any(MetricsContext.class)
+        )).thenAnswer(invocation -> {
             for (final MetricsReporter reporter : 
metricsReportersCapture.getValue()) {
                 reporter.init(Collections.emptyList());
             }
             return metrics;
-        }).anyTimes();
-        metrics.close();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        });
+
+        doAnswer(invocation -> {
             for (final MetricsReporter reporter : 
metricsReportersCapture.getValue()) {
                 reporter.close();
             }
             return null;
-        }).anyTimes();
-
-        PowerMock.mockStatic(ClientMetrics.class);
-        EasyMock.expect(ClientMetrics.version()).andReturn("1.56");
-        EasyMock.expect(ClientMetrics.commitId()).andReturn("1a2b3c4d5e");
-        ClientMetrics.addVersionMetric(anyObject(StreamsMetricsImpl.class));
-        ClientMetrics.addCommitIdMetric(anyObject(StreamsMetricsImpl.class));
-        
ClientMetrics.addApplicationIdMetric(anyObject(StreamsMetricsImpl.class), 
EasyMock.eq(APPLICATION_ID));
-        
ClientMetrics.addTopologyDescriptionMetric(anyObject(StreamsMetricsImpl.class), 
anyString());
-        ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class), 
anyObject());
-        
ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class),
 anyObject());
+        }).when(metrics).close();
+
+        clientMetricsMockedStatic = mockStatic(ClientMetrics.class);
+        
clientMetricsMockedStatic.when(ClientMetrics::version).thenReturn("1.56");
+        
clientMetricsMockedStatic.when(ClientMetrics::commitId).thenReturn("1a2b3c4d5e");
+        ClientMetrics.addVersionMetric(any(StreamsMetricsImpl.class));
+        ClientMetrics.addCommitIdMetric(any(StreamsMetricsImpl.class));
+        ClientMetrics.addApplicationIdMetric(any(StreamsMetricsImpl.class), 
eq(APPLICATION_ID));
+        
ClientMetrics.addTopologyDescriptionMetric(any(StreamsMetricsImpl.class), 
any());
+        ClientMetrics.addStateMetric(any(StreamsMetricsImpl.class), any());
+        
ClientMetrics.addNumAliveStreamThreadMetric(any(StreamsMetricsImpl.class), 
any());
 
         // setup stream threads
-        PowerMock.mockStatic(StreamThread.class);
-        EasyMock.expect(StreamThread.create(
-            anyObject(InternalTopologyBuilder.class),
-            anyObject(StreamsConfig.class),
-            anyObject(KafkaClientSupplier.class),
-            anyObject(Admin.class),
-            anyObject(UUID.class),
-            anyObject(String.class),
-            anyObject(StreamsMetricsImpl.class),
-            anyObject(Time.class),
-            anyObject(StreamsMetadataState.class),
-            anyLong(),
-            anyObject(StateDirectory.class),
-            anyObject(StateRestoreListener.class),
-            anyInt(),
-            anyObject(Runnable.class),
-            anyObject()
-        )).andReturn(streamThreadOne).andReturn(streamThreadTwo);
-
-        
EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
-        
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
-        EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
-        EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
+        streamThreadMockedStatic = mockStatic(StreamThread.class);
+        streamThreadMockedStatic.when(() -> StreamThread.create(
+                any(InternalTopologyBuilder.class),
+                any(StreamsConfig.class),
+                any(KafkaClientSupplier.class),
+                any(Admin.class),
+                any(UUID.class),
+                any(String.class),
+                any(StreamsMetricsImpl.class),
+                any(Time.class),
+                any(StreamsMetadataState.class),
+                anyLong(),
+                any(StateDirectory.class),
+                any(StateRestoreListener.class),
+                anyInt(),
+                any(Runnable.class),
+                any()
+        )).thenReturn(streamThreadOne).thenReturn(streamThreadTwo);
+
+        streamThreadMockedStatic.when(() -> 
StreamThread.eosEnabled(any(StreamsConfig.class))).thenReturn(false);
+        streamThreadMockedStatic.when(() -> 
StreamThread.processingMode(any(StreamsConfig.class))).thenReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE);
+        when(streamThreadOne.getId()).thenReturn(1L);
+        when(streamThreadTwo.getId()).thenReturn(2L);
+
         prepareStreamThread(streamThreadOne, 1, true);
         prepareStreamThread(streamThreadTwo, 2, false);
 
         // setup global threads
         final AtomicReference<GlobalStreamThread.State> globalThreadState = 
new AtomicReference<>(GlobalStreamThread.State.CREATED);
-        PowerMock.expectNew(GlobalStreamThread.class,
-            anyObject(ProcessorTopology.class),
-            anyObject(StreamsConfig.class),
-            anyObject(Consumer.class),
-            anyObject(StateDirectory.class),
-            anyLong(),
-            anyObject(StreamsMetricsImpl.class),
-            anyObject(Time.class),
-            anyString(),
-            anyObject(StateRestoreListener.class),
-            anyObject(StreamsUncaughtExceptionHandler.class)
-        ).andReturn(globalStreamThread).anyTimes();
-        
EasyMock.expect(globalStreamThread.state()).andAnswer(globalThreadState::get).anyTimes();
-        
globalStreamThread.setStateListener(capture(threadStatelistenerCapture));
-        EasyMock.expectLastCall().anyTimes();
-
-        globalStreamThread.start();
-        EasyMock.expectLastCall().andAnswer(() -> {
-            globalThreadState.set(GlobalStreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
-                GlobalStreamThread.State.RUNNING,
-                GlobalStreamThread.State.CREATED);
-            return null;
-        }).anyTimes();
-        globalStreamThread.shutdown();
-        EasyMock.expectLastCall().andAnswer(() -> {
-            supplier.restoreConsumer.close();
 
-            for (final MockProducer<byte[], byte[]> producer : 
supplier.producers) {
-                producer.close();
-            }
-            globalThreadState.set(GlobalStreamThread.State.DEAD);
-            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
-                GlobalStreamThread.State.PENDING_SHUTDOWN,
-                GlobalStreamThread.State.RUNNING);
-            threadStatelistenerCapture.getValue().onChange(globalStreamThread,
-                GlobalStreamThread.State.DEAD,
-                GlobalStreamThread.State.PENDING_SHUTDOWN);
-            return null;
-        }).anyTimes();
-        
EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get()
 == GlobalStreamThread.State.RUNNING).anyTimes();
-        globalStreamThread.join();
-        EasyMock.expectLastCall().anyTimes();
-
-        PowerMock.replay(
-            StreamThread.class,
-            Metrics.class,
-            metrics,
-            ClientMetrics.class,
-            streamThreadOne,
-            streamThreadTwo,
-            GlobalStreamThread.class,
-            globalStreamThread
-        );
+        globalStreamThreadMockedConstruction = 
mockConstruction(GlobalStreamThread.class,
+                (mock, context) -> {
+                    when(mock.state()).thenAnswer(invocation -> 
globalThreadState.get());
+                    
doNothing().when(mock).setStateListener(threadStateListenerCapture.capture());
+                    doAnswer(invocation -> {
+                        
globalThreadState.set(GlobalStreamThread.State.RUNNING);
+                        threadStateListenerCapture.getValue().onChange(mock,
+                                GlobalStreamThread.State.RUNNING,
+                                GlobalStreamThread.State.CREATED);
+                        return null;
+                    }).when(mock).start();
+                    doAnswer(invocation -> {
+                        supplier.restoreConsumer.close();
+
+                        for (final MockProducer<byte[], byte[]> producer : 
supplier.producers) {
+                            producer.close();
+                        }
+                        globalThreadState.set(GlobalStreamThread.State.DEAD);
+                        threadStateListenerCapture.getValue().onChange(mock,
+                                GlobalStreamThread.State.PENDING_SHUTDOWN,
+                                GlobalStreamThread.State.RUNNING);
+                        threadStateListenerCapture.getValue().onChange(mock,
+                                GlobalStreamThread.State.DEAD,
+                                GlobalStreamThread.State.PENDING_SHUTDOWN);
+                        return null;
+                    }).when(mock).shutdown();
+                    
when(mock.stillRunning()).thenReturn(globalThreadState.get() == 
GlobalStreamThread.State.RUNNING);
+                });
+
     }
 
     private void prepareStreamThread(final StreamThread thread,
                                      final int threadId,
                                      final boolean terminable) throws 
Exception {
         final AtomicReference<StreamThread.State> state = new 
AtomicReference<>(StreamThread.State.CREATED);
-        EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
+        when(thread.state()).thenAnswer(invocation -> state.get());
 
-        thread.setStateListener(capture(threadStatelistenerCapture));
-        EasyMock.expectLastCall().anyTimes();
+        
doNothing().when(thread).setStateListener(threadStateListenerCapture.capture());
 
-        EasyMock.expect(thread.getStateLock()).andReturn(new 
Object()).anyTimes();
+        when(thread.getStateLock()).thenReturn(new Object());
 
-        thread.start();
-        EasyMock.expectLastCall().andAnswer(() -> {
+        doAnswer(invocation -> {
             state.set(StreamThread.State.STARTING);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.STARTING,
-                StreamThread.State.CREATED);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.PARTITIONS_REVOKED,
-                StreamThread.State.STARTING);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.PARTITIONS_ASSIGNED,
-                StreamThread.State.PARTITIONS_REVOKED);
-            threadStatelistenerCapture.getValue().onChange(thread,
-                StreamThread.State.RUNNING,
-                StreamThread.State.PARTITIONS_ASSIGNED);
+            threadStateListenerCapture.getValue().onChange(thread,
+                    StreamThread.State.STARTING,

Review comment:
       Redundant indent? The continuation lines are indented more deeply than in the original code.

##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -61,18 +59,18 @@
 import org.apache.kafka.test.MockMetricsReporter;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.TestUtils;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.After;
 import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;

Review comment:
       Is `powermock` still used by the streams module? If not, could you remove it 
from `build.gradle`?

##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -939,6 +939,11 @@ private StreamThread createAndAddStreamThread(final long 
cacheSizePerThread, fin
         return streamThread;
     }
 
+    //For KafkaStreamsTest

Review comment:
       Why? Could you please add more details?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to