cadonna commented on code in PR #12821:
URL: https://github.com/apache/kafka/pull/12821#discussion_r1046120121


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, 
processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new 
MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(topology.stateStores()).thenReturn(singletonList(new 
MockKeyValueStore("store", false)));
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -42,77 +40,56 @@
 
 import static java.util.Collections.emptyList;
 import static java.util.Collections.singletonList;
-import static org.easymock.EasyMock.createStrictControl;
-import static org.easymock.EasyMock.expect;
-import static org.easymock.EasyMock.expectLastCall;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThrows;
-import static org.powermock.api.easymock.PowerMock.mockStatic;
-import static org.powermock.api.easymock.PowerMock.replayAll;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(Utils.class)
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.StrictStubs.class)
 public class StateManagerUtilTest {
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorStateManager stateManager;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private StateDirectory stateDirectory;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private ProcessorTopology topology;
 
-    @Mock(type = MockType.NICE)
+    @Mock
     private InternalProcessorContext processorContext;
 
-    private IMocksControl ctrl;
-
     private Logger logger = new LogContext("test").logger(AbstractTask.class);
 
     private final TaskId taskId = new TaskId(0, 0);
 
-    @Before
-    public void setup() {
-        ctrl = createStrictControl();
-        topology = ctrl.createMock(ProcessorTopology.class);
-        processorContext = ctrl.createMock(InternalProcessorContext.class);
-
-        stateManager = ctrl.createMock(ProcessorStateManager.class);
-        stateDirectory = ctrl.createMock(StateDirectory.class);
-    }
-
     @Test
     public void testRegisterStateStoreWhenTopologyEmpty() {
-        expect(topology.stateStores()).andReturn(emptyList());
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(emptyList());
 
         StateManagerUtil.registerStateStores(logger,
             "logPrefix:", topology, stateManager, stateDirectory, 
processorContext);
-
-        ctrl.verify();
     }
 
     @Test
     public void testRegisterStateStoreFailToLockStateDirectory() {
-        expect(topology.stateStores()).andReturn(singletonList(new 
MockKeyValueStore("store", false)));
-
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(topology.stateStores()).thenReturn(singletonList(new 
MockKeyValueStore("store", false)));
 
-        expect(stateDirectory.lock(taskId)).andReturn(false);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, 
processorContext);
+        
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, 
processorContext);
+        
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall();
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        expect(stateManager.baseDir()).andReturn(unknownFile);
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new 
IOException("Deletion failed"));
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to 
close state you don't own!"));
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +139,92 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-
-        stateManager.close();
-        expectLastCall();
-
-        expect(stateManager.baseDir()).andReturn(unknownFile);
 
-        Utils.delete(unknownFile);
-        expectLastCall().andThrow(new IOException("Deletion failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateManager.baseDir()).thenReturn(unknownFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            utils.when(() -> Utils.delete(unknownFile)).thenThrow(new 
IOException("Deletion failed"));
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+            final ProcessorStateException thrown = assertThrows(
+                    ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
+                            "logPrefix:", false, true, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        replayAll();
+            assertEquals(IOException.class, thrown.getCause().getClass());
+        }
 
-        final ProcessorStateException thrown = assertThrows(
-            ProcessorStateException.class, () -> 
StateManagerUtil.closeStateManager(logger,
-                "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE));
-
-        assertEquals(IOException.class, thrown.getCause().getClass());
-
-        ctrl.verify();
+        verify(stateManager).close();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void shouldNotCloseStateManagerIfUnableToLockTaskDirectory() {
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(false);
-
-        stateManager.close();
-        expectLastCall().andThrow(new AssertionError("Should not be trying to 
close state you don't own!"));
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
+    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() {
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(false);
 
         StateManagerUtil.closeStateManager(
-            logger, "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
-    }
-
-    @Test
-    public void shouldNotWipeStateStoresIfUnableToLockTaskDirectory() throws 
IOException {

Review Comment:
   I see what you mean, but unit tests should also document the behavior of 
production code and be robust against code changes. In this case, we have 
indeed to separate cases:
   
   1. When exactly-once is enabled and the state manager is closed dirty and 
the lock is not owned then the state manager should not be closed, the 
directory should not be wiped and the directory should not be unlocked.
   2. When exactly-once is disabled and the lock is not owned then the state 
manager should not be closed and the directory should not be unlocked.
   
   So, I think what you did is fine, although would not verify that 
`stateManager.baseDir()` is never called but that `Utils.delete()` is never 
called. However, you need to run that test once with:
   
   ```java
   StateManagerUtil.closeStateManager(
               logger, "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
   ``` 
   
   and once with
   
   ```java
   StateManagerUtil.closeStateManager(
               logger, "logPrefix:", true, false, stateManager, stateDirectory, 
TaskType.ACTIVE);
   ``` 
   



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, 
processorContext);
+        
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -121,64 +98,48 @@ public void testRegisterStateStores() {
         final MockKeyValueStore store2 = new MockKeyValueStore("store2", 
false);
         final List<StateStore> stateStores = Arrays.asList(store1, store2);
 
-        expect(topology.stateStores()).andReturn(stateStores);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-
-        expect(stateDirectory.lock(taskId)).andReturn(true);
-        expect(stateDirectory.directoryForTaskIsEmpty(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager);
 
-        expect(topology.stateStores()).andReturn(stateStores);
+        when(topology.stateStores()).thenReturn(stateStores);
 
-        stateManager.registerStateStores(stateStores, processorContext);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        stateManager.initializeStoreOffsetsFromCheckpoint(true);
-        expectLastCall();
+        when(stateDirectory.lock(taskId)).thenReturn(true);
+        when(stateDirectory.directoryForTaskIsEmpty(taskId)).thenReturn(true);
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        when(topology.stateStores()).thenReturn(stateStores);
 
         StateManagerUtil.registerStateStores(logger, "logPrefix:",
             topology, stateManager, stateDirectory, processorContext);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).registerStateStores(stateStores, 
processorContext);
+        
inOrder.verify(stateManager).initializeStoreOffsetsFromCheckpoint(true);
+        verifyNoMoreInteractions(stateManager);
     }
 
     @Test
     public void testCloseStateManagerClean() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateManager.taskId()).thenReturn(taskId);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("state manager 
failed to close"));
-
-        stateDirectory.unlock(taskId);
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        doThrow(new ProcessorStateException("state manager failed to 
close")).when(stateManager).close();
 
         assertThrows(
             ProcessorStateException.class,
             () -> StateManagerUtil.closeStateManager(
                 logger, "logPrefix:", false, false, stateManager, 
stateDirectory, TaskType.ACTIVE));
 
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerWithStateStoreWipeOut() {
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 
-        stateManager.close();
-        expectLastCall();
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
         // The `baseDir` will be accessed when attempting to delete the state 
store.
-        
expect(stateManager.baseDir()).andReturn(TestUtils.tempDirectory("state_store"));
-
-        stateDirectory.unlock(taskId);
-        expectLastCall();
-
-        ctrl.checkOrder(true);
-        ctrl.replay();
+        
when(stateManager.baseDir()).thenReturn(TestUtils.tempDirectory("state_store"));
 
         StateManagerUtil.closeStateManager(logger,
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
-        ctrl.verify();
+        inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).unlock(taskId);
+        verifyNoMoreInteractions(stateManager, stateDirectory);
     }
 
     @Test
-    public void  shouldStillWipeStateStoresIfCloseThrowsException() throws 
IOException {
+    public void  shouldStillWipeStateStoresIfCloseThrowsException() {
         final File randomFile = new File("/random/path");
-        mockStatic(Utils.class);
-
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
 
-        stateManager.close();
-        expectLastCall().andThrow(new ProcessorStateException("Close failed"));
+        when(stateManager.taskId()).thenReturn(taskId);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 
-        expect(stateManager.baseDir()).andReturn(randomFile);
+        doThrow(new ProcessorStateException("Close 
failed")).when(stateManager).close();
 
-        Utils.delete(randomFile);
+        when(stateManager.baseDir()).thenReturn(randomFile);
 
-        stateDirectory.unlock(taskId);
-        expectLastCall();
+        try (MockedStatic<Utils> utils = mockStatic(Utils.class)) {
+            assertThrows(ProcessorStateException.class, () ->
+                    StateManagerUtil.closeStateManager(logger, "logPrefix:", 
false, true, stateManager, stateDirectory, TaskType.ACTIVE));
+        }
 
-        ctrl.checkOrder(true);
-        ctrl.replay();
-
-        replayAll();
-
-        assertThrows(ProcessorStateException.class, () ->
-            StateManagerUtil.closeStateManager(logger, "logPrefix:", false, 
true, stateManager, stateDirectory, TaskType.ACTIVE));
-
-        ctrl.verify();
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
-    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() throws 
IOException {
+    public void 
testCloseStateManagerWithStateStoreWipeOutRethrowWrappedIOException() {
         final File unknownFile = new File("/unknown/path");
-        mockStatic(Utils.class);
 
-        expect(stateManager.taskId()).andReturn(taskId);
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        final InOrder inOrder = inOrder(stateManager, stateDirectory);
 

Review Comment:
   ```suggestion
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java:
##########
@@ -187,151 +148,101 @@ public void 
testCloseStateManagerThrowsExceptionWhenClean() {
         // Thrown stateMgr exception will not be wrapped.
         assertEquals("state manager failed to close", thrown.getMessage());
 
-        ctrl.verify();
+        // The unlock logic should still be executed.
+        verify(stateDirectory).unlock(taskId);
     }
 
     @Test
     public void testCloseStateManagerThrowsExceptionWhenDirty() {
-        expect(stateManager.taskId()).andReturn(taskId);
+        when(stateManager.taskId()).thenReturn(taskId);
 
-        expect(stateDirectory.lock(taskId)).andReturn(true);
+        when(stateDirectory.lock(taskId)).thenReturn(true);
 

Review Comment:
   ```suggestion
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to