dajac commented on code in PR #14544:
URL: https://github.com/apache/kafka/pull/14544#discussion_r1400669970


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3693,8 +3693,49 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleConsumerGroupDescribe(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
-    requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
-    CompletableFuture.completedFuture[Unit](())
+    val consumerGroupDescribeRequest = 
request.body[ConsumerGroupDescribeRequest]
+
+    if (!config.isNewGroupCoordinatorEnabled) {
+      // The API is not supported by the "old" group coordinator (the 
default). If the
+      // new one is not enabled, we fail directly here.
+      requestHelper.sendMaybeThrottle(request, 
request.body[ConsumerGroupDescribeRequest].getErrorResponse(Errors.UNSUPPORTED_VERSION.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val response = new ConsumerGroupDescribeResponseData()
+
+      val authorizedGroups = new ArrayBuffer[String]()
+      consumerGroupDescribeRequest.data.groupIds.forEach { groupId =>
+        if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
+          response.groups.add(new 
ConsumerGroupDescribeResponseData.DescribedGroup()
+            .setGroupId(groupId)
+            .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code)
+            .setErrorMessage(Errors.GROUP_AUTHORIZATION_FAILED.message)

Review Comment:
   We usually don't set the error message when it is basically the default 
error message so I would remove it here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -253,8 +254,8 @@ GroupMetadataManager build() {
                 genericGroupMaxSessionTimeoutMs
             );
         }
-    }
 
+    }

Review Comment:
   Let's revert this one as well.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -133,6 +134,7 @@
 public class GroupMetadataManager {
 
     public static class Builder {
+

Review Comment:
   Let's revert this change as it is unnecessary.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,105 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 
epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 
epoch + 1));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupEpoch(epoch + 1);
+        describedGroup.setGroupId(consumerGroupId);
+        
describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember(new
 Assignment(Collections.emptyMap()))));
+        describedGroup.setAssignorName("");
+        describedGroup.setGroupState("assigning");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testConsumerGroupDescribeWithErrors() {
+        String groupId = "groupId";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        context.replay(newGroupMetadataRecord(
+            groupId,
+            new GroupMetadataValue()
+                .setMembers(Collections.emptyList()),
+            MetadataVersion.latest()
+        ));

Review Comment:
   I suppose that we could remove this, no?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -445,6 +446,41 @@ public List<ListGroupsResponseData.ListedGroup> 
listGroups(List<String> statesFi
         return groupStream.map(group -> 
group.asListedGroup(committedOffset)).collect(Collectors.toList());
     }
 
+
+    public List<ConsumerGroupDescribeResponseData.DescribedGroup> 
consumerGroupDescribe(
+        List<String> groupIds,
+        long committedOffset
+    ) {
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> response = new 
ArrayList<>();
+
+        for (String groupId: groupIds) {
+            Group group = groups.get(groupId, committedOffset);
+
+            ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = 
new ConsumerGroupDescribeResponseData.DescribedGroup()
+                .setGroupId(groupId);
+
+            if (group == null || !CONSUMER.equals(group.type())) {
+                // We don't support upgrading/downgrading between protocols at 
the moment so
+                // we set an error if a group exists with the wrong type.
+                describedGroup.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
+            } else {
+                ConsumerGroup consumerGroup = (ConsumerGroup) group;
+                describedGroup.setGroupState(consumerGroup.stateAsString())
+                    .setGroupEpoch(consumerGroup.groupEpoch())
+                    .setAssignmentEpoch(consumerGroup.assignmentEpoch())
+                    
.setAssignorName(consumerGroup.preferredServerAssignor().isPresent() ?
+                        consumerGroup.preferredServerAssignor().get() : "");

Review Comment:
   This should actually be 
`consumerGroup.preferredServerAssignor().orElse(defaultAssignor.name()`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java:
##########
@@ -525,6 +527,69 @@ public CompletableFuture<ListGroupsResponseData> 
listGroups(
         return future;
     }
 
+    /**
+     * See {@link GroupCoordinator#consumerGroupDescribe(RequestContext, 
List)}.
+     */
+    @Override
+    public 
CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>> 
consumerGroupDescribe(
+        RequestContext context,
+        List<String> groupIds
+    ) {
+        if (!isActive.get()) {
+            return 
CompletableFuture.completedFuture(ConsumerGroupDescribeRequest.getErrorDescribedGroupList(
+                groupIds,
+                Errors.COORDINATOR_NOT_AVAILABLE
+            ));
+        }
+
+        final 
List<CompletableFuture<List<ConsumerGroupDescribeResponseData.DescribedGroup>>> 
futures =
+            new ArrayList<>(groupIds.size());
+        final Map<TopicPartition, List<String>> groupsByTopicPartition = new 
HashMap<>();
+        groupIds.forEach(groupId -> {
+            // For backwards compatibility, we support DescribeGroups for the 
empty group id.
+            if (groupId == null) {
+                
futures.add(CompletableFuture.completedFuture(Collections.singletonList(
+                    new ConsumerGroupDescribeResponseData.DescribedGroup()
+                        .setGroupId(null)
+                        .setErrorCode(Errors.INVALID_GROUP_ID.code())
+                        .setErrorMessage(Errors.INVALID_GROUP_ID.message())

Review Comment:
   Let's remove the error message.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,105 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 
epoch))

Review Comment:
   I think that you could leverage `ConsumerGroupBuilder` a bit more to add a 
few groups and/or members.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6465,12 +6465,45 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupIds = List("group-id-0", "group-id-1", "group-id-2").asJava
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.addAll(groupIds)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> 
"true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val describedGroups = List(
+      new DescribedGroup().setGroupId(groupIds[0]),
+      new DescribedGroup().setGroupId(groupIds[1]),
+      new DescribedGroup().setGroupId(groupIds[2])
+    ).asJava
+
+    future.complete(describedGroups)
+    val consumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()

Review Comment:
   nit: `expectedConsumerGroupDescribeResponse`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -545,6 +547,32 @@ public String currentAssignmentSummary() {
             ')';
     }
 
+    public ConsumerGroupDescribeResponseData.Member 
asConsumerGroupDescribeMember(Assignment targetAssignment) {

Review Comment:
   Instead of passing the `targetAssignment` here, I wonder if we could just 
set it on the caller side. Have you considered this?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,105 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 
epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 
epoch + 1));
+

Review Comment:
   This test is actually not supposed to work because you don't commit after 
replaying the records so the state should not be accessible.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,105 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 
epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 
epoch + 1));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupEpoch(epoch + 1);
+        describedGroup.setGroupId(consumerGroupId);
+        
describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember(new
 Assignment(Collections.emptyMap()))));
+        describedGroup.setAssignorName("");
+        describedGroup.setGroupState("assigning");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testConsumerGroupDescribeWithErrors() {
+        String groupId = "groupId";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        context.replay(newGroupMetadataRecord(
+            groupId,
+            new GroupMetadataValue()
+                .setMembers(Collections.emptyList()),
+            MetadataVersion.latest()
+        ));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Collections.singletonList(groupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupId(groupId);
+        describedGroup.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());

Review Comment:
   nit: We usually format the code as follow when we construct such object.
   
   ```
   ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup()
       .setGroupid()
       .set....;
   ```
   
   It would be great if you could update all occurrences in the new tests.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -9603,6 +9707,24 @@ private static JoinGroupRequestProtocolCollection 
toProtocols(String... protocol
         return protocols;
     }
 
+    private static Record newConsumerGroupMetadataRecord(

Review Comment:
   There is already everything you need in `RecordHelpers`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -8660,6 +8665,105 @@ public void testListGroups() {
         assertEquals(expectAllGroupMap, actualAllGroupMap);
     }
 
+    @Test
+    public void testConsumerGroupDescribeNoErrors() {
+        String consumerGroupId = "consumerGroupId";
+        int epoch = 10;
+        String memberId = Uuid.randomUuid().toString();
+        String topicName = "topicName";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .withConsumerGroup(new ConsumerGroupBuilder(consumerGroupId, 
epoch))
+            .build();
+
+        ConsumerGroupMember.Builder memberBuilder = new 
ConsumerGroupMember.Builder(memberId)
+            .setSubscribedTopicNames(Collections.singletonList(topicName));
+        context.replay(RecordHelpers.newMemberSubscriptionRecord(
+            consumerGroupId,
+            memberBuilder.build()
+        ));
+        context.replay(RecordHelpers.newGroupEpochRecord(consumerGroupId, 
epoch + 1));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Arrays.asList(consumerGroupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupEpoch(epoch + 1);
+        describedGroup.setGroupId(consumerGroupId);
+        
describedGroup.setMembers(Collections.singletonList(memberBuilder.build().asConsumerGroupDescribeMember(new
 Assignment(Collections.emptyMap()))));
+        describedGroup.setAssignorName("");
+        describedGroup.setGroupState("assigning");
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testConsumerGroupDescribeWithErrors() {
+        String groupId = "groupId";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+        context.replay(newGroupMetadataRecord(
+            groupId,
+            new GroupMetadataValue()
+                .setMembers(Collections.emptyList()),
+            MetadataVersion.latest()
+        ));
+
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> actual = 
context.sendConsumerGroupDescribe(Collections.singletonList(groupId));
+        ConsumerGroupDescribeResponseData.DescribedGroup describedGroup = new 
ConsumerGroupDescribeResponseData.DescribedGroup();
+        describedGroup.setGroupId(groupId);
+        describedGroup.setErrorCode(Errors.GROUP_ID_NOT_FOUND.code());
+        List<ConsumerGroupDescribeResponseData.DescribedGroup> expected = 
Collections.singletonList(
+            describedGroup
+        );
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void testConsumerGroupDescribeBeforeAndAfterCommittingOffset() {
+        String consumerGroupId = "consumerGroupId";
+
+        MockPartitionAssignor assignor = new MockPartitionAssignor("range");
+        GroupMetadataManagerTestContext context = new 
GroupMetadataManagerTestContext.Builder()
+            .withAssignors(Collections.singletonList(assignor))
+            .build();
+
+        // Add group without committing
+        context.replay(newConsumerGroupMetadataRecord(
+            consumerGroupId,
+            new ConsumerGroupMetadataValue(),
+            MetadataVersion.latest()
+        ));

Review Comment:
   I wonder if we could build a bigger test case with a few members. It would 
also be good to have target assignment records and current assignment records 
for completeness.



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -6206,12 +6206,40 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED.code, 
response.data.errorCode)
   }
 
+  @Test
+  def testConsumerGroupDescribe(): Unit = {
+    val groupId = "group0"
+    val consumerGroupDescribeRequestData = new 
ConsumerGroupDescribeRequestData()
+    consumerGroupDescribeRequestData.groupIds.add(groupId)
+    val requestChannelRequest = buildRequest(new 
ConsumerGroupDescribeRequest.Builder(consumerGroupDescribeRequestData, 
true).build())
+
+    val future = new 
CompletableFuture[util.List[ConsumerGroupDescribeResponseData.DescribedGroup]]()
+    when(groupCoordinator.consumerGroupDescribe(
+      any[RequestContext],
+      any[util.List[String]]
+    )).thenReturn(future)
+
+    createKafkaApis(
+      overrideProperties = Map(KafkaConfig.NewGroupCoordinatorEnableProp -> 
"true")
+    ).handle(requestChannelRequest, RequestLocal.NoCaching)
+
+    val response = 
verifyNoThrottling[ConsumerGroupDescribeResponse](requestChannelRequest)
+
+    val describedGroups = List(new DescribedGroup()).asJava
+    val consumerGroupDescribeResponseData = new 
ConsumerGroupDescribeResponseData()
+      .setGroups(describedGroups)
+    future.complete(describedGroups)
+
+    assertEquals(consumerGroupDescribeResponseData, response.data)
+  }
+

Review Comment:
   It would be great if we could add those. We could also do it as a follow-up 
if you like.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to