dajac commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1396140016


##########
clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java:
##########
@@ -83,4 +85,16 @@ public static ConsumerGroupDescribeRequest parse(ByteBuffer buffer, short versio
             version
         );
     }
+
+    public static List<ConsumerGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(

Review Comment:
   Could we add a unit test for this new method?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6465,12 +6465,42 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      requestChannelRequest.context,
+      consumerGroupDescribeRequestData.groupIds
+//      any[RequestContext],
+//      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+

Review Comment:
   I suppose that you need to complete the future here before calling 
`verifyNoThrottling`.
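
   To illustrate why the ordering matters, here is a minimal sketch in plain
   Java. The `handle` method is a hypothetical stand-in for the request handler,
   not the actual `KafkaApis` code: it only "sends" a response from a callback on
   the future, so nothing is available to verify until the future is completed.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class FutureCompletionOrder {
    // Hypothetical stand-in for a request handler: the response is only
    // recorded once the coordinator's future completes.
    static void handle(CompletableFuture<List<String>> future,
                       AtomicReference<List<String>> sentResponse) {
        future.whenComplete((groups, error) -> {
            if (error == null) {
                sentResponse.set(groups);
            }
        });
    }

    public static void main(String[] args) {
        CompletableFuture<List<String>> future = new CompletableFuture<>();
        AtomicReference<List<String>> sentResponse = new AtomicReference<>();

        handle(future, sentResponse);
        // The future is still pending, so no response has been recorded yet.
        System.out.println(sentResponse.get()); // prints "null"

        // Completing the future runs the callback on this thread.
        future.complete(List.of("group0"));
        System.out.println(sentResponse.get()); // prints "[group0]"
    }
}
```

   In the test, this corresponds to calling `future.complete(...)` before
   reading the response.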



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -445,6 +446,42 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi
         return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);

Review Comment:
   I wonder if we could follow a bit more what we did in `describeGroups`. 
Would it be possible?
   
   For instance, there we use `genericGroup` which handles the group type check 
as well. We could use a similar method for consumer groups, I think.
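
   A rough sketch of the idea, using hypothetical stand-in types rather than the
   real coordinator classes: one accessor does both the lookup and the type
   check, and throws if the group is missing or of the wrong type. The
   `GroupNotFound` exception and the `consumerGroup` accessor are illustrative
   names, and the `committedOffset` argument is left out to keep the sketch
   short.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TypedGroupLookup {
    // Hypothetical stand-ins for the coordinator's group types.
    interface Group {
        String groupId();
    }

    static final class ConsumerGroup implements Group {
        private final String groupId;
        ConsumerGroup(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
    }

    static final class GenericGroup implements Group {
        private final String groupId;
        GenericGroup(String groupId) { this.groupId = groupId; }
        public String groupId() { return groupId; }
    }

    // Stand-in for an exception such as GroupIdNotFoundException.
    static final class GroupNotFound extends RuntimeException {
        GroupNotFound(String message) { super(message); }
    }

    private final Map<String, Group> groups = new HashMap<>();

    void add(Group group) {
        groups.put(group.groupId(), group);
    }

    // Lookup and type check live in one place, so callers never cast.
    ConsumerGroup consumerGroup(String groupId) {
        Group group = groups.get(groupId);
        if (group == null) {
            throw new GroupNotFound("Group " + groupId + " not found.");
        }
        if (!(group instanceof ConsumerGroup)) {
            throw new GroupNotFound("Group " + groupId + " is not a consumer group.");
        }
        return (ConsumerGroup) group;
    }

    public static void main(String[] args) {
        TypedGroupLookup lookup = new TypedGroupLookup();
        lookup.add(new ConsumerGroup("group0"));
        lookup.add(new GenericGroup("group1"));

        for (String id : List.of("group0", "group1", "group2")) {
            try {
                System.out.println(lookup.consumerGroup(id).groupId() + ": found");
            } catch (GroupNotFound e) {
                System.out.println(id + ": " + e.getMessage());
            }
        }
    }
}
```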



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -525,6 +527,69 @@ public CompletableFuture<ListGroupsResponseData> listGroups(
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, List)}.
+     */
+    @Override
+    public CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> consumerGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DescribeGroups for the empty group id.
+            if (groupId == null) {
+                futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new ConsumerGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                        .setErrorMessage(Errors.INVALID_GROUP_ID.message())

Review Comment:
   nit: Let's remove this one.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,32 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(Assignment targetAssignment) {
+        return new ConsumerGroupDescribeResponseData.Member()
+            .setMemberEpoch(memberEpoch)
+            .setMemberId(Uuid.fromString(memberId))

Review Comment:
   It may be better to change the member id field to a string as we use a 
string everywhere. I suppose that I got it wrong in the KIP.



##########
clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupDescribeRequest.java:
##########
@@ -83,4 +85,16 @@ public static ConsumerGroupDescribeRequest parse(ByteBuffer buffer, short versio
             version
         );
     }
+
+    public static List<ConsumerGroupDescribeResponseData.DescribedGroup> getErrorDescribedGroupList(
+        List<String> groupIds,
+        Errors error
+    ) {
+        return groupIds.stream()
+            .map(groupId -> new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId)
+                .setErrorCode(error.code())
+                .setErrorMessage(error.message())

Review Comment:
   It would be better to remove the message here. We usually don't put it if it 
is the default one.
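
   A minimal sketch of the helper without the message, using a hypothetical
   stand-in for the generated `DescribedGroup` class (the real class has more
   fields, and this sketch assumes the message field defaults to null). The
   `main` method doubles as the kind of check a unit test for the helper could
   make: one entry per group id, each carrying the error code and no message.
   The numeric code is a placeholder value.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ErrorDescribedGroups {
    // Hypothetical stand-in for the generated DescribedGroup class.
    static final class DescribedGroup {
        private String groupId;
        private short errorCode;
        private String errorMessage; // stays null unless explicitly set

        DescribedGroup setGroupId(String groupId) { this.groupId = groupId; return this; }
        DescribedGroup setErrorCode(short errorCode) { this.errorCode = errorCode; return this; }
        String groupId() { return groupId; }
        short errorCode() { return errorCode; }
        String errorMessage() { return errorMessage; }
    }

    // Builds one error entry per group id; only the code is set.
    static List<DescribedGroup> errorDescribedGroups(List<String> groupIds, short errorCode) {
        return groupIds.stream()
            .map(groupId -> new DescribedGroup()
                .setGroupId(groupId)
                .setErrorCode(errorCode))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        short placeholderCode = 15; // placeholder numeric error code
        List<DescribedGroup> result =
            errorDescribedGroups(List.of("group0", "group1"), placeholderCode);

        for (DescribedGroup group : result) {
            System.out.println(group.groupId() + " code=" + group.errorCode()
                + " message=" + group.errorMessage());
        }
    }
}
```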



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6465,12 +6465,42 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      requestChannelRequest.context,
+      consumerGroupDescribeRequestData.groupIds
+//      any[RequestContext],
+//      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+    val describedGroups = List(new DescribedGroup()).asJava

Review Comment:
   Could we add more groups with their info (e.g. id, etc.)?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -445,6 +446,42 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi
         return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we set an error if a group exists with the wrong type.
+                describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());

Review Comment:
   I think that we should return `GroupIdNotFoundException` and we can also 
remove the error message here.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, true).build())
+
+    val future = new CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> "true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+    val describedGroups = List(new DescribedGroup()).asJava
+    val consumerGroupDescribeResponseData = new ConsumerGroupDescribeResponseData()
+      .setGroups(describedGroups)
+    future.complete(describedGroups)
+
+    assertEquals(consumerGroupDescribeResponseData, response.data)
+  }
+

Review Comment:
   +1



##########
core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala:
##########
@@ -601,4 +601,13 @@ private[group] class GroupCoordinatorAdapter(
   override def shutdown(): Unit = {
     coordinator.shutdown()
   }
+
+  override def consumerGroupDescribe(

Review Comment:
   Should we add a unit test for this new method?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6465,12 +6465,42 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)

Review Comment:
   It would be great to use more groups (2 or 3).



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -445,6 +446,42 @@ public List<ListGroupsResponseData.ListedGroup> listGroups(List<String> statesFi
         return groupStream.map(group -> group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at the moment so
+                // we set an error if a group exists with the wrong type.
+                describedGroup.setErrorMessage(Errors.INVALID_GROUP_ID.message());
+                describedGroup.setErrorCode(Errors.INVALID_GROUP_ID.code());
+            } else {
+                ConsumerGroup consumerGroup = (ConsumerGroup) group;
+                describedGroup.setGroupState(consumerGroup.stateAsString())
+                    .setGroupEpoch(consumerGroup.groupEpoch())
+                    .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+                    .setAssignorName(consumerGroup.preferredServerAssignor().isPresent() ?
+                        consumerGroup.preferredServerAssignor().get() : "");
+                consumerGroup.members().forEach(
+                    (id, member) -> describedGroup.members().add(member.asConsumerGroupDescribeMember(consumerGroup.targetAssignment(member.memberId())))

Review Comment:
   All the reads here must take into account the `committedOffset`. It may be 
better to add an `asDescribedGroup` method to the consumer group to prepare it.
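
   A sketch of what this could look like, under heavy assumptions:
   `VersionedValue` is a hypothetical, simplified stand-in for a
   snapshot-capable field and is not how the coordinator's timeline data
   structures are implemented. The point is only that `asDescribedGroup` takes
   the `committedOffset` once and uses it for every read, so the description
   never mixes committed and uncommitted state.

```java
import java.util.Map;
import java.util.TreeMap;

public class CommittedOffsetReads {
    // Hypothetical stand-in for a snapshot-capable field: every write is
    // recorded at an offset, and reads return the value as of a given offset.
    static final class VersionedValue<T> {
        private final TreeMap<Long, T> history = new TreeMap<>();

        void set(long offset, T value) {
            history.put(offset, value);
        }

        T get(long committedOffset) {
            // Latest write at or before the committed offset, if any.
            Map.Entry<Long, T> entry = history.floorEntry(committedOffset);
            return entry == null ? null : entry.getValue();
        }
    }

    // Hypothetical, heavily reduced consumer group.
    static final class ConsumerGroup {
        final String groupId;
        final VersionedValue<Integer> groupEpoch = new VersionedValue<>();
        final VersionedValue<String> state = new VersionedValue<>();

        ConsumerGroup(String groupId) {
            this.groupId = groupId;
        }

        // Every field is read with the same committedOffset.
        String asDescribedGroup(long committedOffset) {
            return groupId
                + " epoch=" + groupEpoch.get(committedOffset)
                + " state=" + state.get(committedOffset);
        }
    }

    public static void main(String[] args) {
        ConsumerGroup group = new ConsumerGroup("group0");
        group.groupEpoch.set(0L, 10);
        group.state.set(0L, "assigning");
        // A later write that is not yet committed when reading at offset 3.
        group.groupEpoch.set(5L, 11);
        group.state.set(5L, "stable");

        System.out.println(group.asDescribedGroup(3L)); // prints "group0 epoch=10 state=assigning"
        System.out.println(group.asDescribedGroup(5L)); // prints "group0 epoch=11 state=stable"
    }
}
```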



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,68 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, epoch + 1));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupEpoch(epoch + 1);
+        describedGroup.setGroupId(consumerGroupId);
+        describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember()));
+        describedGroup.setAssignorName(null);
+        describedGroup.setGroupState("assigning");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }

Review Comment:
   +1. It would be great if we could do this. You can take a look at how we did 
it in other tests. If we are blocked, ping me on Slack.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
