jolshan commented on code in PR #12855:
URL: https://github.com/apache/kafka/pull/12855#discussion_r1035413661


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1567,53 +1567,54 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleDescribeGroupRequest(request: RequestChannel.Request): Unit = {
+  def handleDescribeGroupsRequest(request: RequestChannel.Request): Unit = {
+    val describeRequest = request.body[DescribeGroupsRequest]
+    val includeAuthorizedOperations = 
describeRequest.data.includeAuthorizedOperations
 
-    def sendResponseCallback(describeGroupsResponseData: 
DescribeGroupsResponseData): Unit = {
-      def createResponse(requestThrottleMs: Int): AbstractResponse = {
-        describeGroupsResponseData.setThrottleTimeMs(requestThrottleMs)
-        new DescribeGroupsResponse(describeGroupsResponseData)
-      }
-      requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    def sendResponse(response: AbstractResponse): Unit = {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        response.maybeSetThrottleTimeMs(requestThrottleMs)
+        response
+      })
     }
 
-    val describeRequest = request.body[DescribeGroupsRequest]
-    val describeGroupsResponseData = new DescribeGroupsResponseData()
-
+    val futures = 
mutable.ArrayBuffer.empty[CompletableFuture[DescribeGroupsResponseData.DescribedGroup]]
     describeRequest.data.groups.forEach { groupId =>
       if (!authHelper.authorize(request.context, DESCRIBE, GROUP, groupId)) {
-        
describeGroupsResponseData.groups.add(DescribeGroupsResponse.forError(groupId, 
Errors.GROUP_AUTHORIZATION_FAILED))
+        futures += 
CompletableFuture.completedFuture(DescribeGroupsResponse.forError(
+          groupId,
+          Errors.GROUP_AUTHORIZATION_FAILED
+        ))
       } else {
-        val (error, summary) = groupCoordinator.handleDescribeGroup(groupId)
-        val members = summary.members.map { member =>
-          new DescribeGroupsResponseData.DescribedGroupMember()
-            .setMemberId(member.memberId)
-            .setGroupInstanceId(member.groupInstanceId.orNull)
-            .setClientId(member.clientId)
-            .setClientHost(member.clientHost)
-            .setMemberAssignment(member.assignment)
-            .setMemberMetadata(member.metadata)
-        }
-
-        val describedGroup = new DescribeGroupsResponseData.DescribedGroup()
-          .setErrorCode(error.code)
-          .setGroupId(groupId)
-          .setGroupState(summary.state)
-          .setProtocolType(summary.protocolType)
-          .setProtocolData(summary.protocol)
-          .setMembers(members.asJava)
-
-        if (request.header.apiVersion >= 3) {
-          if (error == Errors.NONE && 
describeRequest.data.includeAuthorizedOperations) {
-            
describedGroup.setAuthorizedOperations(authHelper.authorizedOperations(request, 
new Resource(ResourceType.GROUP, groupId)))
+        val ctx = makeGroupCoordinatorRequestContextFrom(request, 
RequestLocal.NoCaching)
+        futures += newGroupCoordinator.describeGroup(ctx, 
groupId).handle[DescribeGroupsResponseData.DescribedGroup] { (response, 
exception) =>
+          if (exception != null) {

Review Comment:
   Not a huge deal but was wondering how you decided what to include in the 
newGroupCoordinator method vs what to keep outside of it. The below lines about 
exception being null could be included right?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to