cadonna commented on code in PR #15261:
URL: https://github.com/apache/kafka/pull/15261#discussion_r1471396786


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2332,19 +2335,12 @@ public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
-        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expectLastCall();
-
-        // `handleRevocation`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        when(mockitoConsumer.assignment()).thenReturn(assignment);
 
-        // second `handleAssignment`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
 
-        replay(consumer);
+        taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   That could be set in `setupTaskManager()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2016,13 +2015,14 @@ public void 
shouldReleaseLockForUnassignedTasksAfterRebalance() throws Exception
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, 
taskId01, taskId02)));
 
         handleAssignment(taskId00Assignment, taskId01Assignment, emptyMap());
-        reset(consumer);
-        expectConsumerAssignmentPaused(consumer);
-        replay(consumer);
 
         taskManager.handleRebalanceComplete();
         assertThat(taskManager.lockedTaskDirectories(), is(mkSet(taskId00, 
taskId01)));
         verify(stateDirectory);
+
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));

Review Comment:
   This assignment is duplicated a couple of times. Could you try to avoid that 
duplication?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2369,12 +2366,9 @@ public void closeClean() {
             }
         };
 
-        // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expectLastCall();
 
-        replay(consumer);
+        taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Set in `setupTaskManager()`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2210,6 +2210,9 @@ public void shouldComputeOffsetSumForStandbyTask() throws 
Exception {
         restoringTask.setChangelogOffsets(changelogOffsets);
 
         assertThat(taskManager.getTaskOffsetSums(), is(expectedOffsetSums));
+
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   I do not think we need this verification. Method `resume()` is called in 
some setup code of the test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -4821,8 +4911,10 @@ private Map<TaskId, StateMachineTask> 
handleAssignment(final Map<TaskId, Set<Top
         
when(standbyTaskCreator.createTasks(standbyAssignment)).thenReturn(standbyTasks);
         when(activeTaskCreator.createTasks(any(), 
Mockito.eq(allActiveTasksAssignment))).thenReturn(allActiveTasks);
 
-        expectRestoreToBeCompleted(consumer);
-        replay(consumer);
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        lenient().when(mockitoConsumer.assignment()).thenReturn(assignment);
+
+        taskManager.setMainConsumer(mockitoConsumer);

Review Comment:
   Why did you not modify this call in `setUpTaskManager()`?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2332,19 +2335,12 @@ public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         task00.setCommittableOffsetsAndMetadata(offsets);
 
         // first `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
-        when(activeTaskCreator.createTasks(any(), 
Mockito.eq(taskId00Assignment))).thenReturn(singletonList(task00));
-        expectLastCall();
-
-        // `handleRevocation`
-        consumer.commitSync(offsets);
-        expectLastCall();
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        when(mockitoConsumer.assignment()).thenReturn(assignment);

Review Comment:
   I do not think you need this in this test. This call is not relevant for 
this test. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2358,6 +2354,7 @@ public void 
shouldCloseActiveUnassignedSuspendedTasksWhenClosingRevokedTasks() {
         assertThat(taskManager.activeTaskMap(), Matchers.anEmptyMap());
         assertThat(taskManager.standbyTaskMap(), Matchers.anEmptyMap());
         
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   I do not think you need this in this test. This call is not relevant for 
this test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2437,6 +2432,7 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() 
throws Exception {
 
         assertThat(taskManager.lockedTaskDirectories(), is(emptySet()));
         
Mockito.verify(activeTaskCreator).closeAndRemoveTaskProducerIfNeeded(taskId00);
+        Mockito.verify(mockitoConsumer).resume(assignment);

Review Comment:
   Not needed because not relevant for this test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -2399,7 +2393,8 @@ public void shouldCloseActiveTasksWhenHandlingLostTasks() 
throws Exception {
         final StateMachineTask task01 = new StateMachineTask(taskId01, 
taskId01Partitions, false, stateManager);
 
         // `handleAssignment`
-        expectRestoreToBeCompleted(consumer);
+        final Set<TopicPartition> assignment = singleton(new 
TopicPartition("assignment", 0));
+        when(mockitoConsumer.assignment()).thenReturn(assignment);

Review Comment:
   Not needed because not relevant for this test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to