IgnacioAcunaF commented on a change in pull request #10858:
URL: https://github.com/apache/kafka/pull/10858#discussion_r652242501



##########
File path: core/src/test/scala/unit/kafka/admin/ConsumerGroupServiceTest.scala
##########
@@ -62,6 +62,28 @@ class ConsumerGroupServiceTest {
     verify(admin, times(1)).listOffsets(offsetsArgMatcher, any())
   }
 
+  @Test
+  def testAdminRequestsForDescribeNegativeOffsets(): Unit = {
+    val args = Array("--bootstrap-server", "localhost:9092", "--group", group, "--describe", "--offsets")
+    val groupService = consumerGroupService(args)
+
+    when(admin.describeConsumerGroups(ArgumentMatchers.eq(Collections.singletonList(group)), any()))
+      .thenReturn(describeGroupsResult(ConsumerGroupState.STABLE))
+    when(admin.listConsumerGroupOffsets(ArgumentMatchers.eq(group), any()))
+      .thenReturn(listGroupNegativeOffsetsResult)
+    when(admin.listOffsets(offsetsArgMatcher, any()))
+      .thenReturn(listOffsetsResult)
+
+    val (state, assignments) = groupService.collectGroupOffsets(group)
+    assertEquals(Some("Stable"), state)
+    assertTrue(assignments.nonEmpty)
+    assertEquals(topicPartitions.size, assignments.get.size)

Review comment:
   Yes, I could add that kind of assertion.
   Regarding the second question, I ran into something interesting that we may
need to improve in the original test. This is the current (as-is) helper that
builds the consumer group description:
   
   ```
   private def describeGroupsResult(groupState: ConsumerGroupState): DescribeConsumerGroupsResult = {
     val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1", null)
     val description = new ConsumerGroupDescription(group,
       true,
       Collections.singleton(member1),
       classOf[RangeAssignor].getName,
       groupState,
       new Node(1, "localhost", 9092))
     new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
   }
   ```
     
   As you can see, the last parameter of the member1 declaration is _null_.
That parameter is the member's partition assignment. It is always set to null,
so no partitions are assigned to any member of the consumer group. For example,
this is the printed consumer group description from
_testAdminRequestsForDescribeOffsets_ (with no changes so far):
    
    (groupId=testGroup, isSimpleConsumerGroup=true, members=(memberId=member1, 
groupInstanceId=instance1, clientId=client1, host=host1, 
assignment=(topicPartitions=)), 
partitionAssignor=org.apache.kafka.clients.consumer.RangeAssignor, 
state=Stable, coordinator=localhost:9092 (id: 1 rack: null), 
authorizedOperations=[])
   
   That is the result of `val consumerGroups = describeConsumerGroups(groupIds)`
in ConsumerGroupCommand's collectGroupsOffsets function when running the
existing test. As you can see, no partitions are assigned to the consumer:
`assignment=(topicPartitions=)`
   
   So the existing test skips the _assignedTopicPartitions_ part and goes
directly to _unassignedPartitions_. In other words, the test **only covers the
unassignedPartitions case.**
   
   Do you think this is worth improving?
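   
   One possible direction, as a rough sketch only: a second helper that passes
a real `MemberAssignment` instead of null, so that the member actually owns
some partitions and the assigned-partitions path is exercised too. The helper
name `describeGroupsResultWithAssignment` and its `assigned` parameter are
hypothetical (they are not in the PR). The sketch assumes it lives inside
ConsumerGroupServiceTest, where `group` is already defined:
   
   ```scala
   import java.util.{Collections, Optional}
   import org.apache.kafka.clients.admin.{ConsumerGroupDescription, DescribeConsumerGroupsResult, MemberAssignment, MemberDescription}
   import org.apache.kafka.clients.consumer.RangeAssignor
   import org.apache.kafka.common.{ConsumerGroupState, KafkaFuture, Node, TopicPartition}

   // Hypothetical variant of describeGroupsResult: same fixture, but member1
   // receives an explicit assignment instead of null.
   private def describeGroupsResultWithAssignment(groupState: ConsumerGroupState,
                                                  assigned: java.util.Set[TopicPartition]): DescribeConsumerGroupsResult = {
     // The last argument is the member's partition assignment.
     val member1 = new MemberDescription("member1", Optional.of("instance1"), "client1", "host1",
       new MemberAssignment(assigned))
     val description = new ConsumerGroupDescription(group,
       true,
       Collections.singleton(member1),
       classOf[RangeAssignor].getName,
       groupState,
       new Node(1, "localhost", 9092))
     new DescribeConsumerGroupsResult(Collections.singletonMap(group, KafkaFuture.completedFuture(description)))
   }
   ```
   
   A test could then stub `admin.describeConsumerGroups` with this helper,
passing only some of the test's topic partitions as `assigned`, so that both
the assigned and the unassigned branches would be covered.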




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

