chia7712 commented on a change in pull request #11017:
URL: https://github.com/apache/kafka/pull/11017#discussion_r667361897
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
public boolean isRunning() {
synchronized (stateLock) {
- return state.isAlive();
+ return getIsAlive();
}
}
+ //For KafkaStreamTest
+ public boolean getIsAlive() {
Review comment:
This name reads a bit oddly to me. How about `isStateAlive`?
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -96,13 +94,7 @@
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitForApplicationState;
-import static org.easymock.EasyMock.anyBoolean;
import static org.apache.kafka.test.TestUtils.waitForCondition;
-import static org.easymock.EasyMock.anyInt;
Review comment:
Is `easymock` still used by the streams module? If not, could you remove it
from `build.gradle`?
##########
File path:
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
##########
@@ -246,10 +246,15 @@ State setState(final State newState) {
public boolean isRunning() {
synchronized (stateLock) {
- return state.isAlive();
+ return getIsAlive();
}
}
+ //For KafkaStreamTest
Review comment:
Ditto. Could you add a more detailed description?
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -182,152 +192,138 @@ public void before() throws Exception {
prepareStreams();
}
+ @After
+ public void tearDown() {
+ kafkaStreamsMockedStatic.close();
+ clientMetricsMockedStatic.close();
+ streamThreadMockedStatic.close();
+ globalStreamThreadMockedConstruction.close();
+ }
+
private void prepareStreams() throws Exception {
// setup metrics
- PowerMock.expectNew(Metrics.class,
- anyObject(MetricConfig.class),
- capture(metricsReportersCapture),
- anyObject(Time.class),
- anyObject(MetricsContext.class)
- ).andAnswer(() -> {
+ kafkaStreamsMockedStatic = mockStatic(KafkaStreams.class,
withSettings()
+ .defaultAnswer(InvocationOnMock::callRealMethod));
+ kafkaStreamsMockedStatic.when(() -> KafkaStreams.createThisMetrics(
+ any(MetricConfig.class),
+ metricsReportersCapture.capture(),
+ any(Time.class),
+ any(MetricsContext.class)
+ )).thenAnswer(invocation -> {
for (final MetricsReporter reporter :
metricsReportersCapture.getValue()) {
reporter.init(Collections.emptyList());
}
return metrics;
- }).anyTimes();
- metrics.close();
- EasyMock.expectLastCall().andAnswer(() -> {
+ });
+
+ doAnswer(invocation -> {
for (final MetricsReporter reporter :
metricsReportersCapture.getValue()) {
reporter.close();
}
return null;
- }).anyTimes();
-
- PowerMock.mockStatic(ClientMetrics.class);
- EasyMock.expect(ClientMetrics.version()).andReturn("1.56");
- EasyMock.expect(ClientMetrics.commitId()).andReturn("1a2b3c4d5e");
- ClientMetrics.addVersionMetric(anyObject(StreamsMetricsImpl.class));
- ClientMetrics.addCommitIdMetric(anyObject(StreamsMetricsImpl.class));
-
ClientMetrics.addApplicationIdMetric(anyObject(StreamsMetricsImpl.class),
EasyMock.eq(APPLICATION_ID));
-
ClientMetrics.addTopologyDescriptionMetric(anyObject(StreamsMetricsImpl.class),
anyString());
- ClientMetrics.addStateMetric(anyObject(StreamsMetricsImpl.class),
anyObject());
-
ClientMetrics.addNumAliveStreamThreadMetric(anyObject(StreamsMetricsImpl.class),
anyObject());
+ }).when(metrics).close();
+
+ clientMetricsMockedStatic = mockStatic(ClientMetrics.class);
+
clientMetricsMockedStatic.when(ClientMetrics::version).thenReturn("1.56");
+
clientMetricsMockedStatic.when(ClientMetrics::commitId).thenReturn("1a2b3c4d5e");
+ ClientMetrics.addVersionMetric(any(StreamsMetricsImpl.class));
+ ClientMetrics.addCommitIdMetric(any(StreamsMetricsImpl.class));
+ ClientMetrics.addApplicationIdMetric(any(StreamsMetricsImpl.class),
eq(APPLICATION_ID));
+
ClientMetrics.addTopologyDescriptionMetric(any(StreamsMetricsImpl.class),
any());
+ ClientMetrics.addStateMetric(any(StreamsMetricsImpl.class), any());
+
ClientMetrics.addNumAliveStreamThreadMetric(any(StreamsMetricsImpl.class),
any());
// setup stream threads
- PowerMock.mockStatic(StreamThread.class);
- EasyMock.expect(StreamThread.create(
- anyObject(InternalTopologyBuilder.class),
- anyObject(StreamsConfig.class),
- anyObject(KafkaClientSupplier.class),
- anyObject(Admin.class),
- anyObject(UUID.class),
- anyObject(String.class),
- anyObject(StreamsMetricsImpl.class),
- anyObject(Time.class),
- anyObject(StreamsMetadataState.class),
- anyLong(),
- anyObject(StateDirectory.class),
- anyObject(StateRestoreListener.class),
- anyInt(),
- anyObject(Runnable.class),
- anyObject()
- )).andReturn(streamThreadOne).andReturn(streamThreadTwo);
-
-
EasyMock.expect(StreamThread.eosEnabled(anyObject(StreamsConfig.class))).andReturn(false).anyTimes();
-
EasyMock.expect(StreamThread.processingMode(anyObject(StreamsConfig.class))).andReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE).anyTimes();
- EasyMock.expect(streamThreadOne.getId()).andReturn(1L).anyTimes();
- EasyMock.expect(streamThreadTwo.getId()).andReturn(2L).anyTimes();
+ streamThreadMockedStatic = mockStatic(StreamThread.class);
+ streamThreadMockedStatic.when(() -> StreamThread.create(
+ any(InternalTopologyBuilder.class),
+ any(StreamsConfig.class),
+ any(KafkaClientSupplier.class),
+ any(Admin.class),
+ any(UUID.class),
+ any(String.class),
+ any(StreamsMetricsImpl.class),
+ any(Time.class),
+ any(StreamsMetadataState.class),
+ anyLong(),
+ any(StateDirectory.class),
+ any(StateRestoreListener.class),
+ anyInt(),
+ any(Runnable.class),
+ any()
+ )).thenReturn(streamThreadOne).thenReturn(streamThreadTwo);
+
+ streamThreadMockedStatic.when(() ->
StreamThread.eosEnabled(any(StreamsConfig.class))).thenReturn(false);
+ streamThreadMockedStatic.when(() ->
StreamThread.processingMode(any(StreamsConfig.class))).thenReturn(StreamThread.ProcessingMode.AT_LEAST_ONCE);
+ when(streamThreadOne.getId()).thenReturn(1L);
+ when(streamThreadTwo.getId()).thenReturn(2L);
+
prepareStreamThread(streamThreadOne, 1, true);
prepareStreamThread(streamThreadTwo, 2, false);
// setup global threads
final AtomicReference<GlobalStreamThread.State> globalThreadState =
new AtomicReference<>(GlobalStreamThread.State.CREATED);
- PowerMock.expectNew(GlobalStreamThread.class,
- anyObject(ProcessorTopology.class),
- anyObject(StreamsConfig.class),
- anyObject(Consumer.class),
- anyObject(StateDirectory.class),
- anyLong(),
- anyObject(StreamsMetricsImpl.class),
- anyObject(Time.class),
- anyString(),
- anyObject(StateRestoreListener.class),
- anyObject(StreamsUncaughtExceptionHandler.class)
- ).andReturn(globalStreamThread).anyTimes();
-
EasyMock.expect(globalStreamThread.state()).andAnswer(globalThreadState::get).anyTimes();
-
globalStreamThread.setStateListener(capture(threadStatelistenerCapture));
- EasyMock.expectLastCall().anyTimes();
-
- globalStreamThread.start();
- EasyMock.expectLastCall().andAnswer(() -> {
- globalThreadState.set(GlobalStreamThread.State.RUNNING);
- threadStatelistenerCapture.getValue().onChange(globalStreamThread,
- GlobalStreamThread.State.RUNNING,
- GlobalStreamThread.State.CREATED);
- return null;
- }).anyTimes();
- globalStreamThread.shutdown();
- EasyMock.expectLastCall().andAnswer(() -> {
- supplier.restoreConsumer.close();
- for (final MockProducer<byte[], byte[]> producer :
supplier.producers) {
- producer.close();
- }
- globalThreadState.set(GlobalStreamThread.State.DEAD);
- threadStatelistenerCapture.getValue().onChange(globalStreamThread,
- GlobalStreamThread.State.PENDING_SHUTDOWN,
- GlobalStreamThread.State.RUNNING);
- threadStatelistenerCapture.getValue().onChange(globalStreamThread,
- GlobalStreamThread.State.DEAD,
- GlobalStreamThread.State.PENDING_SHUTDOWN);
- return null;
- }).anyTimes();
-
EasyMock.expect(globalStreamThread.stillRunning()).andReturn(globalThreadState.get()
== GlobalStreamThread.State.RUNNING).anyTimes();
- globalStreamThread.join();
- EasyMock.expectLastCall().anyTimes();
-
- PowerMock.replay(
- StreamThread.class,
- Metrics.class,
- metrics,
- ClientMetrics.class,
- streamThreadOne,
- streamThreadTwo,
- GlobalStreamThread.class,
- globalStreamThread
- );
+ globalStreamThreadMockedConstruction =
mockConstruction(GlobalStreamThread.class,
+ (mock, context) -> {
+ when(mock.state()).thenAnswer(invocation ->
globalThreadState.get());
+
doNothing().when(mock).setStateListener(threadStateListenerCapture.capture());
+ doAnswer(invocation -> {
+
globalThreadState.set(GlobalStreamThread.State.RUNNING);
+ threadStateListenerCapture.getValue().onChange(mock,
+ GlobalStreamThread.State.RUNNING,
+ GlobalStreamThread.State.CREATED);
+ return null;
+ }).when(mock).start();
+ doAnswer(invocation -> {
+ supplier.restoreConsumer.close();
+
+ for (final MockProducer<byte[], byte[]> producer :
supplier.producers) {
+ producer.close();
+ }
+ globalThreadState.set(GlobalStreamThread.State.DEAD);
+ threadStateListenerCapture.getValue().onChange(mock,
+ GlobalStreamThread.State.PENDING_SHUTDOWN,
+ GlobalStreamThread.State.RUNNING);
+ threadStateListenerCapture.getValue().onChange(mock,
+ GlobalStreamThread.State.DEAD,
+ GlobalStreamThread.State.PENDING_SHUTDOWN);
+ return null;
+ }).when(mock).shutdown();
+
when(mock.stillRunning()).thenReturn(globalThreadState.get() ==
GlobalStreamThread.State.RUNNING);
+ });
+
}
private void prepareStreamThread(final StreamThread thread,
final int threadId,
final boolean terminable) throws
Exception {
final AtomicReference<StreamThread.State> state = new
AtomicReference<>(StreamThread.State.CREATED);
- EasyMock.expect(thread.state()).andAnswer(state::get).anyTimes();
+ when(thread.state()).thenAnswer(invocation -> state.get());
- thread.setStateListener(capture(threadStatelistenerCapture));
- EasyMock.expectLastCall().anyTimes();
+
doNothing().when(thread).setStateListener(threadStateListenerCapture.capture());
- EasyMock.expect(thread.getStateLock()).andReturn(new
Object()).anyTimes();
+ when(thread.getStateLock()).thenReturn(new Object());
- thread.start();
- EasyMock.expectLastCall().andAnswer(() -> {
+ doAnswer(invocation -> {
state.set(StreamThread.State.STARTING);
- threadStatelistenerCapture.getValue().onChange(thread,
- StreamThread.State.STARTING,
- StreamThread.State.CREATED);
- threadStatelistenerCapture.getValue().onChange(thread,
- StreamThread.State.PARTITIONS_REVOKED,
- StreamThread.State.STARTING);
- threadStatelistenerCapture.getValue().onChange(thread,
- StreamThread.State.PARTITIONS_ASSIGNED,
- StreamThread.State.PARTITIONS_REVOKED);
- threadStatelistenerCapture.getValue().onChange(thread,
- StreamThread.State.RUNNING,
- StreamThread.State.PARTITIONS_ASSIGNED);
+ threadStateListenerCapture.getValue().onChange(thread,
+ StreamThread.State.STARTING,
Review comment:
redundant indent?
##########
File path: streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
##########
@@ -61,18 +59,18 @@
import org.apache.kafka.test.MockMetricsReporter;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.TestUtils;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
+import org.junit.After;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
Review comment:
Is `powermock` still used by the streams module? If not, could you remove it
from `build.gradle`?
##########
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##########
@@ -939,6 +939,11 @@ private StreamThread createAndAddStreamThread(final long
cacheSizePerThread, fin
return streamThread;
}
+ //For KafkaStreamsTest
Review comment:
Why? Could you please add more details?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]