cadonna commented on code in PR #15261: URL: https://github.com/apache/kafka/pull/15261#discussion_r1471396786
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { task00.setCommittableOffsetsAndMetadata(offsets); // first `handleAssignment` - expectRestoreToBeCompleted(consumer); - when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); - expectLastCall(); - - // `handleRevocation` - consumer.commitSync(offsets); - expectLastCall(); + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + when(mockitoConsumer.assignment()).thenReturn(assignment); - // second `handleAssignment` - consumer.commitSync(offsets); - expectLastCall(); + when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); - replay(consumer); + taskManager.setMainConsumer(mockitoConsumer); Review Comment: That could be set in `setupTaskManager()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2016,13 +2015,14 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02))); handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap()); - reset(consumer); - expectConsumerAssignmentPaused(consumer); - replay(consumer); taskManager.handleRebalanceComplete(); assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01))); verify(stateDirectory); + + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); Review Comment: This assignment is duplicated a couple of times. Could you try to avoid that duplication? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2369,12 +2366,9 @@ public void closeClean() { } }; - // first `handleAssignment` - expectRestoreToBeCompleted(consumer); when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); - expectLastCall(); - replay(consumer); + taskManager.setMainConsumer(mockitoConsumer); Review Comment: Set in `setupTaskManager()`. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception { restoringTask.setChangelogOffsets(changelogOffsets); assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums)); + + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + Mockito.verify(mockitoConsumer).resume(assignment); Review Comment: I do not think we need this verification. Method `resume()` is called in some setup code of the test. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -4821,8 +4911,10 @@ private Map<TaskId, StateMachineTask> handleAssignment(final Map<TaskId, Set<Top when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks); when(activeTaskCreator.createTasks(any(), Mockito.eq(allActiveTasksAssignment))).thenReturn(allActiveTasks); - expectRestoreToBeCompleted(consumer); - replay(consumer); + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + lenient().when(mockitoConsumer.assignment()).thenReturn(assignment); + + taskManager.setMainConsumer(mockitoConsumer); Review Comment: Why did you not modify this call in `setUpTaskManager()`? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { task00.setCommittableOffsetsAndMetadata(offsets); // first `handleAssignment` - expectRestoreToBeCompleted(consumer); - when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00)); - expectLastCall(); - - // `handleRevocation` - consumer.commitSync(offsets); - expectLastCall(); + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + when(mockitoConsumer.assignment()).thenReturn(assignment); Review Comment: I do not think you need this in this test. This call is not relevant for this test. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2358,6 +2354,7 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() { assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap()); assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap()); Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); + Mockito.verify(mockitoConsumer).resume(assignment); Review Comment: I do not think you need this in this test. This call is not relevant for this test. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2437,6 +2432,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(emptySet())); Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00); + Mockito.verify(mockitoConsumer).resume(assignment); Review Comment: Not needed because not relevant for this test. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2399,7 +2393,8 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() throws Exception { final StateMachineTask task01 = new StateMachineTask(taskId01, taskId01Partitions, false, stateManager); // `handleAssignment` - expectRestoreToBeCompleted(consumer); + final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0)); + when(mockitoConsumer.assignment()).thenReturn(assignment); Review Comment: Not needed because not relevant for this test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org