cadonna commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1512305606
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2016,13 +2015,14 @@ public void shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01, taskId02)));
         handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
-        reset(consumer);
-        expectConsumerAssignmentPaused(consumer);
-        replay(consumer);
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, taskId01)));
         verify(stateDirectory);
+
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));

Review Comment:
I see `singleton(new TopicPartition("assignment", 0))` added around 45 times in this PR. I assume this is because `handleAssignment()` uses it internally. I propose defining a `static final` field `assignment` (or similar) that can be reused. Alternatively, the assignment could be passed to `handleAssignment()`. To limit the changes in this PR, you should probably go for the former option, the field.

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws Exception {
         restoringTask.setChangelogOffsets(changelogOffsets);
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
With EasyMock you need to call `verify()` to verify a call. I do not see a call to `verify()` in the old code. That tells me that verifying the call to `consumer.resume()` was not relevant in the old code of this test. Additionally, the goal of this test is to verify the result of `taskManager.getTaskOffsetSums()`. For that you need some tasks assigned.
So part of the setup of this test is to install an assignment so that you get tasks assigned. With `Mockito.verify(mockitoConsumer).resume(assignment)` you would be testing a rather unimportant part of this test's setup. To sum up, I have the impression that the migration verifies more than the old code did, and that the added verification is rather unimportant.

##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2332,19 +2335,12 @@ public void shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         task00.setCommittableOffsetsAndMetadata(offsets);
         // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
-        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expectLastCall();
-
-        // `handleRevocation`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        final Set<TopicPartition> assignment = singleton(new TopicPartition("assignment", 0));
+        when(mockitoConsumer.assignment()).thenReturn(assignment);
-        // second `handleAssignment`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        replay(consumer);
+        taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
Fair enough! Waiting for your follow-up PR then.
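
For illustration, the first review comment's suggestion (one reusable `static final` field instead of a local `assignment` in every test) could look roughly like the sketch below. This is not code from the PR: the class name `SharedAssignmentSketch` and the constant name `ASSIGNMENT` are assumptions, and the nested `TopicPartition` is a hypothetical stand-in for `org.apache.kafka.common.TopicPartition` so that the example compiles without Kafka on the classpath.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

// Sketch only: shows the "define it once as a field" option from the review.
class SharedAssignmentSketch {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        private final String topic;
        private final int partition;

        TopicPartition(final String topic, final int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicPartition)) {
                return false;
            }
            final TopicPartition other = (TopicPartition) o;
            return partition == other.partition && topic.equals(other.topic);
        }

        @Override
        public int hashCode() {
            return Objects.hash(topic, partition);
        }
    }

    // Declared once at class level and reused by every test method.
    // The name ASSIGNMENT is an assumption, not taken from the PR.
    static final Set<TopicPartition> ASSIGNMENT =
        Collections.singleton(new TopicPartition("assignment", 0));

    public static void main(final String[] args) {
        // The shared constant equals the value each test used to build locally.
        final Set<TopicPartition> perTestCopy =
            Collections.singleton(new TopicPartition("assignment", 0));
        System.out.println(ASSIGNMENT.equals(perTestCopy)); // prints true

        // Collections.singleton returns an immutable set, so one test
        // cannot accidentally change the value seen by another test.
        try {
            ASSIGNMENT.add(new TopicPartition("other", 1));
        } catch (final UnsupportedOperationException e) {
            System.out.println("shared assignment is immutable");
        }
    }
}
```

Because the set is immutable, sharing it across tests should be safe. A test would then stub `mockitoConsumer.assignment()` with the shared field as part of its setup and add a `Mockito.verify(...)` only where the consumer call is what the test is actually about.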