jeffkbkim commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1063877209


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short 
version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only 
support one group."

Review Comment:
   nit: supports



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short 
version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only 
support one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new 
OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != 
Errors.NONE.code()) {
+                        // Versions prior to version 2 does not support a top 
level error. Therefore

Review Comment:
   nit: do not & Therefore,



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetFetchResponse
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupId = offsetFetchRequest.groupId()
-    val (error, partitionData) = fetchOffsets(groupId, 
offsetFetchRequest.isAllPartitions,
-      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, 
request.context)
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        if (error != Errors.NONE) {
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-        } else {
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
partitionData.asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    val groups = offsetFetchRequest.groups()
+    val requireStable = offsetFetchRequest.requireStable()
+
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+    groups.forEach { groupOffsetFetch =>
+      val isAllPartitions = groupOffsetFetch.topics == null
+      val future = if (isAllPartitions) {
+        fetchAllOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      } else {
+        fetchOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      }
+      futures += future
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
-
-  private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupIds = offsetFetchRequest.groupIds().asScala
-    val groupToErrorMap =  mutable.Map.empty[String, Errors]
-    val groupToPartitionData =  mutable.Map.empty[String, 
util.Map[TopicPartition, PartitionData]]
-    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-    groupIds.foreach(g => {
-      val (error, partitionData) = fetchOffsets(g,
-        offsetFetchRequest.isAllPartitionsForGroup(g),
-        offsetFetchRequest.requireStable(),
-        groupToTopicPartitions.get(g), request.context)
-      groupToErrorMap += (g -> error)
-      groupToPartitionData += (g -> partitionData.asJava)
-    })
 
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-        groupToErrorMap.asJava, groupToPartitionData.asJava)
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+      val groupResponses = new 
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+      futures.foreach(future => groupResponses += future.get())
+      requestHelper.sendMaybeThrottle(request, new 
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, 
requireStable: Boolean,
-                           partitions: util.List[TopicPartition], context: 
RequestContext): (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) = {
-    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-    } else {
-      if (isAllPartitions) {
-        val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable)
-        if (error != Errors.NONE) {
-          (error, allPartitionData)
-        } else {
-          // clients are not allowed to see offsets for topics that are not 
authorized for Describe
-          val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(context,
-            DESCRIBE, TOPIC, allPartitionData)(_.topic)
-          (Errors.NONE, authorizedPartitionData)
-        }
+  private def fetchAllOffsets(

Review Comment:
   fetchAllOffsetsForGroup and fetchOffsetsForGroup makes it more readable for 
me. wdyt?
   
   looking at the new GroupCoordinator interface seems like they have a 
counterpart method. thoughts on changing both?



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1395,79 +1389,123 @@ class KafkaApis(val requestChannel: RequestChannel,
       offsetFetchResponse
     }
     requestHelper.sendResponseMaybeThrottle(request, createResponse)
+    CompletableFuture.completedFuture[Unit](())
   }
 
-  private def handleOffsetFetchRequestBetweenV1AndV7(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
+  private def handleOffsetFetchRequestFromCoordinator(request: 
RequestChannel.Request): CompletableFuture[Unit] = {
     val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupId = offsetFetchRequest.groupId()
-    val (error, partitionData) = fetchOffsets(groupId, 
offsetFetchRequest.isAllPartitions,
-      offsetFetchRequest.requireStable, offsetFetchRequest.partitions, 
request.context)
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse =
-        if (error != Errors.NONE) {
-          offsetFetchRequest.getErrorResponse(requestThrottleMs, error)
-        } else {
-          new OffsetFetchResponse(requestThrottleMs, Errors.NONE, 
partitionData.asJava)
-        }
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    val groups = offsetFetchRequest.groups()
+    val requireStable = offsetFetchRequest.requireStable()
+
+    val futures = new 
mutable.ArrayBuffer[CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup]](groups.size)
+    groups.forEach { groupOffsetFetch =>
+      val isAllPartitions = groupOffsetFetch.topics == null
+      val future = if (isAllPartitions) {
+        fetchAllOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      } else {
+        fetchOffsets(
+          request.context,
+          groupOffsetFetch,
+          requireStable
+        )
+      }
+      futures += future
     }
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
-  }
-
-  private def handleOffsetFetchRequestV8AndAbove(request: 
RequestChannel.Request): Unit = {
-    val header = request.header
-    val offsetFetchRequest = request.body[OffsetFetchRequest]
-    val groupIds = offsetFetchRequest.groupIds().asScala
-    val groupToErrorMap =  mutable.Map.empty[String, Errors]
-    val groupToPartitionData =  mutable.Map.empty[String, 
util.Map[TopicPartition, PartitionData]]
-    val groupToTopicPartitions = offsetFetchRequest.groupIdsToPartitions()
-    groupIds.foreach(g => {
-      val (error, partitionData) = fetchOffsets(g,
-        offsetFetchRequest.isAllPartitionsForGroup(g),
-        offsetFetchRequest.requireStable(),
-        groupToTopicPartitions.get(g), request.context)
-      groupToErrorMap += (g -> error)
-      groupToPartitionData += (g -> partitionData.asJava)
-    })
 
-    def createResponse(requestThrottleMs: Int): AbstractResponse = {
-      val offsetFetchResponse = new OffsetFetchResponse(requestThrottleMs,
-        groupToErrorMap.asJava, groupToPartitionData.asJava)
-      trace(s"Sending offset fetch response $offsetFetchResponse for 
correlation id ${header.correlationId} to client ${header.clientId}.")
-      offsetFetchResponse
+    CompletableFuture.allOf(futures.toArray: _*).handle[Unit] { (_, _) =>
+      val groupResponses = new 
ArrayBuffer[OffsetFetchResponseData.OffsetFetchResponseGroup](futures.size)
+      futures.foreach(future => groupResponses += future.get())
+      requestHelper.sendMaybeThrottle(request, new 
OffsetFetchResponse(groupResponses.asJava, request.context.apiVersion))
     }
-
-    requestHelper.sendResponseMaybeThrottle(request, createResponse)
   }
 
-  private def fetchOffsets(groupId: String, isAllPartitions: Boolean, 
requireStable: Boolean,
-                           partitions: util.List[TopicPartition], context: 
RequestContext): (Errors, Map[TopicPartition, 
OffsetFetchResponse.PartitionData]) = {
-    if (!authHelper.authorize(context, DESCRIBE, GROUP, groupId)) {
-      (Errors.GROUP_AUTHORIZATION_FAILED, Map.empty)
-    } else {
-      if (isAllPartitions) {
-        val (error, allPartitionData) = 
groupCoordinator.handleFetchOffsets(groupId, requireStable)
-        if (error != Errors.NONE) {
-          (error, allPartitionData)
-        } else {
-          // clients are not allowed to see offsets for topics that are not 
authorized for Describe
-          val (authorizedPartitionData, _) = 
authHelper.partitionMapByAuthorized(context,
-            DESCRIBE, TOPIC, allPartitionData)(_.topic)
-          (Errors.NONE, authorizedPartitionData)
-        }
+  private def fetchAllOffsets(
+    requestContext: RequestContext,
+    groupOffsetFetch: OffsetFetchRequestData.OffsetFetchRequestGroup,
+    requireStable: Boolean
+  ): CompletableFuture[OffsetFetchResponseData.OffsetFetchResponseGroup] = {
+    if (!authHelper.authorize(requestContext, DESCRIBE, GROUP, 
groupOffsetFetch.groupId)) {
+      return CompletableFuture.completedFuture(new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId(groupOffsetFetch.groupId)
+        .setErrorCode(Errors.GROUP_AUTHORIZATION_FAILED.code))
+    }

Review Comment:
   seems like this block is reused in `fetchOffsets()`, can we move it up to 
`handleOffsetFetchRequestFromCoordinator()`?



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        "group-2" -> null,
+        "group-3" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+    }
+
+    if (version < 8) {
+      // Request version earlier than version 8 do not support batching.

Review Comment:
   nit: do not support batching groups?



##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short 
version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only 
support one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new 
OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != 
Errors.NONE.code()) {
+                        // Versions prior to version 2 does not support a top 
level error. Therefore
+                        // we put it at the partition level.
+                        newPartition = new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(group.errorCode());
+                    } else {

Review Comment:
   i'm a bit confused on the else statement. we can hit this either when 
version >= 2 or group error == NONE. 
   
   based on the comment above, it seems that for version >= 2 we don't have to 
put error at the partition level. also, do we need to add offset/metadata if 
there is an error for version >= 2?



##########
core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala:
##########
@@ -150,42 +149,6 @@ class OffsetFetchRequestTest extends BaseRequestTest {
     }
   }
 
-  @Test
-  def testOffsetFetchRequestWithMultipleGroupsWithOneGroupRepeating(): Unit = {

Review Comment:
   +1 for having duplicate responses if a group is repeated. i don't think this 
warrants an invalid request but we should conform to the norm if there is one. 



##########
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##########
@@ -3405,6 +3405,341 @@ class KafkaApisTest {
     assertEquals(Errors.GROUP_AUTHORIZATION_FAILED, response.error)
   }
 
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithMultipleGroups(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        "group-2" -> null,
+        "group-3" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+    }
+
+    if (version < 8) {
+      // Request version earlier than version 8 do not support batching.
+      assertThrows(classOf[UnsupportedVersionException], () => 
makeRequest(version))
+    } else {
+      val requestChannelRequest = makeRequest(version)
+
+      val group1Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchOffsets(
+        requestChannelRequest.context,
+        "group-1",
+        List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+          .setName("foo")
+          .setPartitionIndexes(List[Integer](0, 1).asJava)
+        ).asJava,
+        false
+      )).thenReturn(group1Future)
+
+      val group2Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchAllOffsets(
+        requestChannelRequest.context,
+        "group-2",
+        false
+      )).thenReturn(group2Future)
+
+      val group3Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+      when(newGroupCoordinator.fetchAllOffsets(
+        requestChannelRequest.context,
+        "group-3",
+        false
+      )).thenReturn(group3Future)
+
+      createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+      val group1Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-1")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2)
+            ).asJava)
+        ).asJava)
+
+      val group2Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-2")
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopics()
+            .setName("bar")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(2),
+              new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+                .setPartitionIndex(2)
+                .setCommittedOffset(300)
+                .setCommittedLeaderEpoch(3)
+            ).asJava)
+        ).asJava)
+
+      val group3Response = new 
OffsetFetchResponseData.OffsetFetchResponseGroup()
+        .setGroupId("group-3")
+        .setErrorCode(Errors.INVALID_GROUP_ID.code)
+
+      val expectedOffsetFetchResponse = new OffsetFetchResponseData()
+        .setGroups(List(group1Response, group2Response, group3Response).asJava)
+
+      group1Future.complete(group1Response.topics)
+      group2Future.complete(group2Response.topics)
+      group3Future.completeExceptionally(Errors.INVALID_GROUP_ID.exception)
+
+      val response = 
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+      assertEquals(expectedOffsetFetchResponse, response.data)
+    }
+  }
+
+  @ParameterizedTest
+  @ApiKeyVersionsSource(apiKey = ApiKeys.OFFSET_FETCH)
+  def testHandleOffsetFetchWithSingleGroup(version: Short): Unit = {
+    // Version 0 gets offsets from Zookeeper. We are not interested
+    // in testing this here.
+    if (version == 0) return
+
+    def makeRequest(version: Short): RequestChannel.Request = {
+      buildRequest(new OffsetFetchRequest.Builder(
+        "group-1",
+        false,
+        List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("foo", 1)
+        ).asJava,
+        false
+      ).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(version)
+
+    val future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+        .setName("foo")
+        .setPartitionIndexes(List[Integer](0, 1).asJava)
+      ).asJava,
+      false
+    )).thenReturn(future)
+
+    createKafkaApis().handleOffsetFetchRequest(requestChannelRequest)
+
+    val group1Response = new OffsetFetchResponseData.OffsetFetchResponseGroup()
+      .setGroupId("group-1")
+      .setTopics(List(
+        new OffsetFetchResponseData.OffsetFetchResponseTopics()
+          .setName("foo")
+          .setPartitions(List(
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(0)
+              .setCommittedOffset(100)
+              .setCommittedLeaderEpoch(1),
+            new OffsetFetchResponseData.OffsetFetchResponsePartitions()
+              .setPartitionIndex(1)
+              .setCommittedOffset(200)
+              .setCommittedLeaderEpoch(2)
+          ).asJava)
+      ).asJava)
+
+    val expectedOffsetFetchResponse = if (version >= 8) {
+      new OffsetFetchResponseData()
+        .setGroups(List(group1Response).asJava)
+    } else {
+      new OffsetFetchResponseData()
+        .setTopics(List(
+          new OffsetFetchResponseData.OffsetFetchResponseTopic()
+            .setName("foo")
+            .setPartitions(List(
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(0)
+                .setCommittedOffset(100)
+                .setCommittedLeaderEpoch(if (version >= 5) 1 else -1),
+              new OffsetFetchResponseData.OffsetFetchResponsePartition()
+                .setPartitionIndex(1)
+                .setCommittedOffset(200)
+                .setCommittedLeaderEpoch(if (version >= 5) 2 else -1)
+            ).asJava)
+        ).asJava)
+    }
+
+    future.complete(group1Response.topics)
+
+    val response = 
verifyNoThrottling[OffsetFetchResponse](requestChannelRequest)
+    assertEquals(expectedOffsetFetchResponse, response.data)
+  }
+
+  @Test
+  def testHandleOffsetFetchAuthorization(): Unit = {
+    def makeRequest(version: Short): RequestChannel.Request = {
+      val groups = Map(
+        "group-1" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("bar", 0)
+        ).asJava,
+        "group-2" -> List(
+          new TopicPartition("foo", 0),
+          new TopicPartition("bar", 0)
+        ).asJava,
+        "group-3" -> null,
+        "group-4" -> null,
+      ).asJava
+      buildRequest(new OffsetFetchRequest.Builder(groups, false, 
false).build(version))
+    }
+
+    val requestChannelRequest = makeRequest(ApiKeys.OFFSET_FETCH.latestVersion)
+
+    val authorizer: Authorizer = mock(classOf[Authorizer])
+
+    val acls = Map(
+      "group-1" -> AuthorizationResult.ALLOWED,
+      "group-2" -> AuthorizationResult.DENIED,
+      "group-3" -> AuthorizationResult.ALLOWED,
+      "group-4" -> AuthorizationResult.DENIED,
+      "foo" -> AuthorizationResult.DENIED,
+      "bar" -> AuthorizationResult.ALLOWED
+    )
+
+    when(authorizer.authorize(
+      any[RequestContext],
+      any[util.List[Action]]
+    )).thenAnswer { invocation =>
+      val actions = invocation.getArgument(1, classOf[util.List[Action]])
+      actions.asScala.map { action =>
+        acls.getOrElse(action.resourcePattern.name, AuthorizationResult.DENIED)
+      }.asJava
+    }
+
+    // group-1 is allowed and bar is allowed.
+    val group1Future = new 
CompletableFuture[util.List[OffsetFetchResponseData.OffsetFetchResponseTopics]]()
+    when(newGroupCoordinator.fetchOffsets(
+      requestChannelRequest.context,
+      "group-1",
+      List(new OffsetFetchRequestData.OffsetFetchRequestTopics()
+        .setName("bar")
+        .setPartitionIndexes(List[Integer](0).asJava)
+      ).asJava,
+      false
+    )).thenReturn(group1Future)
+
+    // group-3 is allows and bar is allowed.

Review Comment:
   nit: is allowed



##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1338,27 +1337,22 @@ class KafkaApis(val requestChannel: RequestChannel,
   /**
    * Handle an offset fetch request
    */
-  def handleOffsetFetchRequest(request: RequestChannel.Request): Unit = {
+  def handleOffsetFetchRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {
     val version = request.header.apiVersion
     if (version == 0) {
-      // reading offsets from ZK
-      handleOffsetFetchRequestV0(request)
-    } else if (version >= 1 && version <= 7) {
-      // reading offsets from Kafka
-      handleOffsetFetchRequestBetweenV1AndV7(request)
+      handleOffsetFetchRequestFromZookeeper(request)

Review Comment:
   looks much more readable. nice!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to