cadonna commented on code in PR #12821: URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -42,77 +40,56 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static org.easymock.EasyMock.createStrictControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(Utils.class) +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class StateManagerUtilTest { - @Mock(type = MockType.NICE) + @Mock private ProcessorStateManager stateManager; - @Mock(type = MockType.NICE) + @Mock private StateDirectory stateDirectory; - @Mock(type = MockType.NICE) + @Mock private ProcessorTopology topology; - @Mock(type = MockType.NICE) + @Mock private InternalProcessorContext processorContext; - private IMocksControl ctrl; - private Logger logger = new LogContext("test").logger(AbstractTask.class); private final TaskId taskId = new TaskId(0, 0); - @Before - public void setup() { - ctrl = createStrictControl(); - topology = ctrl.createMock(ProcessorTopology.class); - processorContext = ctrl.createMock(InternalProcessorContext.class); - - stateManager = ctrl.createMock(ProcessorStateManager.class); - stateDirectory = ctrl.createMock(StateDirectory.class); - } - @Test public void testRegisterStateStoreWhenTopologyEmpty() { - expect(topology.stateStores()).andReturn(emptyList()); - - ctrl.checkOrder(true); - ctrl.replay(); + when(topology.stateStores()).thenReturn(emptyList()); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - - ctrl.verify(); } @Test public void testRegisterStateStoreFailToLockStateDirectory() { - expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); - - expect(stateManager.taskId()).andReturn(taskId); + when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false))); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -42,77 +40,56 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static org.easymock.EasyMock.createStrictControl; -import static org.easymock.EasyMock.expect; -import static org.easymock.EasyMock.expectLastCall; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; -import static org.powermock.api.easymock.PowerMock.mockStatic; -import static org.powermock.api.easymock.PowerMock.replayAll; - -@RunWith(PowerMockRunner.class) -@PrepareForTest(Utils.class) +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.StrictStubs.class) public class StateManagerUtilTest { - @Mock(type = MockType.NICE) + @Mock private ProcessorStateManager stateManager; - @Mock(type = MockType.NICE) + @Mock private StateDirectory stateDirectory; - @Mock(type = MockType.NICE) + @Mock private ProcessorTopology topology; - @Mock(type = MockType.NICE) + @Mock private InternalProcessorContext processorContext; - private IMocksControl ctrl; - private Logger logger = new LogContext("test").logger(AbstractTask.class); private final TaskId taskId = new TaskId(0, 0); - @Before - public void setup() { - ctrl = createStrictControl(); - topology = ctrl.createMock(ProcessorTopology.class); - processorContext = ctrl.createMock(InternalProcessorContext.class); - - stateManager = ctrl.createMock(ProcessorStateManager.class); - stateDirectory = ctrl.createMock(StateDirectory.class); - } - @Test public void testRegisterStateStoreWhenTopologyEmpty() { - expect(topology.stateStores()).andReturn(emptyList()); - - ctrl.checkOrder(true); - ctrl.replay(); + when(topology.stateStores()).thenReturn(emptyList()); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - - ctrl.verify(); } @Test public void testRegisterStateStoreFailToLockStateDirectory() { - expect(topology.stateStores()).andReturn(singletonList(new MockKeyValueStore("store", false))); - - expect(stateManager.taskId()).andReturn(taskId); + when(topology.stateStores()).thenReturn(singletonList(new MockKeyValueStore("store", false))); - expect(stateDirectory.lock(taskId)).andReturn(false); + when(stateManager.taskId()).thenReturn(taskId); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); - stateManager.registerStateStores(stateStores, processorContext); + when(stateManager.taskId()).thenReturn(taskId); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); - stateManager.registerStateStores(stateStores, processorContext); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.initializeStoreOffsetsFromCheckpoint(true); - expectLastCall(); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true); - ctrl.checkOrder(true); - ctrl.replay(); + when(topology.stateStores()).thenReturn(stateStores); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - ctrl.verify(); + inOrder.verify(stateManager).registerStateStores(stateStores, processorContext); + inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true); + verifyNoMoreInteractions(stateManager); } @Test public void testCloseStateManagerClean() { - expect(stateManager.taskId()).andReturn(taskId); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.close(); - expectLastCall(); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateDirectory.lock(taskId)).thenReturn(true); StateManagerUtil.closeStateManager(logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test public void testCloseStateManagerThrowsExceptionWhenClean() { - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateManager.taskId()).thenReturn(taskId); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); - stateManager.registerStateStores(stateStores, processorContext); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.initializeStoreOffsetsFromCheckpoint(true); - expectLastCall(); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true); - ctrl.checkOrder(true); - ctrl.replay(); + when(topology.stateStores()).thenReturn(stateStores); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - ctrl.verify(); + inOrder.verify(stateManager).registerStateStores(stateStores, processorContext); + inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true); + verifyNoMoreInteractions(stateManager); } @Test public void testCloseStateManagerClean() { - expect(stateManager.taskId()).andReturn(taskId); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.close(); - expectLastCall(); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateDirectory.lock(taskId)).thenReturn(true); StateManagerUtil.closeStateManager(logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test public void testCloseStateManagerThrowsExceptionWhenClean() { - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); + when(stateDirectory.lock(taskId)).thenReturn(true); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. - expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test - public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { + public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); - expect(stateManager.baseDir()).andReturn(randomFile); + doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); - Utils.delete(randomFile); + when(stateManager.baseDir()).thenReturn(randomFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + assertThrows(ProcessorStateException.class, () -> + StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); - - assertThrows(ProcessorStateException.class, () -> - StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test - public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { + public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); - mockStatic(Utils.class); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. - expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test - public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { + public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); - expect(stateManager.baseDir()).andReturn(randomFile); + doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. - expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test - public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { + public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); - expect(stateManager.baseDir()).andReturn(randomFile); + doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); - Utils.delete(randomFile); + when(stateManager.baseDir()).thenReturn(randomFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + assertThrows(ProcessorStateException.class, () -> + StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); - - assertThrows(ProcessorStateException.class, () -> - StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test - public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { + public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); - mockStatic(Utils.class); - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateManager.baseDir()).thenReturn(unknownFile); - expect(stateManager.baseDir()).andReturn(unknownFile); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed")); - Utils.delete(unknownFile); - expectLastCall().andThrow(new IOException("Deletion failed")); + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - stateDirectory.unlock(taskId); - expectLastCall(); + assertEquals(IOException.class, thrown.getCause().getClass()); + } - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); - - final ProcessorStateException thrown = assertThrows( - ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, - "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - assertEquals(IOException.class, thrown.getCause().getClass()); - - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test - public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() { - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(false); - - stateManager.close(); - expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!")); + public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() { + final InOrder inOrder = inOrder(stateManager, stateDirectory); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +139,92 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. - expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + verify(stateManager).close(); + verify(stateDirectory).unlock(taskId); } @Test public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); - expect(stateManager.baseDir()).andReturn(randomFile); + doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); - Utils.delete(randomFile); + when(stateManager.baseDir()).thenReturn(randomFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + assertThrows(ProcessorStateException.class, () -> + StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); - - assertThrows(ProcessorStateException.class, () -> - StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { final File unknownFile = new File("/unknown/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - - stateManager.close(); - expectLastCall(); - - expect(stateManager.baseDir()).andReturn(unknownFile); - Utils.delete(unknownFile); - expectLastCall().andThrow(new IOException("Deletion failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateManager.baseDir()).thenReturn(unknownFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + utils.when(() -> Utils.delete(unknownFile)).thenThrow(new IOException("Deletion failed")); - ctrl.checkOrder(true); - ctrl.replay(); + final ProcessorStateException thrown = assertThrows( + ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, + "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - replayAll(); + assertEquals(IOException.class, thrown.getCause().getClass()); + } - final ProcessorStateException thrown = assertThrows( - ProcessorStateException.class, () -> StateManagerUtil.closeStateManager(logger, - "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - assertEquals(IOException.class, thrown.getCause().getClass()); - - ctrl.verify(); + verify(stateManager).close(); + verify(stateDirectory).unlock(taskId); } @Test - public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() { - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(false); - - stateManager.close(); - expectLastCall().andThrow(new AssertionError("Should not be trying to close state you don't own!")); - - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); + public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() { + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(false); StateManagerUtil.closeStateManager( - logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE); - } - - @Test - public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws IOException { Review Comment: I see what you mean, but unit tests should also document the behavior of production code and be robust against code changes. In this case, we have indeed to separate cases: 1. When exactly-once is enabled and the state manager is closed dirty and the lock is not owned then the state manager should not be closed, the directory should not be wiped and the directory should not be unlocked. 2. When exactly-once is disabled and the lock is not owned then the state manager should not be closed and the directory should not be unlocked. So, I think what you did is fine, although would not verify that `stateManager.baseDir()` is never called but that `Utils.delete()` is never called. However, you need to run that test once with: ```java StateManagerUtil.closeStateManager( logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); ``` and once with ```java StateManagerUtil.closeStateManager( logger, "logPrefix:", true, false, stateManager, stateDirectory, TaskType.ACTIVE); ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); - stateManager.registerStateStores(stateStores, processorContext); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.initializeStoreOffsetsFromCheckpoint(true); - expectLastCall(); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); - stateManager.registerStateStores(stateStores, processorContext); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.initializeStoreOffsetsFromCheckpoint(true); - expectLastCall(); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true); - ctrl.checkOrder(true); - ctrl.replay(); + when(topology.stateStores()).thenReturn(stateStores); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - ctrl.verify(); + inOrder.verify(stateManager).registerStateStores(stateStores, processorContext); + inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true); + verifyNoMoreInteractions(stateManager); } @Test public void testCloseStateManagerClean() { - expect(stateManager.taskId()).andReturn(taskId); + final InOrder inOrder = inOrder(stateManager, stateDirectory); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. - expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test - public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { + public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -121,64 +98,48 @@ public void testRegisterStateStores() { final MockKeyValueStore store2 = new MockKeyValueStore("store2", false); final List<StateStore> stateStores = Arrays.asList(store1, store2); - expect(topology.stateStores()).andReturn(stateStores); - - expect(stateManager.taskId()).andReturn(taskId); - - expect(stateDirectory.lock(taskId)).andReturn(true); - expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager); - expect(topology.stateStores()).andReturn(stateStores); + when(topology.stateStores()).thenReturn(stateStores); - stateManager.registerStateStores(stateStores, processorContext); + when(stateManager.taskId()).thenReturn(taskId); - stateManager.initializeStoreOffsetsFromCheckpoint(true); - expectLastCall(); + when(stateDirectory.lock(taskId)).thenReturn(true); + when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true); - ctrl.checkOrder(true); - ctrl.replay(); + when(topology.stateStores()).thenReturn(stateStores); StateManagerUtil.registerStateStores(logger, "logPrefix:", topology, stateManager, stateDirectory, processorContext); - ctrl.verify(); + inOrder.verify(stateManager).registerStateStores(stateStores, processorContext); + inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true); + verifyNoMoreInteractions(stateManager); } @Test public void testCloseStateManagerClean() { - expect(stateManager.taskId()).andReturn(taskId); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateManager.taskId()).thenReturn(taskId); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("state manager failed to close")); - - stateDirectory.unlock(taskId); - - ctrl.checkOrder(true); - ctrl.replay(); + doThrow(new ProcessorStateException("state manager failed to close")).when(stateManager).close(); assertThrows( ProcessorStateException.class, () -> StateManagerUtil.closeStateManager( logger, "logPrefix:", false, false, stateManager, stateDirectory, TaskType.ACTIVE)); - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerWithStateStoreWipeOut() { - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); - stateManager.close(); - expectLastCall(); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); // The `baseDir` will be accessed when attempting to delete the state store. - expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store")); - - stateDirectory.unlock(taskId); - expectLastCall(); - - ctrl.checkOrder(true); - ctrl.replay(); + when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store")); StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE); - ctrl.verify(); + inOrder.verify(stateManager).close(); + inOrder.verify(stateDirectory).unlock(taskId); + verifyNoMoreInteractions(stateManager, stateDirectory); } @Test - public void shouldStillWipeStateStoresIfCloseThrowsException() throws IOException { + public void shouldStillWipeStateStoresIfCloseThrowsException() { final File randomFile = new File("/random/path"); - mockStatic(Utils.class); - - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); - stateManager.close(); - expectLastCall().andThrow(new ProcessorStateException("Close failed")); + when(stateManager.taskId()).thenReturn(taskId); + when(stateDirectory.lock(taskId)).thenReturn(true); - expect(stateManager.baseDir()).andReturn(randomFile); + doThrow(new ProcessorStateException("Close failed")).when(stateManager).close(); - Utils.delete(randomFile); + when(stateManager.baseDir()).thenReturn(randomFile); - stateDirectory.unlock(taskId); - expectLastCall(); + try (MockedStatic<Utils> utils = mockStatic(Utils.class)) { + assertThrows(ProcessorStateException.class, () -> + StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); + } - ctrl.checkOrder(true); - ctrl.replay(); - - replayAll(); - - assertThrows(ProcessorStateException.class, () -> - StateManagerUtil.closeStateManager(logger, "logPrefix:", false, true, stateManager, stateDirectory, TaskType.ACTIVE)); - - ctrl.verify(); + verify(stateDirectory).unlock(taskId); } @Test - public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws IOException { + public void testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() { final File unknownFile = new File("/unknown/path"); - mockStatic(Utils.class); - expect(stateManager.taskId()).andReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + final InOrder inOrder = inOrder(stateManager, stateDirectory); Review Comment: ```suggestion ``` ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java: ########## @@ -187,151 +148,101 @@ public void testCloseStateManagerThrowsExceptionWhenClean() { // Thrown stateMgr exception will not be wrapped. assertEquals("state manager failed to close", thrown.getMessage()); - ctrl.verify(); + // The unlock logic should still be executed. + verify(stateDirectory).unlock(taskId); } @Test public void testCloseStateManagerThrowsExceptionWhenDirty() { - expect(stateManager.taskId()).andReturn(taskId); + when(stateManager.taskId()).thenReturn(taskId); - expect(stateDirectory.lock(taskId)).andReturn(true); + when(stateDirectory.lock(taskId)).thenReturn(true); Review Comment: ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org