cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(emptyList());
StateManagerUtil.registerStateStores(logger,
"logPrefix:", topology, stateManager, stateDirectory,
processorContext);
-
- ctrl.verify();
}
@Test
public void testRegisterStateStoreFailToLockStateDirectory() {
- expect(topology.stateStores()).andReturn(singletonList(new
MockKeyValueStore("store", false)));
-
- expect(stateManager.taskId()).andReturn(taskId);
+ when(topology.stateStores()).thenReturn(singletonList(new
MockKeyValueStore("store", false)));
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class StateManagerUtilTest {
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorStateManager stateManager;
- @Mock(type = MockType.NICE)
+ @Mock
private StateDirectory stateDirectory;
- @Mock(type = MockType.NICE)
+ @Mock
private ProcessorTopology topology;
- @Mock(type = MockType.NICE)
+ @Mock
private InternalProcessorContext processorContext;
- private IMocksControl ctrl;
-
private Logger logger = new LogContext("test").logger(AbstractTask.class);
private final TaskId taskId = new TaskId(0, 0);
- @Before
- public void setup() {
- ctrl = createStrictControl();
- topology = ctrl.createMock(ProcessorTopology.class);
- processorContext = ctrl.createMock(InternalProcessorContext.class);
-
- stateManager = ctrl.createMock(ProcessorStateManager.class);
- stateDirectory = ctrl.createMock(StateDirectory.class);
- }
-
@Test
public void testRegisterStateStoreWhenTopologyEmpty() {
- expect(topology.stateStores()).andReturn(emptyList());
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(emptyList());
StateManagerUtil.registerStateStores(logger,
"logPrefix:", topology, stateManager, stateDirectory,
processorContext);
-
- ctrl.verify();
}
@Test
public void testRegisterStateStoreFailToLockStateDirectory() {
- expect(topology.stateStores()).andReturn(singletonList(new
MockKeyValueStore("store", false)));
-
- expect(stateManager.taskId()).andReturn(taskId);
+ when(topology.stateStores()).thenReturn(singletonList(new
MockKeyValueStore("store", false)));
- expect(stateDirectory.lock(taskId)).andReturn(false);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores,
processorContext);
+
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.close();
- expectLastCall();
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
StateManagerUtil.closeStateManager(logger,
"logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenClean() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores,
processorContext);
+
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.close();
- expectLastCall();
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
StateManagerUtil.closeStateManager(logger,
"logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenClean() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false,
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws
IOException {
+ public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false,
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws
IOException {
+ public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- expect(stateManager.baseDir()).andReturn(unknownFile);
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new
IOException("Deletion failed"));
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE));
- stateDirectory.unlock(taskId);
- expectLastCall();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to
close state you don't own!"));
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false,
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws
IOException {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
-
- stateManager.close();
- expectLastCall();
-
- expect(stateManager.baseDir()).andReturn(unknownFile);
- Utils.delete(unknownFile);
- expectLastCall().andThrow(new IOException("Deletion failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateManager.baseDir()).thenReturn(unknownFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ utils.when(() -> Utils.delete(unknownFile)).thenThrow(new
IOException("Deletion failed"));
- ctrl.checkOrder(true);
- ctrl.replay();
+ final ProcessorStateException thrown = assertThrows(
+ ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
+ "logPrefix:", false, true, stateManager,
stateDirectory, TaskType.ACTIVE));
- replayAll();
+ assertEquals(IOException.class, thrown.getCause().getClass());
+ }
- final ProcessorStateException thrown = assertThrows(
- ProcessorStateException.class, () ->
StateManagerUtil.closeStateManager(logger,
- "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE));
-
- assertEquals(IOException.class, thrown.getCause().getClass());
-
- ctrl.verify();
+ verify(stateManager).close();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(false);
-
- stateManager.close();
- expectLastCall().andThrow(new AssertionError("Should not be trying to
close state you don't own!"));
-
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
+ public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(false);
StateManagerUtil.closeStateManager(
- logger, "logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
- }
-
- @Test
- public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws
IOException {
Review Comment:
I see what you mean, but unit tests should also document the behavior of
production code and be robust against code changes. In this case, we
indeed have two separate cases:
1. When exactly-once is enabled, the state manager is closed dirty, and
the lock is not owned, then the state manager should not be closed, the
directory should not be wiped, and the directory should not be unlocked.
2. When exactly-once is disabled and the lock is not owned, then the state
manager should not be closed and the directory should not be unlocked.
So, I think what you did is fine, although I would not verify that
`stateManager.baseDir()` is never called, but rather that `Utils.delete()`
is never called. However, you need to run that test once with:
```java
StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
```
and once with
```java
StateManagerUtil.closeStateManager(
logger, "logPrefix:", true, false, stateManager, stateDirectory,
TaskType.ACTIVE);
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores,
processorContext);
+
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
final MockKeyValueStore store2 = new MockKeyValueStore("store2",
false);
final List<StateStore> stateStores = Arrays.asList(store1, store2);
- expect(topology.stateStores()).andReturn(stateStores);
-
- expect(stateManager.taskId()).andReturn(taskId);
-
- expect(stateDirectory.lock(taskId)).andReturn(true);
- expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager);
- expect(topology.stateStores()).andReturn(stateStores);
+ when(topology.stateStores()).thenReturn(stateStores);
- stateManager.registerStateStores(stateStores, processorContext);
+ when(stateManager.taskId()).thenReturn(taskId);
- stateManager.initializeStoreOffsetsFromCheckpoint(true);
- expectLastCall();
+ when(stateDirectory.lock(taskId)).thenReturn(true);
+ when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
- ctrl.checkOrder(true);
- ctrl.replay();
+ when(topology.stateStores()).thenReturn(stateStores);
StateManagerUtil.registerStateStores(logger, "logPrefix:",
topology, stateManager, stateDirectory, processorContext);
- ctrl.verify();
+ inOrder.verify(stateManager).registerStateStores(stateStores,
processorContext);
+
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+ verifyNoMoreInteractions(stateManager);
}
@Test
public void testCloseStateManagerClean() {
- expect(stateManager.taskId()).andReturn(taskId);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateManager.taskId()).thenReturn(taskId);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("state manager
failed to close"));
-
- stateDirectory.unlock(taskId);
-
- ctrl.checkOrder(true);
- ctrl.replay();
+ doThrow(new ProcessorStateException("state manager failed to
close")).when(stateManager).close();
assertThrows(
ProcessorStateException.class,
() -> StateManagerUtil.closeStateManager(
logger, "logPrefix:", false, false, stateManager,
stateDirectory, TaskType.ACTIVE));
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerWithStateStoreWipeOut() {
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
- stateManager.close();
- expectLastCall();
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
// The `baseDir` will be accessed when attempting to delete the state
store.
-
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
- stateDirectory.unlock(taskId);
- expectLastCall();
-
- ctrl.checkOrder(true);
- ctrl.replay();
+
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
StateManagerUtil.closeStateManager(logger,
"logPrefix:", false, true, stateManager, stateDirectory,
TaskType.ACTIVE);
- ctrl.verify();
+ inOrder.verify(stateManager).close();
+ inOrder.verify(stateDirectory).unlock(taskId);
+ verifyNoMoreInteractions(stateManager, stateDirectory);
}
@Test
- public void shouldStillWipeStateStoresIfCloseThrowsException() throws
IOException {
+ public void shouldStillWipeStateStoresIfCloseThrowsException() {
final File randomFile = new File("/random/path");
- mockStatic(Utils.class);
-
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
- stateManager.close();
- expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
- expect(stateManager.baseDir()).andReturn(randomFile);
+ doThrow(new ProcessorStateException("Close
failed")).when(stateManager).close();
- Utils.delete(randomFile);
+ when(stateManager.baseDir()).thenReturn(randomFile);
- stateDirectory.unlock(taskId);
- expectLastCall();
+ try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+ assertThrows(ProcessorStateException.class, () ->
+ StateManagerUtil.closeStateManager(logger, "logPrefix:",
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+ }
- ctrl.checkOrder(true);
- ctrl.replay();
-
- replayAll();
-
- assertThrows(ProcessorStateException.class, () ->
- StateManagerUtil.closeStateManager(logger, "logPrefix:", false,
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
- ctrl.verify();
+ verify(stateDirectory).unlock(taskId);
}
@Test
- public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws
IOException {
+ public void
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
final File unknownFile = new File("/unknown/path");
- mockStatic(Utils.class);
- expect(stateManager.taskId()).andReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ final InOrder inOrder = inOrder(stateManager, stateDirectory);
Review Comment:
```suggestion
```
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void
testCloseStateManagerThrowsExceptionWhenClean() {
// Thrown stateMgr exception will not be wrapped.
assertEquals("state manager failed to close", thrown.getMessage());
- ctrl.verify();
+ // The unlock logic should still be executed.
+ verify(stateDirectory).unlock(taskId);
}
@Test
public void testCloseStateManagerThrowsExceptionWhenDirty() {
- expect(stateManager.taskId()).andReturn(taskId);
+ when(stateManager.taskId()).thenReturn(taskId);
- expect(stateDirectory.lock(taskId)).andReturn(true);
+ when(stateDirectory.lock(taskId)).thenReturn(true);
Review Comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]