mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r470011774



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -892,136 +894,175 @@ class KafkaApis(val requestChannel: RequestChannel,
   def handleListOffsetRequest(request: RequestChannel.Request): Unit = {
     val version = request.header.apiVersion
 
-    val mergedResponseMap = if (version == 0)
+    val topics = if (version == 0)
       handleListOffsetRequestV0(request)
     else
       handleListOffsetRequestV1AndAbove(request)
 
-    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(requestThrottleMs, mergedResponseMap.asJava))
+    sendResponseMaybeThrottle(request, requestThrottleMs => new 
ListOffsetResponse(new ListOffsetResponseData()
+      .setThrottleTimeMs(requestThrottleMs)
+      .setTopics(topics.asJava)))
   }
 
-  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
Map[TopicPartition, ListOffsetResponse.PartitionData] = {
+  private def handleListOffsetRequestV0(request : RequestChannel.Request) : 
List[ListOffsetTopicResponse] = {
     val correlationId = request.header.correlationId
     val clientId = request.header.clientId
     val offsetRequest = request.body[ListOffsetRequest]
 
-    val partitionTimestamps = offsetRequest.partitionTimestamps.asScala
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, partitionTimestamps)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED, 
Seq.empty[JLong].asJava)
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      try {
-        val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
-          topicPartition = topicPartition,
-          timestamp = partitionData.timestamp,
-          maxNumOffsets = partitionData.maxNumOffsets,
-          isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
-          fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
-        (topicPartition, new ListOffsetResponse.PartitionData(Errors.NONE, 
offsets.map(JLong.valueOf).asJava))
-      } catch {
-        // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
-        // are typically transient and there is no value in logging the entire 
stack trace for the same
-        case e @ (_ : UnknownTopicOrPartitionException |
-                  _ : NotLeaderForPartitionException |
-                  _ : KafkaStorageException) =>
-          debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
-            correlationId, clientId, topicPartition, e.getMessage))
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-        case e: Throwable =>
-          error("Error while responding to offset request", e)
-          (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.forException(e), List[JLong]().asJava))
-      }
-    }
-    responseMap ++ unauthorizedResponseStatus
-  }
-
-  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData] 
= {
-    val correlationId = request.header.correlationId
-    val clientId = request.header.clientId
-    val offsetRequest = request.body[ListOffsetRequest]
-
-    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionMapByAuthorized(request.context,
-      DESCRIBE, TOPIC, offsetRequest.partitionTimestamps.asScala)(_.topic)
-
-    val unauthorizedResponseStatus = unauthorizedRequestInfo.map { case (k, _) 
=>
-      k -> new 
ListOffsetResponse.PartitionData(Errors.TOPIC_AUTHORIZATION_FAILED,
-        ListOffsetResponse.UNKNOWN_TIMESTAMP,
-        ListOffsetResponse.UNKNOWN_OFFSET,
-        Optional.empty())
-    }
-
-    val responseMap = authorizedRequestInfo.map { case (topicPartition, 
partitionData) =>
-      if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
-        debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
-            s"failed because the partition is duplicated in the request.")
-        (topicPartition, new 
ListOffsetResponse.PartitionData(Errors.INVALID_REQUEST,
-          ListOffsetResponse.UNKNOWN_TIMESTAMP,
-          ListOffsetResponse.UNKNOWN_OFFSET,
-          Optional.empty()))
-      } else {
-
-        def buildErrorResponse(e: Errors): (TopicPartition, 
ListOffsetResponse.PartitionData) = {
-          (topicPartition, new ListOffsetResponse.PartitionData(
-            e,
-            ListOffsetResponse.UNKNOWN_TIMESTAMP,
-            ListOffsetResponse.UNKNOWN_OFFSET,
-            Optional.empty()))
-        }
-
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setOldStyleOffsets(Seq.empty[JLong].asJava)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
         try {
-          val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
-          val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
-          val isolationLevelOpt = if (isClientRequest)
-            Some(offsetRequest.isolationLevel)
-          else
-            None
-
-          val foundOpt = replicaManager.fetchOffsetForTimestamp(topicPartition,
-            partitionData.timestamp,
-            isolationLevelOpt,
-            partitionData.currentLeaderEpoch,
-            fetchOnlyFromLeader)
-
-          val response = foundOpt match {
-            case Some(found) =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
found.timestamp, found.offset, found.leaderEpoch)
-            case None =>
-              new ListOffsetResponse.PartitionData(Errors.NONE, 
ListOffsetResponse.UNKNOWN_TIMESTAMP,
-                ListOffsetResponse.UNKNOWN_OFFSET, Optional.empty())
-          }
-          (topicPartition, response)
+          val offsets = replicaManager.legacyFetchOffsetsForTimestamp(
+            topicPartition = topicPartition,
+            timestamp = partition.timestamp,
+            maxNumOffsets = partition.maxNumOffsets,
+            isFromConsumer = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID,
+            fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID)
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.NONE.code)
+            .setOldStyleOffsets(offsets.map(JLong.valueOf).asJava)
         } catch {
-          // NOTE: These exceptions are special cased since these error 
messages are typically transient or the client
-          // would have received a clear exception and there is no value in 
logging the entire stack trace for the same
+          // NOTE: UnknownTopicOrPartitionException and 
NotLeaderForPartitionException are special cased since these error messages
+          // are typically transient and there is no value in logging the 
entire stack trace for the same
           case e @ (_ : UnknownTopicOrPartitionException |
                     _ : NotLeaderForPartitionException |
-                    _ : UnknownLeaderEpochException |
-                    _ : FencedLeaderEpochException |
-                    _ : KafkaStorageException |
-                    _ : UnsupportedForMessageFormatException) =>
-            debug(s"Offset request with correlation id $correlationId from 
client $clientId on " +
-                s"partition $topicPartition failed due to ${e.getMessage}")
-            buildErrorResponse(Errors.forException(e))
-
-          // Only V5 and newer ListOffset calls should get OFFSET_NOT_AVAILABLE
-          case e: OffsetNotAvailableException =>
-            if(request.header.apiVersion >= 5) {
-              buildErrorResponse(Errors.forException(e))
-            } else {
-              buildErrorResponse(Errors.LEADER_NOT_AVAILABLE)
-            }
-
+                    _ : KafkaStorageException) =>
+            debug("Offset request with correlation id %d from client %s on 
partition %s failed due to %s".format(
+              correlationId, clientId, topicPartition, e.getMessage))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
           case e: Throwable =>
             error("Error while responding to offset request", e)
-            buildErrorResponse(Errors.forException(e))
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(Errors.forException(e).code)
+              .setOldStyleOffsets(List[JLong]().asJava)
+        }
+      }
+      new 
ListOffsetTopicResponse().setName(topic.name).setPartitions(responsePartitions.asJava)
+    }
+    (responseTopics ++ unauthorizedResponseStatus).toList
+  }
+
+  private def handleListOffsetRequestV1AndAbove(request : 
RequestChannel.Request): List[ListOffsetTopicResponse] = {
+    val correlationId = request.header.correlationId
+    val clientId = request.header.clientId
+    val offsetRequest = request.body[ListOffsetRequest]
+
+    val (authorizedRequestInfo, unauthorizedRequestInfo) = 
partitionSeqByAuthorized(request.context, DESCRIBE, TOPIC, 
offsetRequest.topics.asScala.toSeq)(_.name)
+
+    val unauthorizedResponseStatus = unauthorizedRequestInfo.map(topic =>
+      new ListOffsetTopicResponse()
+        .setName(topic.name)
+        .setPartitions(topic.partitions.asScala.map(partition =>
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)).asJava)
+    )
+
+    val responseTopics = authorizedRequestInfo.map { topic =>
+      val responsePartitions = topic.partitions.asScala.map { partition =>
+        val topicPartition = new TopicPartition(topic.name, 
partition.partitionIndex)
+        if (offsetRequest.duplicatePartitions.contains(topicPartition)) {
+          debug(s"OffsetRequest with correlation id $correlationId from client 
$clientId on partition $topicPartition " +
+              s"failed because the partition is duplicated in the request.")
+          new ListOffsetPartitionResponse()
+            .setPartitionIndex(partition.partitionIndex)
+            .setErrorCode(Errors.INVALID_REQUEST.code)
+            .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+            .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+        } else {
+  
+          def buildErrorResponse(e: Errors): ListOffsetPartitionResponse = {
+            new ListOffsetPartitionResponse()
+              .setPartitionIndex(partition.partitionIndex)
+              .setErrorCode(e.code)
+              .setTimestamp(ListOffsetResponse.UNKNOWN_TIMESTAMP)
+              .setOffset(ListOffsetResponse.UNKNOWN_OFFSET)
+          }
+  
+          try {
+            val fetchOnlyFromLeader = offsetRequest.replicaId != 
ListOffsetRequest.DEBUGGING_REPLICA_ID
+            val isClientRequest = offsetRequest.replicaId == 
ListOffsetRequest.CONSUMER_REPLICA_ID
+            val isolationLevelOpt = if (isClientRequest)
+              Some(offsetRequest.isolationLevel)
+            else
+              None
+  
+            val foundOpt = 
replicaManager.fetchOffsetForTimestamp(topicPartition,
+              partition.timestamp,
+              isolationLevelOpt,
+              if (partition.currentLeaderEpoch == ListOffsetResponse.UNKNOWN_EPOCH) Optional.empty() else Optional.of(partition.currentLeaderEpoch),

Review comment:
       Yes, let's keep Optional here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to