dajac commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1068450233


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -3110,61 +3110,69 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
   }
 
-  def handleOffsetDeleteRequest(request: RequestChannel.Request, requestLocal: 
RequestLocal): Unit = {
+  def handleOffsetDeleteRequest(
+    request: RequestChannel.Request,
+    requestLocal: RequestLocal
+  ): CompletableFuture[Unit] = {
     val offsetDeleteRequest = request.body[OffsetDeleteRequest]
-    val groupId = offsetDeleteRequest.data.groupId
 
-    if (authHelper.authorize(request.context, DELETE, GROUP, groupId)) {
-      val topics = offsetDeleteRequest.data.topics.asScala
-      val authorizedTopics = authHelper.filterByAuthorized(request.context, 
READ, TOPIC, topics)(_.name)
+    if (!authHelper.authorize(request.context, DELETE, GROUP, 
offsetDeleteRequest.data.groupId)) {
+      requestHelper.sendMaybeThrottle(request, 
offsetDeleteRequest.getErrorResponse(Errors.GROUP_AUTHORIZATION_FAILED.exception))
+      CompletableFuture.completedFuture[Unit](())
+    } else {
+      val authorizedTopics = authHelper.filterByAuthorized(
+        request.context,
+        READ,
+        TOPIC,
+        offsetDeleteRequest.data.topics.asScala
+      )(_.name)
+
+      val responseBuilder = new OffsetDeleteResponse.Builder
+      val authorizedTopicPartitions = new 
OffsetDeleteRequestData.OffsetDeleteRequestTopicCollection()
+      offsetDeleteRequest.data.topics.forEach { topic =>
+        if (!authorizedTopics.contains(topic.name)) {
+          // If the topic is not authorized, we add the topic and all its 
partitions
+          // to the response with TOPIC_AUTHORIZATION_FAILED.
+          
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.TOPIC_AUTHORIZATION_FAILED)
+        } else if (!metadataCache.contains(topic.name)) {
+          // If the topic is unknown, we add the topic and all its partitions
+          // to the response with UNKNOWN_TOPIC_OR_PARTITION.
+          
responseBuilder.addPartitions[OffsetDeleteRequestData.OffsetDeleteRequestPartition](
+            topic.name, topic.partitions, _.partitionIndex, 
Errors.UNKNOWN_TOPIC_OR_PARTITION)
+        } else {
+          // Otherwise, we check all partitions to ensure that they all exist.
+          val topicWithValidPartitions = new 
OffsetDeleteRequestData.OffsetDeleteRequestTopic().setName(topic.name)
 
-      val topicPartitionErrors = mutable.Map[TopicPartition, Errors]()
-      val topicPartitions = mutable.ArrayBuffer[TopicPartition]()
+          topic.partitions.forEach { partition =>

Review Comment:
   I think that we did here: 
https://github.com/apache/kafka/pull/12902/files#diff-cc056b4960ededba37a438b1454f0f3c5ff5e8ad5e6d2ec9a08e813ca056ffebL3130.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to